test(): 完成责任数路由逻辑优化,实现持久化测试
parent
f797c7cae8
commit
1abe99919b
|
@ -1,7 +1,9 @@
|
||||||
package com.muyu.data.processing.strategy;
|
package com.muyu.data.processing.strategy;
|
||||||
|
|
||||||
|
import com.github.yulichang.toolkit.SpringContentUtils;
|
||||||
import com.muyu.common.redis.service.RedisService;
|
import com.muyu.common.redis.service.RedisService;
|
||||||
import com.muyu.data.processing.strategy.core.EndStrategy;
|
import com.muyu.data.processing.strategy.core.EndStrategy;
|
||||||
|
import org.springframework.data.redis.core.RedisTemplate;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 策略控制者接口
|
* 策略控制者接口
|
||||||
|
@ -13,7 +15,6 @@ import com.muyu.data.processing.strategy.core.EndStrategy;
|
||||||
*/
|
*/
|
||||||
public interface StrategyHandler<T,R> {
|
public interface StrategyHandler<T,R> {
|
||||||
|
|
||||||
RedisService redisService = new RedisService();
|
|
||||||
|
|
||||||
@SuppressWarnings("rawtypes")
|
@SuppressWarnings("rawtypes")
|
||||||
StrategyHandler DEFAULT = param -> new EndStrategy();
|
StrategyHandler DEFAULT = param -> new EndStrategy();
|
||||||
|
|
|
@ -40,12 +40,10 @@ public class BasicStrategy extends abstractStrategyRouter<HashMap<String, BasicD
|
||||||
public Temporary2 apply(HashMap<String, BasicData> basicDataMap) {
|
public Temporary2 apply(HashMap<String, BasicData> basicDataMap) {
|
||||||
log.info("开始执行基础校验节点。。。");
|
log.info("开始执行基础校验节点。。。");
|
||||||
basicDataMap.put(CacheNameEnums.STORAGE.getCode(), null);
|
basicDataMap.put(CacheNameEnums.STORAGE.getCode(), null);
|
||||||
CacheNameEnums.getCodes().forEach(code-> {
|
CacheNameEnums.getCodes()
|
||||||
// 如果缓存信息不为空,则说明车辆需要处理该事件
|
.stream()
|
||||||
if (ObjectUtils.isNotEmpty(cacheUtils.hasKey(code, basicDataMap.get("VIN").getKey()))){
|
.filter(code -> cacheUtils.hasKey(code, basicDataMap.get("VIN").getKey()))
|
||||||
basicDataMap.put(code, null);
|
.forEach(code-> basicDataMap.put(code, null));
|
||||||
}
|
|
||||||
});
|
|
||||||
return applyStrategy(basicDataMap);
|
return applyStrategy(basicDataMap);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -8,6 +8,7 @@ import com.muyu.data.processing.strategy.StrategyHandler;
|
||||||
import com.muyu.data.processing.strategy.abstractStrategyRouter;
|
import com.muyu.data.processing.strategy.abstractStrategyRouter;
|
||||||
import com.muyu.data.processing.strategy.branch.*;
|
import com.muyu.data.processing.strategy.branch.*;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.apache.commons.lang3.ObjectUtils;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -36,11 +37,12 @@ public class RoutingStrategy extends abstractStrategyRouter<HashMap<String, Ba
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected StrategyMapper<HashMap<String, BasicData>, Temporary2> registerStrategy() {
|
protected StrategyMapper<HashMap<String, BasicData>, Temporary2> registerStrategy() {
|
||||||
log.info("路由节点已通过。。。");
|
|
||||||
return param -> {
|
return param -> {
|
||||||
|
log.info("路由节点已通过。。。");
|
||||||
// 编写路由规则
|
// 编写路由规则
|
||||||
for (String code : map.keySet()) {
|
for (String code : map.keySet()) {
|
||||||
if(param.containsKey(code)){
|
if(ObjectUtils.isNotEmpty(param.get(code))){
|
||||||
param.remove(code);
|
param.remove(code);
|
||||||
return map.get(code);
|
return map.get(code);
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,5 +1,6 @@
|
||||||
package com.muyu.data.processing.strategy.leaves;
|
package com.muyu.data.processing.strategy.leaves;
|
||||||
|
|
||||||
|
import com.github.yulichang.toolkit.SpringContentUtils;
|
||||||
import com.muyu.common.iotdb.config.IotDBSessionConfig;
|
import com.muyu.common.iotdb.config.IotDBSessionConfig;
|
||||||
import com.muyu.data.processing.domain.BasicData;
|
import com.muyu.data.processing.domain.BasicData;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
@ -11,6 +12,7 @@ import com.muyu.data.processing.strategy.core.RoutingStrategy;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.apache.iotdb.rpc.IoTDBConnectionException;
|
import org.apache.iotdb.rpc.IoTDBConnectionException;
|
||||||
import org.apache.iotdb.rpc.StatementExecutionException;
|
import org.apache.iotdb.rpc.StatementExecutionException;
|
||||||
|
import org.apache.iotdb.session.pool.SessionPool;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -27,6 +29,7 @@ import org.springframework.stereotype.Component;
|
||||||
@Component
|
@Component
|
||||||
public class DataStorageStrategy extends abstractStrategyRouter<HashMap<String, BasicData>, Temporary2>
|
public class DataStorageStrategy extends abstractStrategyRouter<HashMap<String, BasicData>, Temporary2>
|
||||||
implements StrategyHandler<HashMap<String, BasicData>, Temporary2> {
|
implements StrategyHandler<HashMap<String, BasicData>, Temporary2> {
|
||||||
|
private final SessionPool sessionPool = SpringContentUtils.getBean(SessionPool.class);
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected StrategyMapper<HashMap<String, BasicData>, Temporary2> registerStrategy() {
|
protected StrategyMapper<HashMap<String, BasicData>, Temporary2> registerStrategy() {
|
||||||
|
@ -70,12 +73,12 @@ public class DataStorageStrategy extends abstractStrategyRouter<HashMap<String,
|
||||||
.append(values.substring(0, values.length() - 1))
|
.append(values.substring(0, values.length() - 1))
|
||||||
.append(")");
|
.append(")");
|
||||||
try {
|
try {
|
||||||
|
log.info("拼接sql语句: [{}]", sql);
|
||||||
new IotDBSessionConfig().getSessionPool().executeNonQueryStatement(sql.toString());
|
new IotDBSessionConfig().getSessionPool().executeNonQueryStatement(sql.toString());
|
||||||
} catch (StatementExecutionException e) {
|
} catch (StatementExecutionException e) {
|
||||||
throw new RuntimeException(e);
|
throw new RuntimeException(e);
|
||||||
} catch (IoTDBConnectionException e) {
|
} catch (IoTDBConnectionException e) {
|
||||||
throw new RuntimeException(e);
|
throw new RuntimeException(e);
|
||||||
}
|
}
|
||||||
log.info("成功执行sql语句: [{}]", sql);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,5 +1,6 @@
|
||||||
package com.muyu.data.processing.strategy.leaves;
|
package com.muyu.data.processing.strategy.leaves;
|
||||||
|
|
||||||
|
import com.github.yulichang.toolkit.SpringContentUtils;
|
||||||
import com.muyu.data.processing.domain.BasicData;
|
import com.muyu.data.processing.domain.BasicData;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
|
||||||
|
@ -8,6 +9,7 @@ import com.muyu.data.processing.strategy.StrategyHandler;
|
||||||
import com.muyu.data.processing.strategy.abstractStrategyRouter;
|
import com.muyu.data.processing.strategy.abstractStrategyRouter;
|
||||||
import com.muyu.data.processing.strategy.core.RoutingStrategy;
|
import com.muyu.data.processing.strategy.core.RoutingStrategy;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.data.redis.core.RedisTemplate;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -24,6 +26,8 @@ import org.springframework.stereotype.Component;
|
||||||
@Component
|
@Component
|
||||||
public class RealTimeAlarmStrategy extends abstractStrategyRouter<HashMap<String, BasicData>, Temporary2>
|
public class RealTimeAlarmStrategy extends abstractStrategyRouter<HashMap<String, BasicData>, Temporary2>
|
||||||
implements StrategyHandler<HashMap<String, BasicData>, Temporary2> {
|
implements StrategyHandler<HashMap<String, BasicData>, Temporary2> {
|
||||||
|
// private final RedisTemplate<String,String> redisTemplate = SpringContentUtils.getBean(RedisTemplate.class);
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected StrategyMapper<HashMap<String, BasicData>, Temporary2> registerStrategy() {
|
protected StrategyMapper<HashMap<String, BasicData>, Temporary2> registerStrategy() {
|
||||||
log.info("实时数据处理分支已完成。。。");
|
log.info("实时数据处理分支已完成。。。");
|
||||||
|
@ -33,11 +37,11 @@ public class RealTimeAlarmStrategy extends abstractStrategyRouter<HashMap<String
|
||||||
@Override
|
@Override
|
||||||
public Temporary2 apply(HashMap<String, BasicData> basicDataMap) {
|
public Temporary2 apply(HashMap<String, BasicData> basicDataMap) {
|
||||||
log.info("开始执行实时数据处理节点。。。");
|
log.info("开始执行实时数据处理节点。。。");
|
||||||
String vin = basicDataMap.get("VIN").getKey();
|
// String vin = basicDataMap.get("VIN").getKey();
|
||||||
basicDataMap.keySet().forEach(key -> {
|
// basicDataMap.keySet().forEach(key -> {
|
||||||
BasicData basicData = basicDataMap.get(key);
|
// BasicData basicData = basicDataMap.get(key);
|
||||||
redisService.setCacheObject(vin+":"+basicData.getKey(), basicData.getValue());
|
// redisTemplate.opsForValue().set(vin+":"+basicData.getKey(), basicData.getValue());
|
||||||
});
|
// });
|
||||||
return applyStrategy(basicDataMap);
|
return applyStrategy(basicDataMap);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue