diff --git a/cloud-common/cloud-common-caffeine/src/main/java/com/muyu/common/caffeine/enums/CacheNameEnums.java b/cloud-common/cloud-common-caffeine/src/main/java/com/muyu/common/caffeine/enums/CacheNameEnums.java index d521517..87c10c8 100644 --- a/cloud-common/cloud-common-caffeine/src/main/java/com/muyu/common/caffeine/enums/CacheNameEnums.java +++ b/cloud-common/cloud-common-caffeine/src/main/java/com/muyu/common/caffeine/enums/CacheNameEnums.java @@ -19,6 +19,8 @@ import java.util.List; public enum CacheNameEnums { FAULT("fault", "故障"), FENCE("fence", "围栏"), + STORAGE("storage", "持久化"), + REALTIME("realTime", "实时信息"), WARMING("warming", "预警"); private final String code; diff --git a/cloud-data-processing/src/main/java/com/muyu/data/processing/domain/KafkaData.java b/cloud-data-processing/src/main/java/com/muyu/data/processing/domain/BasicData.java similarity index 78% rename from cloud-data-processing/src/main/java/com/muyu/data/processing/domain/KafkaData.java rename to cloud-data-processing/src/main/java/com/muyu/data/processing/domain/BasicData.java index 0f1b542..706b2cf 100644 --- a/cloud-data-processing/src/main/java/com/muyu/data/processing/domain/KafkaData.java +++ b/cloud-data-processing/src/main/java/com/muyu/data/processing/domain/BasicData.java @@ -1,9 +1,6 @@ package com.muyu.data.processing.domain; -import com.muyu.common.core.enums.ClassType; import lombok.*; -import org.apache.commons.lang3.ObjectUtils; -import org.apache.commons.lang3.StringUtils; import java.io.Serializable; @@ -22,7 +19,7 @@ import java.io.Serializable; @Builder @NoArgsConstructor @AllArgsConstructor -public class KafkaData implements Serializable { +public class BasicData implements Serializable { private String key; private String label; diff --git a/cloud-data-processing/src/main/java/com/muyu/data/processing/domain/Temporary1.java b/cloud-data-processing/src/main/java/com/muyu/data/processing/domain/Temporary1.java deleted file mode 100644 index f84b44c..0000000 --- a/cloud-data-processing/src/main/java/com/muyu/data/processing/domain/Temporary1.java +++ /dev/null @@ -1,22 +0,0 @@ -package com.muyu.data.processing.domain; - -import lombok.*; - -/** - * 临时类1 - * - * @Author: 胡杨 - * @Name: Temporary1 - * @Description: 临时类1 - * @CreatedDate: 2024/9/30 下午7:26 - * @FilePath: com.muyu.data.processing.domain - */ - -@Data -@ToString -@Builder -@NoArgsConstructor -@AllArgsConstructor -public class Temporary1 { - private String test; -} diff --git a/cloud-data-processing/src/main/java/com/muyu/data/processing/kafka/KafkaConsumerService.java b/cloud-data-processing/src/main/java/com/muyu/data/processing/kafka/KafkaConsumerService.java index 9c44c6c..b4b2196 100644 --- a/cloud-data-processing/src/main/java/com/muyu/data/processing/kafka/KafkaConsumerService.java +++ b/cloud-data-processing/src/main/java/com/muyu/data/processing/kafka/KafkaConsumerService.java @@ -5,8 +5,9 @@ import cn.hutool.core.thread.ThreadUtil; import cn.hutool.json.JSONUtil; import com.alibaba.nacos.shaded.com.google.common.collect.Lists; import com.muyu.common.kafka.constants.KafkaConstants; -import com.muyu.data.processing.domain.KafkaData; +import com.muyu.data.processing.domain.BasicData; import com.muyu.data.processing.service.DataProcessingService; +import com.muyu.data.processing.strategy.core.StartStrategy; import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -17,6 +18,7 @@ import org.springframework.stereotype.Component; import java.time.Duration; import java.util.Collection; +import java.util.HashMap; import java.util.List; /** @@ -34,7 +36,7 @@ public class KafkaConsumerService implements InitializingBean { @Resource private KafkaConsumer kafkaConsumer; @Resource - private DataProcessingService service; + private StartStrategy startStrategy; @Override public void afterPropertiesSet() throws Exception { @@ -51,12 +53,19 @@ public class KafkaConsumerService implements InitializingBean { String originalMsg = (String) consumerRecord.value(); log.info("从Kafka中消费的原始数据: " + originalMsg); //2.把消费数据转换为DTO对象 - List dataList = JSONUtil.toList(originalMsg, KafkaData.class); + List dataList = JSONUtil.toList(originalMsg, BasicData.class); log.info("从Kafka中消费的实体数据: " + dataList); - service.strategyCheck(dataList); + // 执行策略 + startStrategy.applyStrategy(getDataMap(dataList)); } } }); thread.start(); } + + private HashMap getDataMap(List dataList) { + HashMap basicDataHashMap = new HashMap<>(); + dataList.forEach(data -> basicDataHashMap.put(data.getKey(), data)); + return basicDataHashMap; + } } diff --git a/cloud-data-processing/src/main/java/com/muyu/data/processing/mapper/DataProcessingMapper.java b/cloud-data-processing/src/main/java/com/muyu/data/processing/mapper/DataProcessingMapper.java index dfb4e79..c54390a 100644 --- a/cloud-data-processing/src/main/java/com/muyu/data/processing/mapper/DataProcessingMapper.java +++ b/cloud-data-processing/src/main/java/com/muyu/data/processing/mapper/DataProcessingMapper.java @@ -1,7 +1,7 @@ package com.muyu.data.processing.mapper; import com.muyu.data.processing.domain.IotDbData; -import com.muyu.data.processing.domain.KafkaData; +import com.muyu.data.processing.domain.BasicData; import org.apache.ibatis.annotations.Mapper; import org.apache.ibatis.annotations.Param; import org.springframework.stereotype.Repository; @@ -26,7 +26,7 @@ public interface DataProcessingMapper{ Integer insIotDbData(@Param("key") String key, @Param("value") String value); - void strategyCheck(@Param("dataList") List dataList); + void strategyCheck(@Param("dataList") List dataList); Integer insIotDbDataVo(IotDbData build); } diff --git a/cloud-data-processing/src/main/java/com/muyu/data/processing/service/DataProcessingService.java b/cloud-data-processing/src/main/java/com/muyu/data/processing/service/DataProcessingService.java index 591251b..7876553 100644 --- a/cloud-data-processing/src/main/java/com/muyu/data/processing/service/DataProcessingService.java +++ b/cloud-data-processing/src/main/java/com/muyu/data/processing/service/DataProcessingService.java @@ -1,8 +1,7 @@ package com.muyu.data.processing.service; -import com.muyu.data.processing.domain.IotDbData; -import com.muyu.data.processing.domain.KafkaData; +import com.muyu.data.processing.domain.BasicData; import java.util.List; @@ -25,7 +24,7 @@ public interface DataProcessingService{ */ List selectStorageGroup(); - void strategyCheck(List dataList); + void strategyCheck(List dataList); Integer insIotDbData(String key, String value); } diff --git a/cloud-data-processing/src/main/java/com/muyu/data/processing/service/impl/DataProcessingServiceImpl.java b/cloud-data-processing/src/main/java/com/muyu/data/processing/service/impl/DataProcessingServiceImpl.java index 076ce8c..f2c9bb0 100644 --- a/cloud-data-processing/src/main/java/com/muyu/data/processing/service/impl/DataProcessingServiceImpl.java +++ b/cloud-data-processing/src/main/java/com/muyu/data/processing/service/impl/DataProcessingServiceImpl.java @@ -4,7 +4,7 @@ package com.muyu.data.processing.service.impl; import javax.annotation.Resource; import com.muyu.data.processing.domain.IotDbData; -import com.muyu.data.processing.domain.KafkaData; +import com.muyu.data.processing.domain.BasicData; import com.muyu.data.processing.strategy.core.StartStrategy; import org.springframework.stereotype.Service; import lombok.extern.slf4j.Slf4j; @@ -39,8 +39,8 @@ public class DataProcessingServiceImpl implements DataProcessingService { } @Override - public void strategyCheck(List dataList) { - HashMap kafkaDataHashMap = new HashMap<>(); + public void strategyCheck(List dataList) { + HashMap kafkaDataHashMap = new HashMap<>(); dataList.forEach(data -> kafkaDataHashMap.put(data.getKey(), data)); // Result result = rootStrategy.applyStrategy(kafkaDataHashMap); // String[] data = result.getData(); diff --git a/cloud-data-processing/src/main/java/com/muyu/data/processing/strategy/branch/DataProcessStrategy.java b/cloud-data-processing/src/main/java/com/muyu/data/processing/strategy/branch/DataStorageProcessStrategy.java similarity index 58% rename from cloud-data-processing/src/main/java/com/muyu/data/processing/strategy/branch/DataProcessStrategy.java rename to cloud-data-processing/src/main/java/com/muyu/data/processing/strategy/branch/DataStorageProcessStrategy.java index d6b3460..f6f45e0 100644 --- a/cloud-data-processing/src/main/java/com/muyu/data/processing/strategy/branch/DataProcessStrategy.java +++ b/cloud-data-processing/src/main/java/com/muyu/data/processing/strategy/branch/DataStorageProcessStrategy.java @@ -1,6 +1,7 @@ package com.muyu.data.processing.strategy.branch; -import com.muyu.data.processing.domain.Temporary1; +import com.muyu.data.processing.domain.BasicData; +import java.util.HashMap; import com.muyu.data.processing.domain.Temporary2; import com.muyu.data.processing.strategy.StrategyHandler; import com.muyu.data.processing.strategy.abstractStrategyRouter; @@ -20,15 +21,16 @@ import org.springframework.stereotype.Component; @Slf4j @Component -public class DataProcessStrategy extends abstractStrategyRouter - implements StrategyHandler { +public class DataStorageProcessStrategy extends abstractStrategyRouter, Temporary2> + implements StrategyHandler, Temporary2> { @Override - protected StrategyMapper registerStrategy() { + protected StrategyMapper, Temporary2> registerStrategy() { return param-> new DataStorageStrategy(); } @Override - public Temporary2 apply(Temporary1 temporary1) { - return applyStrategy(temporary1); + public Temporary2 apply(HashMap basicDataMap) { + log.info("持久化数据处理节点已通过。。。"); + return applyStrategy(basicDataMap); } } diff --git a/cloud-data-processing/src/main/java/com/muyu/data/processing/strategy/branch/FaultJudgmentStrategy.java b/cloud-data-processing/src/main/java/com/muyu/data/processing/strategy/branch/FaultJudgmentStrategy.java index 14f5c32..25bbacd 100644 --- a/cloud-data-processing/src/main/java/com/muyu/data/processing/strategy/branch/FaultJudgmentStrategy.java +++ b/cloud-data-processing/src/main/java/com/muyu/data/processing/strategy/branch/FaultJudgmentStrategy.java @@ -1,6 +1,8 @@ package com.muyu.data.processing.strategy.branch; -import com.muyu.data.processing.domain.Temporary1; +import com.muyu.data.processing.domain.BasicData; +import java.util.HashMap; +import java.util.List; import com.muyu.data.processing.domain.Temporary2; import com.muyu.data.processing.strategy.StrategyHandler; import com.muyu.data.processing.strategy.abstractStrategyRouter; @@ -21,15 +23,16 @@ import org.springframework.stereotype.Component; @Slf4j @Component -public class FaultJudgmentStrategy extends abstractStrategyRouter - implements StrategyHandler { +public class FaultJudgmentStrategy extends abstractStrategyRouter, Temporary2> + implements StrategyHandler, Temporary2> { @Override - protected StrategyMapper registerStrategy() { + protected StrategyMapper, Temporary2> registerStrategy() { return param-> new FaultAlarmStrategy(); } @Override - public Temporary2 apply(Temporary1 temporary1) { - return applyStrategy(temporary1); + public Temporary2 apply(HashMap basicDataMap) { + log.info("故障判断节点已通过。。。"); + return applyStrategy(basicDataMap); } } diff --git a/cloud-data-processing/src/main/java/com/muyu/data/processing/strategy/branch/FaultProcessingStrategy.java b/cloud-data-processing/src/main/java/com/muyu/data/processing/strategy/branch/FaultProcessingStrategy.java index faf1abc..405f2ca 100644 --- a/cloud-data-processing/src/main/java/com/muyu/data/processing/strategy/branch/FaultProcessingStrategy.java +++ b/cloud-data-processing/src/main/java/com/muyu/data/processing/strategy/branch/FaultProcessingStrategy.java @@ -1,6 +1,8 @@ package com.muyu.data.processing.strategy.branch; -import com.muyu.data.processing.domain.Temporary1; +import com.muyu.data.processing.domain.BasicData; +import java.util.HashMap; +import java.util.List; import com.muyu.data.processing.domain.Temporary2; import com.muyu.data.processing.strategy.StrategyHandler; import com.muyu.data.processing.strategy.abstractStrategyRouter; @@ -20,15 +22,16 @@ import org.springframework.stereotype.Component; @Slf4j @Component -public class FaultProcessingStrategy extends abstractStrategyRouter - implements StrategyHandler { +public class FaultProcessingStrategy extends abstractStrategyRouter, Temporary2> + implements StrategyHandler, Temporary2> { @Override - protected StrategyMapper registerStrategy() { + protected StrategyMapper, Temporary2> registerStrategy() { return param-> new FaultJudgmentStrategy(); } @Override - public Temporary2 apply(Temporary1 temporary1) { - return applyStrategy(temporary1); + public Temporary2 apply(HashMap basicDataMap) { + log.info("故障数据处理节点已通过。。。"); + return applyStrategy(basicDataMap); } } diff --git a/cloud-data-processing/src/main/java/com/muyu/data/processing/strategy/branch/FenceJudgmentStrategy.java b/cloud-data-processing/src/main/java/com/muyu/data/processing/strategy/branch/FenceJudgmentStrategy.java index e8a7280..dd73ce0 100644 --- a/cloud-data-processing/src/main/java/com/muyu/data/processing/strategy/branch/FenceJudgmentStrategy.java +++ b/cloud-data-processing/src/main/java/com/muyu/data/processing/strategy/branch/FenceJudgmentStrategy.java @@ -1,6 +1,8 @@ package com.muyu.data.processing.strategy.branch; -import com.muyu.data.processing.domain.Temporary1; +import com.muyu.data.processing.domain.BasicData; +import java.util.HashMap; +import java.util.List; import com.muyu.data.processing.domain.Temporary2; import com.muyu.data.processing.strategy.StrategyHandler; import com.muyu.data.processing.strategy.abstractStrategyRouter; @@ -21,15 +23,16 @@ import org.springframework.stereotype.Component; @Slf4j @Component -public class FenceJudgmentStrategy extends abstractStrategyRouter - implements StrategyHandler { +public class FenceJudgmentStrategy extends abstractStrategyRouter, Temporary2> + implements StrategyHandler, Temporary2> { @Override - protected StrategyMapper registerStrategy() { + protected StrategyMapper, Temporary2> registerStrategy() { return param-> new FenceAlarmStrategy(); } @Override - public Temporary2 apply(Temporary1 temporary1) { - return applyStrategy(temporary1); + public Temporary2 apply(HashMap basicDataMap) { + log.info("围栏数据判断节点通过。。。"); + return applyStrategy(basicDataMap); } } diff --git a/cloud-data-processing/src/main/java/com/muyu/data/processing/strategy/branch/FenceProcessingStrategy.java b/cloud-data-processing/src/main/java/com/muyu/data/processing/strategy/branch/FenceProcessingStrategy.java index e7046a3..bb5d7f4 100644 --- a/cloud-data-processing/src/main/java/com/muyu/data/processing/strategy/branch/FenceProcessingStrategy.java +++ b/cloud-data-processing/src/main/java/com/muyu/data/processing/strategy/branch/FenceProcessingStrategy.java @@ -1,6 +1,8 @@ package com.muyu.data.processing.strategy.branch; -import com.muyu.data.processing.domain.Temporary1; +import com.muyu.data.processing.domain.BasicData; +import java.util.HashMap; +import java.util.List; import com.muyu.data.processing.domain.Temporary2; import com.muyu.data.processing.strategy.StrategyHandler; import com.muyu.data.processing.strategy.abstractStrategyRouter; @@ -19,15 +21,16 @@ import org.springframework.stereotype.Component; @Slf4j @Component -public class FenceProcessingStrategy extends abstractStrategyRouter - implements StrategyHandler { +public class FenceProcessingStrategy extends abstractStrategyRouter, Temporary2> + implements StrategyHandler, Temporary2> { @Override - protected StrategyMapper registerStrategy() { + protected StrategyMapper, Temporary2> registerStrategy() { return param-> new FenceJudgmentStrategy(); } @Override - public Temporary2 apply(Temporary1 temporary1) { - return applyStrategy(temporary1); + public Temporary2 apply(HashMap basicDataMap) { + log.info("围栏数据处理节点已通过。。。"); + return applyStrategy(basicDataMap); } } diff --git a/cloud-data-processing/src/main/java/com/muyu/data/processing/strategy/branch/RealTimeJudgmentStrategy.java b/cloud-data-processing/src/main/java/com/muyu/data/processing/strategy/branch/RealTimeJudgmentStrategy.java index 75291ef..f908c0e 100644 --- a/cloud-data-processing/src/main/java/com/muyu/data/processing/strategy/branch/RealTimeJudgmentStrategy.java +++ b/cloud-data-processing/src/main/java/com/muyu/data/processing/strategy/branch/RealTimeJudgmentStrategy.java @@ -1,6 +1,8 @@ package com.muyu.data.processing.strategy.branch; -import com.muyu.data.processing.domain.Temporary1; +import com.muyu.data.processing.domain.BasicData; +import java.util.HashMap; +import java.util.List; import com.muyu.data.processing.domain.Temporary2; import com.muyu.data.processing.strategy.StrategyHandler; import com.muyu.data.processing.strategy.abstractStrategyRouter; @@ -21,15 +23,16 @@ import org.springframework.stereotype.Component; @Slf4j @Component -public class RealTimeJudgmentStrategy extends abstractStrategyRouter - implements StrategyHandler { +public class RealTimeJudgmentStrategy extends abstractStrategyRouter, Temporary2> + implements StrategyHandler, Temporary2> { @Override - protected StrategyMapper registerStrategy() { + protected StrategyMapper, Temporary2> registerStrategy() { return param-> new RealTimeAlarmStrategy(); } @Override - public Temporary2 apply(Temporary1 temporary1) { - return applyStrategy(temporary1); + public Temporary2 apply(HashMap basicDataMap) { + log.info("实时数据判断节点已通过。。。"); + return applyStrategy(basicDataMap); } } diff --git a/cloud-data-processing/src/main/java/com/muyu/data/processing/strategy/branch/RealTimeProcessingStrategy.java b/cloud-data-processing/src/main/java/com/muyu/data/processing/strategy/branch/RealTimeProcessingStrategy.java index cab608a..45ebf07 100644 --- a/cloud-data-processing/src/main/java/com/muyu/data/processing/strategy/branch/RealTimeProcessingStrategy.java +++ b/cloud-data-processing/src/main/java/com/muyu/data/processing/strategy/branch/RealTimeProcessingStrategy.java @@ -1,6 +1,8 @@ package com.muyu.data.processing.strategy.branch; -import com.muyu.data.processing.domain.Temporary1; +import com.muyu.data.processing.domain.BasicData; +import java.util.HashMap; +import java.util.List; import com.muyu.data.processing.domain.Temporary2; import com.muyu.data.processing.strategy.StrategyHandler; import com.muyu.data.processing.strategy.abstractStrategyRouter; @@ -19,15 +21,16 @@ import org.springframework.stereotype.Component; @Slf4j @Component -public class RealTimeProcessingStrategy extends abstractStrategyRouter - implements StrategyHandler { +public class RealTimeProcessingStrategy extends abstractStrategyRouter, Temporary2> + implements StrategyHandler, Temporary2> { @Override - protected StrategyMapper registerStrategy() { + protected StrategyMapper, Temporary2> registerStrategy() { return param-> new RealTimeJudgmentStrategy(); } @Override - public Temporary2 apply(Temporary1 temporary1) { - return applyStrategy(temporary1); + public Temporary2 apply(HashMap basicDataMap) { + log.info("实时数据处理节点已通过。。。"); + return applyStrategy(basicDataMap); } } diff --git a/cloud-data-processing/src/main/java/com/muyu/data/processing/strategy/branch/WarningJudgmentStrategy.java b/cloud-data-processing/src/main/java/com/muyu/data/processing/strategy/branch/WarningJudgmentStrategy.java index c26ee7e..58b6f0c 100644 --- a/cloud-data-processing/src/main/java/com/muyu/data/processing/strategy/branch/WarningJudgmentStrategy.java +++ b/cloud-data-processing/src/main/java/com/muyu/data/processing/strategy/branch/WarningJudgmentStrategy.java @@ -1,6 +1,8 @@ package com.muyu.data.processing.strategy.branch; -import com.muyu.data.processing.domain.Temporary1; +import com.muyu.data.processing.domain.BasicData; +import java.util.HashMap; +import java.util.List; import com.muyu.data.processing.domain.Temporary2; import com.muyu.data.processing.strategy.StrategyHandler; import com.muyu.data.processing.strategy.abstractStrategyRouter; @@ -21,15 +23,16 @@ import org.springframework.stereotype.Component; @Slf4j @Component -public class WarningJudgmentStrategy extends abstractStrategyRouter - implements StrategyHandler { +public class WarningJudgmentStrategy extends abstractStrategyRouter, Temporary2> + implements StrategyHandler, Temporary2> { @Override - protected StrategyMapper registerStrategy() { + protected StrategyMapper, Temporary2> registerStrategy() { return param-> new WarningAlarmStrategy(); } @Override - public Temporary2 apply(Temporary1 temporary1) { - return applyStrategy(temporary1); + public Temporary2 apply(HashMap basicDataMap) { + log.info("预警数据判断节点已通过。。。"); + return applyStrategy(basicDataMap); } } diff --git a/cloud-data-processing/src/main/java/com/muyu/data/processing/strategy/branch/WarningProcessingStrategy.java b/cloud-data-processing/src/main/java/com/muyu/data/processing/strategy/branch/WarningProcessingStrategy.java index 4330cf7..f068e9e 100644 --- a/cloud-data-processing/src/main/java/com/muyu/data/processing/strategy/branch/WarningProcessingStrategy.java +++ b/cloud-data-processing/src/main/java/com/muyu/data/processing/strategy/branch/WarningProcessingStrategy.java @@ -1,6 +1,8 @@ package com.muyu.data.processing.strategy.branch; -import com.muyu.data.processing.domain.Temporary1; +import com.muyu.data.processing.domain.BasicData; +import java.util.HashMap; +import java.util.List; import com.muyu.data.processing.domain.Temporary2; import com.muyu.data.processing.strategy.StrategyHandler; import com.muyu.data.processing.strategy.abstractStrategyRouter; @@ -19,15 +21,16 @@ import org.springframework.stereotype.Component; @Slf4j @Component -public class WarningProcessingStrategy extends abstractStrategyRouter - implements StrategyHandler { +public class WarningProcessingStrategy extends abstractStrategyRouter, Temporary2> + implements StrategyHandler, Temporary2> { @Override - protected StrategyMapper registerStrategy() { + protected StrategyMapper, Temporary2> registerStrategy() { return param-> new WarningJudgmentStrategy(); } @Override - public Temporary2 apply(Temporary1 temporary1) { - return applyStrategy(temporary1); + public Temporary2 apply(HashMap basicDataMap) { + log.info("预警数据处理节点已通过。。。"); + return applyStrategy(basicDataMap); } } diff --git a/cloud-data-processing/src/main/java/com/muyu/data/processing/strategy/core/BasicStrategy.java b/cloud-data-processing/src/main/java/com/muyu/data/processing/strategy/core/BasicStrategy.java index dc98483..b9d7f2c 100644 --- a/cloud-data-processing/src/main/java/com/muyu/data/processing/strategy/core/BasicStrategy.java +++ b/cloud-data-processing/src/main/java/com/muyu/data/processing/strategy/core/BasicStrategy.java @@ -1,6 +1,9 @@ package com.muyu.data.processing.strategy.core; -import com.muyu.data.processing.domain.Temporary1; +import com.muyu.common.caffeine.enums.CacheNameEnums; +import com.muyu.data.processing.domain.BasicData; +import java.util.HashMap; +import java.util.List; import com.muyu.data.processing.domain.Temporary2; import com.muyu.data.processing.strategy.StrategyHandler; import com.muyu.data.processing.strategy.abstractStrategyRouter; @@ -19,15 +22,20 @@ import org.springframework.stereotype.Component; @Slf4j @Component -public class BasicStrategy extends abstractStrategyRouter - implements StrategyHandler { +public class BasicStrategy extends abstractStrategyRouter, Temporary2> + implements StrategyHandler, Temporary2> { @Override - protected StrategyMapper registerStrategy() { + protected StrategyMapper, Temporary2> registerStrategy() { return param -> new RoutingStrategy(); } @Override - public Temporary2 apply(Temporary1 temporary1) { - return applyStrategy(temporary1); + public Temporary2 apply(HashMap basicDataMap) { + log.info("开始执行基础校验节点。。。"); + basicDataMap.put(CacheNameEnums.WARMING.getCode(), BasicData.builder().key("test").build()); + basicDataMap.put(CacheNameEnums.FAULT.getCode(), BasicData.builder().key("test").build()); + basicDataMap.put(CacheNameEnums.REALTIME.getCode(), BasicData.builder().key("test").build()); + log.info("基础校验节点已通过。。。"); + return applyStrategy(basicDataMap); } } diff --git a/cloud-data-processing/src/main/java/com/muyu/data/processing/strategy/core/EndStrategy.java b/cloud-data-processing/src/main/java/com/muyu/data/processing/strategy/core/EndStrategy.java index 73ff592..410631a 100644 --- a/cloud-data-processing/src/main/java/com/muyu/data/processing/strategy/core/EndStrategy.java +++ b/cloud-data-processing/src/main/java/com/muyu/data/processing/strategy/core/EndStrategy.java @@ -1,6 +1,8 @@ package com.muyu.data.processing.strategy.core; -import com.muyu.data.processing.domain.Temporary1; +import com.muyu.data.processing.domain.BasicData; +import java.util.HashMap; +import java.util.List; import com.muyu.data.processing.domain.Temporary2; import com.muyu.data.processing.strategy.StrategyHandler; import lombok.extern.slf4j.Slf4j; @@ -18,9 +20,10 @@ import org.springframework.stereotype.Component; @Slf4j @Component -public class EndStrategy implements StrategyHandler { +public class EndStrategy implements StrategyHandler, Temporary2> { @Override - public Temporary2 apply(Temporary1 temporary1) { + public Temporary2 apply(HashMap basicDataMap) { + log.info("结束节点已通过。。。"); return null; } } diff --git a/cloud-data-processing/src/main/java/com/muyu/data/processing/strategy/core/RoutingStrategy.java b/cloud-data-processing/src/main/java/com/muyu/data/processing/strategy/core/RoutingStrategy.java index 5adca69..7b45f17 100644 --- a/cloud-data-processing/src/main/java/com/muyu/data/processing/strategy/core/RoutingStrategy.java +++ b/cloud-data-processing/src/main/java/com/muyu/data/processing/strategy/core/RoutingStrategy.java @@ -1,9 +1,12 @@ package com.muyu.data.processing.strategy.core; -import com.muyu.data.processing.domain.Temporary1; +import com.muyu.common.caffeine.enums.CacheNameEnums; +import com.muyu.data.processing.domain.BasicData; +import java.util.HashMap;import java.util.List; import com.muyu.data.processing.domain.Temporary2; import com.muyu.data.processing.strategy.StrategyHandler; import com.muyu.data.processing.strategy.abstractStrategyRouter; +import com.muyu.data.processing.strategy.branch.*; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; @@ -19,21 +22,37 @@ import org.springframework.stereotype.Component; @Slf4j @Component -public class RoutingStrategy extends abstractStrategyRouter - implements StrategyHandler { - @Override - protected StrategyMapper registerStrategy() { - return new StrategyMapper() { - @Override - public StrategyHandler getHandler(Temporary1 param) { - // 编写路由规则 - return null; - } - }; +public class RoutingStrategy extends abstractStrategyRouter, Temporary2> + implements StrategyHandler, Temporary2> { + + private final static HashMap, Temporary2>> map = new HashMap<>(); + static{ + map.put(CacheNameEnums.WARMING.getCode(), new WarningProcessingStrategy()); + map.put(CacheNameEnums.REALTIME.getCode(), new RealTimeProcessingStrategy()); + map.put(CacheNameEnums.FENCE.getCode(), new FenceProcessingStrategy()); + map.put(CacheNameEnums.FAULT.getCode(), new FaultProcessingStrategy()); + map.put(CacheNameEnums.STORAGE.getCode(), new DataStorageProcessStrategy()); } @Override - public Temporary2 apply(Temporary1 temporary1) { - return applyStrategy(temporary1); + protected StrategyMapper, Temporary2> registerStrategy() { + return param -> { + // 编写路由规则 + List codes = CacheNameEnums.getCodes(); + for (String code : codes) { + if(param.containsKey(code)){ + return map.get(code); + } + } + // 默认返回结束节点 + return new EndStrategy(); + }; + }; + + + @Override + public Temporary2 apply(HashMap stringListHashMap) { + log.info("路由节点已通过。。。"); + return applyStrategy(stringListHashMap); } } diff --git a/cloud-data-processing/src/main/java/com/muyu/data/processing/strategy/core/StartStrategy.java b/cloud-data-processing/src/main/java/com/muyu/data/processing/strategy/core/StartStrategy.java index f6ed921..c451af9 100644 --- a/cloud-data-processing/src/main/java/com/muyu/data/processing/strategy/core/StartStrategy.java +++ b/cloud-data-processing/src/main/java/com/muyu/data/processing/strategy/core/StartStrategy.java @@ -1,10 +1,14 @@ package com.muyu.data.processing.strategy.core; -import com.muyu.data.processing.domain.Temporary1; + + import com.muyu.data.processing.domain.Temporary2; import com.muyu.data.processing.strategy.abstractStrategyRouter; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; +import com.muyu.data.processing.domain.BasicData; +import java.util.HashMap; +import java.util.List; /** * 开始节点 @@ -34,9 +38,9 @@ import org.springframework.stereotype.Component; @Slf4j @Component -public class StartStrategy extends abstractStrategyRouter { +public class StartStrategy extends abstractStrategyRouter, Temporary2> { @Override - protected StrategyMapper registerStrategy() { + protected StrategyMapper, Temporary2> registerStrategy() { return param -> new BasicStrategy(); } diff --git a/cloud-data-processing/src/main/java/com/muyu/data/processing/strategy/leaves/DataStorageStrategy.java b/cloud-data-processing/src/main/java/com/muyu/data/processing/strategy/leaves/DataStorageStrategy.java index 5ead80e..17b8cbe 100644 --- a/cloud-data-processing/src/main/java/com/muyu/data/processing/strategy/leaves/DataStorageStrategy.java +++ b/cloud-data-processing/src/main/java/com/muyu/data/processing/strategy/leaves/DataStorageStrategy.java @@ -1,6 +1,8 @@ package com.muyu.data.processing.strategy.leaves; -import com.muyu.data.processing.domain.Temporary1; +import com.muyu.data.processing.domain.BasicData; +import java.util.HashMap; +import java.util.List; import com.muyu.data.processing.domain.Temporary2; import com.muyu.data.processing.strategy.StrategyHandler; import com.muyu.data.processing.strategy.abstractStrategyRouter; @@ -20,15 +22,16 @@ import org.springframework.stereotype.Component; @Slf4j @Component -public class DataStorageStrategy extends abstractStrategyRouter - implements StrategyHandler { +public class DataStorageStrategy extends abstractStrategyRouter, Temporary2> + implements StrategyHandler, Temporary2> { @Override - protected StrategyMapper registerStrategy() { + protected StrategyMapper, Temporary2> registerStrategy() { return param -> new RoutingStrategy(); } @Override - public Temporary2 apply(Temporary1 temporary1) { - return applyStrategy(temporary1); + public Temporary2 apply(HashMap basicDataMap) { + log.info("数据持久化节点已通过。。。"); + return applyStrategy(basicDataMap); } } diff --git a/cloud-data-processing/src/main/java/com/muyu/data/processing/strategy/leaves/FaultAlarmStrategy.java b/cloud-data-processing/src/main/java/com/muyu/data/processing/strategy/leaves/FaultAlarmStrategy.java index b6aaddc..55f0a39 100644 --- a/cloud-data-processing/src/main/java/com/muyu/data/processing/strategy/leaves/FaultAlarmStrategy.java +++ b/cloud-data-processing/src/main/java/com/muyu/data/processing/strategy/leaves/FaultAlarmStrategy.java @@ -1,6 +1,8 @@ package com.muyu.data.processing.strategy.leaves; -import com.muyu.data.processing.domain.Temporary1; +import com.muyu.data.processing.domain.BasicData; +import java.util.HashMap; +import java.util.List; import com.muyu.data.processing.domain.Temporary2; import com.muyu.data.processing.strategy.StrategyHandler; import com.muyu.data.processing.strategy.abstractStrategyRouter; @@ -20,15 +22,16 @@ import org.springframework.stereotype.Component; @Slf4j @Component -public class FaultAlarmStrategy extends abstractStrategyRouter - implements StrategyHandler { +public class FaultAlarmStrategy extends abstractStrategyRouter, Temporary2> + implements StrategyHandler, Temporary2> { @Override - protected StrategyMapper registerStrategy() { + protected StrategyMapper, Temporary2> registerStrategy() { return param -> new RoutingStrategy(); } @Override - public Temporary2 apply(Temporary1 temporary1) { - return applyStrategy(temporary1); + public Temporary2 apply(HashMap basicDataMap) { + log.info("故障报警节点已通过。。。"); + return applyStrategy(basicDataMap); } } diff --git a/cloud-data-processing/src/main/java/com/muyu/data/processing/strategy/leaves/FenceAlarmStrategy.java b/cloud-data-processing/src/main/java/com/muyu/data/processing/strategy/leaves/FenceAlarmStrategy.java index 563f304..d857c64 100644 --- a/cloud-data-processing/src/main/java/com/muyu/data/processing/strategy/leaves/FenceAlarmStrategy.java +++ b/cloud-data-processing/src/main/java/com/muyu/data/processing/strategy/leaves/FenceAlarmStrategy.java @@ -1,6 +1,8 @@ package com.muyu.data.processing.strategy.leaves; -import com.muyu.data.processing.domain.Temporary1; +import com.muyu.data.processing.domain.BasicData; +import java.util.HashMap; +import java.util.List; import com.muyu.data.processing.domain.Temporary2; import com.muyu.data.processing.strategy.StrategyHandler; import com.muyu.data.processing.strategy.abstractStrategyRouter; @@ -20,15 +22,16 @@ import org.springframework.stereotype.Component; @Slf4j @Component -public class FenceAlarmStrategy extends abstractStrategyRouter - implements StrategyHandler { +public class FenceAlarmStrategy extends abstractStrategyRouter, Temporary2> + implements StrategyHandler, Temporary2> { @Override - protected StrategyMapper registerStrategy() { + protected StrategyMapper, Temporary2> registerStrategy() { return param -> new RoutingStrategy(); } @Override - public Temporary2 apply(Temporary1 temporary1) { - return applyStrategy(temporary1); + public Temporary2 apply(HashMap basicDataMap) { + log.info("围栏报警节点已通过。。。"); + return applyStrategy(basicDataMap); } } diff --git a/cloud-data-processing/src/main/java/com/muyu/data/processing/strategy/leaves/RealTimeAlarmStrategy.java b/cloud-data-processing/src/main/java/com/muyu/data/processing/strategy/leaves/RealTimeAlarmStrategy.java index 928eeda..d322414 100644 --- a/cloud-data-processing/src/main/java/com/muyu/data/processing/strategy/leaves/RealTimeAlarmStrategy.java +++ b/cloud-data-processing/src/main/java/com/muyu/data/processing/strategy/leaves/RealTimeAlarmStrategy.java @@ -1,6 +1,8 @@ package com.muyu.data.processing.strategy.leaves; -import com.muyu.data.processing.domain.Temporary1; +import com.muyu.data.processing.domain.BasicData; +import java.util.HashMap; +import java.util.List; import com.muyu.data.processing.domain.Temporary2; import com.muyu.data.processing.strategy.StrategyHandler; import com.muyu.data.processing.strategy.abstractStrategyRouter; @@ -20,15 +22,16 @@ import org.springframework.stereotype.Component; @Slf4j @Component -public class RealTimeAlarmStrategy extends abstractStrategyRouter - implements StrategyHandler { +public class RealTimeAlarmStrategy extends abstractStrategyRouter, Temporary2> + implements StrategyHandler, Temporary2> { @Override - protected StrategyMapper registerStrategy() { + protected StrategyMapper, Temporary2> registerStrategy() { return param -> new RoutingStrategy(); } @Override - public Temporary2 apply(Temporary1 temporary1) { - return applyStrategy(temporary1); + public Temporary2 apply(HashMap basicDataMap) { + log.info("实时数据处理节点已通过。。。"); + return applyStrategy(basicDataMap); } } diff --git a/cloud-data-processing/src/main/java/com/muyu/data/processing/strategy/leaves/WarningAlarmStrategy.java b/cloud-data-processing/src/main/java/com/muyu/data/processing/strategy/leaves/WarningAlarmStrategy.java index dbfec6f..b604e4c 100644 --- a/cloud-data-processing/src/main/java/com/muyu/data/processing/strategy/leaves/WarningAlarmStrategy.java +++ b/cloud-data-processing/src/main/java/com/muyu/data/processing/strategy/leaves/WarningAlarmStrategy.java @@ -1,6 +1,8 @@ package com.muyu.data.processing.strategy.leaves; -import com.muyu.data.processing.domain.Temporary1; +import com.muyu.data.processing.domain.BasicData; +import java.util.HashMap; +import java.util.List; import com.muyu.data.processing.domain.Temporary2; import com.muyu.data.processing.strategy.StrategyHandler; import com.muyu.data.processing.strategy.abstractStrategyRouter; @@ -20,15 +22,16 @@ import org.springframework.stereotype.Component; @Slf4j @Component -public class WarningAlarmStrategy extends abstractStrategyRouter - implements StrategyHandler { +public class WarningAlarmStrategy extends abstractStrategyRouter, Temporary2> + implements StrategyHandler, Temporary2> { @Override - protected StrategyMapper registerStrategy() { + protected StrategyMapper, Temporary2> registerStrategy() { return param -> new RoutingStrategy(); } @Override - public Temporary2 apply(Temporary1 temporary1) { - return applyStrategy(temporary1); + public Temporary2 apply(HashMap basicDataMap) { + log.info("预警报警节点已通过。。。"); + return applyStrategy(basicDataMap); } }