diff --git a/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/strategy/StrategyHandler.java b/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/strategy/StrategyHandler.java index d8b4eed..cf1f8ac 100644 --- a/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/strategy/StrategyHandler.java +++ b/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/strategy/StrategyHandler.java @@ -1,7 +1,9 @@ package com.muyu.data.processing.strategy; +import com.github.yulichang.toolkit.SpringContentUtils; import com.muyu.common.redis.service.RedisService; import com.muyu.data.processing.strategy.core.EndStrategy; +import org.springframework.data.redis.core.RedisTemplate; /** * 策略控制者接口 @@ -13,7 +15,6 @@ import com.muyu.data.processing.strategy.core.EndStrategy; */ public interface StrategyHandler { - RedisService redisService = new RedisService(); @SuppressWarnings("rawtypes") StrategyHandler DEFAULT = param -> new EndStrategy(); diff --git a/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/strategy/core/BasicStrategy.java b/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/strategy/core/BasicStrategy.java index 8c3aa60..b0d1945 100644 --- a/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/strategy/core/BasicStrategy.java +++ b/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/strategy/core/BasicStrategy.java @@ -40,12 +40,10 @@ public class BasicStrategy extends abstractStrategyRouter basicDataMap) { log.info("开始执行基础校验节点。。。"); basicDataMap.put(CacheNameEnums.STORAGE.getCode(), null); - CacheNameEnums.getCodes().forEach(code-> { - // 如果缓存信息不为空,则说明车辆需要处理该事件 - if (ObjectUtils.isNotEmpty(cacheUtils.hasKey(code, basicDataMap.get("VIN").getKey()))){ - basicDataMap.put(code, null); - } - }); + CacheNameEnums.getCodes() + .stream() + .filter(code -> cacheUtils.hasKey(code, basicDataMap.get("VIN").getKey())) + .forEach(code-> basicDataMap.put(code, null)); return applyStrategy(basicDataMap); } } diff --git a/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/strategy/core/RoutingStrategy.java b/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/strategy/core/RoutingStrategy.java index 0d9a4c5..059d0a6 100644 --- a/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/strategy/core/RoutingStrategy.java +++ b/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/strategy/core/RoutingStrategy.java @@ -8,6 +8,7 @@ import com.muyu.data.processing.strategy.StrategyHandler; import com.muyu.data.processing.strategy.abstractStrategyRouter; import com.muyu.data.processing.strategy.branch.*; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.ObjectUtils; import org.springframework.stereotype.Component; /** @@ -36,11 +37,12 @@ public class RoutingStrategy extends abstractStrategyRouter, Temporary2> registerStrategy() { - log.info("路由节点已通过。。。"); + return param -> { + log.info("路由节点已通过。。。"); // 编写路由规则 for (String code : map.keySet()) { - if(param.containsKey(code)){ + if(ObjectUtils.isNotEmpty(param.get(code))){ param.remove(code); return map.get(code); } diff --git a/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/strategy/leaves/DataStorageStrategy.java b/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/strategy/leaves/DataStorageStrategy.java index 5067fdf..062adf6 100644 --- a/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/strategy/leaves/DataStorageStrategy.java +++ b/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/strategy/leaves/DataStorageStrategy.java @@ -1,5 +1,6 @@ package com.muyu.data.processing.strategy.leaves; +import com.github.yulichang.toolkit.SpringContentUtils; import com.muyu.common.iotdb.config.IotDBSessionConfig; import com.muyu.data.processing.domain.BasicData; import java.util.HashMap; @@ -11,6 +12,7 @@ import com.muyu.data.processing.strategy.core.RoutingStrategy; import lombok.extern.slf4j.Slf4j; import org.apache.iotdb.rpc.IoTDBConnectionException; import org.apache.iotdb.rpc.StatementExecutionException; +import org.apache.iotdb.session.pool.SessionPool; import org.springframework.stereotype.Component; /** @@ -27,6 +29,7 @@ import org.springframework.stereotype.Component; @Component public class DataStorageStrategy extends abstractStrategyRouter, Temporary2> implements StrategyHandler, Temporary2> { + private final SessionPool sessionPool = SpringContentUtils.getBean(SessionPool.class); @Override protected StrategyMapper, Temporary2> registerStrategy() { @@ -70,12 +73,12 @@ public class DataStorageStrategy extends abstractStrategyRouter, Temporary2> implements StrategyHandler, Temporary2> { +// private final RedisTemplate redisTemplate = SpringContentUtils.getBean(RedisTemplate.class); + @Override protected StrategyMapper, Temporary2> registerStrategy() { log.info("实时数据处理分支已完成。。。"); @@ -33,11 +37,11 @@ public class RealTimeAlarmStrategy extends abstractStrategyRouter basicDataMap) { log.info("开始执行实时数据处理节点。。。"); - String vin = basicDataMap.get("VIN").getKey(); - basicDataMap.keySet().forEach(key -> { - BasicData basicData = basicDataMap.get(key); - redisService.setCacheObject(vin+":"+basicData.getKey(), basicData.getValue()); - }); +// String vin = basicDataMap.get("VIN").getKey(); +// basicDataMap.keySet().forEach(key -> { +// BasicData basicData = basicDataMap.get(key); +// redisTemplate.opsForValue().set(vin+":"+basicData.getKey(), basicData.getValue()); +// }); return applyStrategy(basicDataMap); } }