feat: 完成kafka消费到数据处理的数据持久化

dev.data.processing
面包骑士 2024-10-07 11:34:36 +08:00
parent f10fe5c241
commit c98804b1f4
5 changed files with 102 additions and 43 deletions

View File

@ -55,21 +55,21 @@ public class TestController {
"key": "vin", "key": "vin",
"label": "VIN码", "label": "VIN码",
"type": "String", "type": "String",
"value": "vin131413534474" "value": "vin999999"
},{ },{
"key": "timestamp", "key": "timestamp",
"label": "时间戳", "label": "时间戳",
"type": "String", "type": "long",
"value": "1727534036893" "value": "1727534036893"
},{ },{
"key": "latitude", "key": "latitude",
"label": "纬度", "label": "纬度",
"type": "String", "type": "int",
"value": "66.898" "value": "66.898"
},{ },{
"key": "longitude", "key": "longitude",
"label": "经度", "label": "经度",
"type": "String", "type": "int",
"value": "99.12" "value": "99.12"
}]"""; }]""";
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(KafkaConstants.KafkaTopic, jsonString); ProducerRecord<String, String> producerRecord = new ProducerRecord<>(KafkaConstants.KafkaTopic, jsonString);

View File

@ -90,39 +90,43 @@ public class DataProcessingServiceImpl implements DataProcessingService {
@Override @Override
public Object addCarData(HashMap<String, BasicData> hashMap) { public Object addCarData(HashMap<String, BasicData> hashMap) {
StringBuilder sql = new StringBuilder("insert into root.one."); // StringBuilder sql = new StringBuilder("insert into root.one.");
sql.append(hashMap.get("firmCode").getValue()) // sql.append(hashMap.get("firmCode").getValue())
.append(".") // .append(".")
.append(hashMap.get("vin").getValue()) // .append(hashMap.get("vin").getValue())
.append("("); // .append("(");
hashMap.remove("firmCode"); // hashMap.remove("firmCode");
hashMap.remove("vin"); // hashMap.remove("vin");
StringBuilder keys = new StringBuilder(); // StringBuilder keys = new StringBuilder();
StringBuilder values = new StringBuilder(); // StringBuilder values = new StringBuilder();
hashMap.keySet().forEach(key -> { // hashMap.keySet().forEach(key -> {
keys.append(key).append(","); // if (hashMap.get(key) != null) {
if ("String".equals(hashMap.get(key).getType())) { // keys.append(key).append(",");
values.append("'") // if ("String".equals(hashMap.get(key).getType())) {
.append(hashMap.get(key).getValue()) // values.append("'")
.append("'") // .append(hashMap.get(key).getValue())
.append(","); // .append("'")
}else { // .append(",");
values.append(hashMap.get(key).getValue()) // }else {
.append(","); // values.append(hashMap.get(key).getValue())
} // .append(",");
}); // }
sql.append(keys.substring(0, keys.length() - 1)) // }
.append(") values (") // });
.append(values.substring(0, values.length() - 1)) // sql.append(keys.substring(0, keys.length() - 1))
.append(")"); // .append(") values (")
try { // .append(values.substring(0, values.length() - 1))
sessionPool.executeNonQueryStatement(sql.toString()); // .append(")");
} catch (StatementExecutionException e) { // try {
throw new RuntimeException(e); // sessionPool.executeNonQueryStatement(sql.toString());
} catch (IoTDBConnectionException e) { // } catch (StatementExecutionException e) {
throw new RuntimeException(e); // throw new RuntimeException(e);
} // } catch (IoTDBConnectionException e) {
return sql; // throw new RuntimeException(e);
// }
// log.info("成功执行sql语句: [{}]", sql);
// return sql;
return null;
} }
private static CarData getCarData(String vin, RowRecord next, List<String> columnNames) { private static CarData getCarData(String vin, RowRecord next, List<String> columnNames) {

View File

@ -29,7 +29,7 @@ public class DataStorageProcessStrategy extends abstractStrategyRouter<HashMap<S
// 必要参数 // 必要参数
private final static HashMap<String,String> NECESSARY_PARAM = new HashMap<>(); private final static HashMap<String,String> NECESSARY_PARAM = new HashMap<>();
static { static {
NECESSARY_PARAM.put("vin","VIN码"); NECESSARY_PARAM.put("VIN","VIN码");
NECESSARY_PARAM.put("timestamp","时间戳"); NECESSARY_PARAM.put("timestamp","时间戳");
NECESSARY_PARAM.put("longitude","经度"); NECESSARY_PARAM.put("longitude","经度");
NECESSARY_PARAM.put("latitude","纬度"); NECESSARY_PARAM.put("latitude","纬度");
@ -39,7 +39,7 @@ public class DataStorageProcessStrategy extends abstractStrategyRouter<HashMap<S
protected StrategyMapper<HashMap<String, BasicData>, Temporary2> registerStrategy() { protected StrategyMapper<HashMap<String, BasicData>, Temporary2> registerStrategy() {
return param-> { return param-> {
// 判断是否存在问题 // 判断是否存在问题
if (!param.containsKey("DataStorageProcessStrategy")) { if (param.containsKey("DataStorageProcessStrategy")) {
log.error("持久化流程错误,缺少必要参数: {}", param.get("DataStorageProcessStrategy").getKey()); log.error("持久化流程错误,缺少必要参数: {}", param.get("DataStorageProcessStrategy").getKey());
param.remove("DataStorageProcessStrategy"); param.remove("DataStorageProcessStrategy");
return new EndStrategy(); return new EndStrategy();

View File

@ -32,7 +32,7 @@ public class BasicStrategy extends abstractStrategyRouter<HashMap<String, BasicD
@Override @Override
public Temporary2 apply(HashMap<String, BasicData> basicDataMap) { public Temporary2 apply(HashMap<String, BasicData> basicDataMap) {
log.info("开始执行基础校验节点。。。"); log.info("开始执行基础校验节点。。。");
basicDataMap.put(CacheNameEnums.WARMING.getCode(), null); basicDataMap.put(CacheNameEnums.STORAGE.getCode(), null);
basicDataMap.put(CacheNameEnums.FAULT.getCode(), null); basicDataMap.put(CacheNameEnums.FAULT.getCode(), null);
basicDataMap.put(CacheNameEnums.REALTIME.getCode(), null); basicDataMap.put(CacheNameEnums.REALTIME.getCode(), null);
log.info("基础校验节点已通过。。。"); log.info("基础校验节点已通过。。。");

View File

@ -1,13 +1,23 @@
package com.muyu.data.processing.strategy.leaves; package com.muyu.data.processing.strategy.leaves;
import com.muyu.common.caffeine.enums.CacheNameEnums;import com.muyu.data.processing.domain.BasicData; import com.muyu.common.caffeine.enums.CacheNameEnums;
import com.muyu.common.iotdb.config.IotDBSessionConfig;
import com.muyu.data.processing.domain.BasicData;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import com.muyu.data.processing.domain.Temporary2; import com.muyu.data.processing.domain.Temporary2;
import com.muyu.data.processing.service.DataProcessingService;
import com.muyu.data.processing.service.impl.DataProcessingServiceImpl;
import com.muyu.data.processing.strategy.StrategyHandler; import com.muyu.data.processing.strategy.StrategyHandler;
import com.muyu.data.processing.strategy.abstractStrategyRouter; import com.muyu.data.processing.strategy.abstractStrategyRouter;
import com.muyu.data.processing.strategy.core.RoutingStrategy; import com.muyu.data.processing.strategy.core.RoutingStrategy;
import jakarta.annotation.Resource;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.session.pool.SessionPool;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
/** /**
@ -24,15 +34,60 @@ import org.springframework.stereotype.Component;
@Component @Component
public class DataStorageStrategy extends abstractStrategyRouter<HashMap<String, BasicData>, Temporary2> public class DataStorageStrategy extends abstractStrategyRouter<HashMap<String, BasicData>, Temporary2>
implements StrategyHandler<HashMap<String, BasicData>, Temporary2> { implements StrategyHandler<HashMap<String, BasicData>, Temporary2> {
private SessionPool sessionPool = new IotDBSessionConfig().getSessionPool();
@Override @Override
protected StrategyMapper<HashMap<String, BasicData>, Temporary2> registerStrategy() { protected StrategyMapper<HashMap<String, BasicData>, Temporary2> registerStrategy() {
return param -> new RoutingStrategy(); return param -> {
log.info("数据持久化节点已通过。。。");
return new RoutingStrategy();
};
} }
@Override @Override
public Temporary2 apply(HashMap<String, BasicData> basicDataMap) { public Temporary2 apply(HashMap<String, BasicData> basicDataMap) {
log.info("数据持久化节点已通过。。。");
basicDataMap.remove(CacheNameEnums.STORAGE.getCode()); basicDataMap.remove(CacheNameEnums.STORAGE.getCode());
// 执行持久化方法
addCarData(basicDataMap);
return applyStrategy(basicDataMap); return applyStrategy(basicDataMap);
} }
private void addCarData(HashMap<String, BasicData> hashMap) {
StringBuilder sql = new StringBuilder("insert into root.one.");
sql.append(hashMap.get("firmCode").getValue())
.append(".")
.append(hashMap.get("VIN").getValue())
.append("(");
hashMap.remove("firmCode");
hashMap.remove("VIN");
StringBuilder keys = new StringBuilder();
StringBuilder values = new StringBuilder();
hashMap.keySet().forEach(key -> {
if (hashMap.get(key) != null) {
keys.append(key).append(",");
if ("String".equals(hashMap.get(key).getType())) {
values.append("'")
.append(hashMap.get(key).getValue())
.append("'")
.append(",");
}else {
values.append(hashMap.get(key).getValue())
.append(",");
}
}
});
sql.append(keys.substring(0, keys.length() - 1))
.append(") values (")
.append(values.substring(0, values.length() - 1))
.append(")");
try {
sessionPool.executeNonQueryStatement(sql.toString());
} catch (StatementExecutionException e) {
throw new RuntimeException(e);
} catch (IoTDBConnectionException e) {
throw new RuntimeException(e);
}
log.info("成功执行sql语句: [{}]", sql);
}
} }