test: 测试持久化处理

dev.data.processing
面包骑士 2024-10-06 20:55:15 +08:00
parent 6cb6c8d1fd
commit f10fe5c241
7 changed files with 135 additions and 26 deletions

View File

@ -3,6 +3,7 @@ package com.muyu.common.iotdb.config;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.session.Session;
import org.apache.iotdb.session.pool.SessionPool;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@ -18,7 +19,7 @@ import org.springframework.context.annotation.Configuration;
*/
@Configuration
public class IotDBConfig {
public class IotDBSessionConfig {
@Value("${spring.iotdb.ip}")
private String ip;
@ -35,21 +36,18 @@ public class IotDBConfig {
@Value("${spring.iotdb.fetchSize}")
private int fetchSize;
private static Session session;
private static SessionPool sessionPool;
@Bean
public Session iotSession(){
if (session == null) {
session = new Session(ip, port, user, password, fetchSize);
public SessionPool getSessionPool(){
if (sessionPool == null) {
sessionPool = new SessionPool(ip, port, user, password, fetchSize);
try {
session.open();
session.setTimeZone("+08:00");
} catch (IoTDBConnectionException e) {
throw new RuntimeException(e);
} catch (StatementExecutionException e) {
sessionPool.setTimeZone("+08:00");
} catch (Exception e) {
throw new RuntimeException(e);
}
}
return session;
return sessionPool;
}
}

View File

@ -1 +1 @@
com.muyu.common.iotdb.config.IotDBConfig
com.muyu.common.iotdb.config.IotDBSessionConfig

View File

@ -2,6 +2,7 @@ package com.muyu.data.processing.controller;
import com.muyu.common.core.domain.Result;
import com.muyu.common.security.utils.SecurityUtils;
import com.muyu.data.processing.domain.BasicData;
import com.muyu.data.processing.domain.IotDbData;
import com.muyu.data.processing.service.DataProcessingService;
@ -11,6 +12,7 @@ import org.springframework.web.bind.annotation.*;
import lombok.extern.slf4j.Slf4j;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
/**
@ -53,5 +55,46 @@ public class DataProcessingController {
return Result.success(service.selectCarData(firmCode,vin));
}
@PostMapping("/addCarData")
public Result addCarData(@RequestBody IotDbData data) {
HashMap<String, BasicData> hashMap = new HashMap<>();
hashMap.put("timestamp", BasicData
.builder()
.key("timestamp")
.label("时间戳")
.value(String.valueOf(data.getTimestamp()))
.type("string")
.build());
hashMap.put("vin", BasicData
.builder()
.key("vin")
.label("VIN码")
.value(data.getVin())
.type("string")
.build());
hashMap.put("latitude", BasicData
.builder()
.key("latitude")
.label("纬度")
.value(data.getLatitude())
.type("long")
.build());
hashMap.put("longitude", BasicData
.builder()
.key("longitude")
.label("经度")
.value(data.getLongitude())
.type("long")
.build());
hashMap.put("firmCode", BasicData
.builder()
.key("firmCode")
.label("企业编码")
.value("firm01")
.type("string")
.build());
return Result.success(service.addCarData(hashMap));
}
}

View File

@ -4,7 +4,7 @@ package com.muyu.data.processing.controller;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.muyu.common.caffeine.enums.CacheNameEnums;
import com.muyu.common.core.utils.uuid.UUID;
import com.muyu.common.iotdb.config.IotDBConfig;
import com.muyu.common.iotdb.config.IotDBSessionConfig;
import com.muyu.common.kafka.constants.KafkaConstants;
import com.muyu.common.rabbit.constants.RabbitConstants;
import jakarta.annotation.Resource;
@ -12,7 +12,6 @@ import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.cache.Cache;
import org.springframework.cache.CacheManager;
import org.springframework.cache.caffeine.CaffeineCache;
import org.springframework.cache.support.SimpleCacheManager;
import org.springframework.data.redis.core.RedisTemplate;
@ -38,7 +37,7 @@ public class TestController {
@Resource
private RabbitTemplate rabbitTemplate;
@Resource
private IotDBConfig iotDBConfig;
private IotDBSessionConfig iotDBSessionConfig;
@Resource
private RedisTemplate<String,String> redisTemplate;
// @Resource
@ -101,7 +100,7 @@ public class TestController {
@GetMapping("/insertData")
public void insertData(@RequestParam("deviceId") String deviceId, @RequestParam("time") long time, @RequestParam("value") double value) throws Exception {
String sql = String.format("insert into root.one.%s(timestamp, temperature) values (%d, %f)", deviceId, time, value);
iotDBConfig.iotSession().executeNonQueryStatement(sql);
iotDBSessionConfig.getSessionPool().executeNonQueryStatement(sql);
}
@GetMapping("/testSetRedis")

View File

@ -4,6 +4,7 @@ package com.muyu.data.processing.service;
import com.muyu.data.processing.domain.BasicData;
import com.muyu.data.processing.domain.CarData;
import java.util.HashMap;
import java.util.List;
/**
@ -30,4 +31,6 @@ public interface DataProcessingService{
Integer insIotDbData(String key, String value);
List<CarData> selectCarData(String firmCode, String vin);
Object addCarData(HashMap<String, BasicData> hashMap);
}

View File

@ -3,13 +3,15 @@ package com.muyu.data.processing.service.impl;
import javax.annotation.Resource;
import com.muyu.common.iotdb.config.IotDBConfig;
import com.muyu.common.iotdb.config.IotDBSessionConfig;
import com.muyu.data.processing.domain.CarData;
import com.muyu.data.processing.domain.IotDbData;
import com.muyu.data.processing.domain.BasicData;
import com.muyu.data.processing.strategy.core.StartStrategy;
import org.apache.iotdb.isession.SessionDataSet;import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.isession.SessionDataSet;
import org.apache.iotdb.isession.pool.SessionDataSetWrapper;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.session.pool.SessionPool;
import org.apache.iotdb.tsfile.read.common.Field;
import org.apache.iotdb.tsfile.read.common.RowRecord;
import org.springframework.stereotype.Service;
@ -37,7 +39,7 @@ public class DataProcessingServiceImpl implements DataProcessingService {
@Resource
private DataProcessingMapper mapper;
@Resource
private IotDBConfig iotDBConfig;
private SessionPool sessionPool;
@Override
@ -73,10 +75,10 @@ public class DataProcessingServiceImpl implements DataProcessingService {
ArrayList<CarData> carDataList = new ArrayList<>();
String sql = "select * from root.one."+firmCode+"."+vin;
try {
SessionDataSet sessionDataSet = iotDBConfig.iotSession().executeQueryStatement(sql);
List<String> columnNames = sessionDataSet.getColumnNames();
while (sessionDataSet.hasNext()){
RowRecord next = sessionDataSet.next();
SessionDataSetWrapper dataSetWrapper = sessionPool.executeQueryStatement(sql);
List<String> columnNames = dataSetWrapper.getColumnNames();
while (dataSetWrapper.hasNext()){
RowRecord next = dataSetWrapper.next();
CarData data = getCarData(vin, next, columnNames);
carDataList.add(data);
}
@ -86,6 +88,43 @@ public class DataProcessingServiceImpl implements DataProcessingService {
return carDataList;
}
@Override
public Object addCarData(HashMap<String, BasicData> hashMap) {
StringBuilder sql = new StringBuilder("insert into root.one.");
sql.append(hashMap.get("firmCode").getValue())
.append(".")
.append(hashMap.get("vin").getValue())
.append("(");
hashMap.remove("firmCode");
hashMap.remove("vin");
StringBuilder keys = new StringBuilder();
StringBuilder values = new StringBuilder();
hashMap.keySet().forEach(key -> {
keys.append(key).append(",");
if ("String".equals(hashMap.get(key).getType())) {
values.append("'")
.append(hashMap.get(key).getValue())
.append("'")
.append(",");
}else {
values.append(hashMap.get(key).getValue())
.append(",");
}
});
sql.append(keys.substring(0, keys.length() - 1))
.append(") values (")
.append(values.substring(0, values.length() - 1))
.append(")");
try {
sessionPool.executeNonQueryStatement(sql.toString());
} catch (StatementExecutionException e) {
throw new RuntimeException(e);
} catch (IoTDBConnectionException e) {
throw new RuntimeException(e);
}
return sql;
}
private static CarData getCarData(String vin, RowRecord next, List<String> columnNames) {
List<Field> fields = next.getFields();
CarData data = new CarData();

View File

@ -2,9 +2,12 @@ package com.muyu.data.processing.strategy.branch;
import com.muyu.data.processing.domain.BasicData;
import java.util.HashMap;
import java.util.List;
import com.muyu.data.processing.domain.Temporary2;
import com.muyu.data.processing.strategy.StrategyHandler;
import com.muyu.data.processing.strategy.abstractStrategyRouter;
import com.muyu.data.processing.strategy.core.EndStrategy;
import com.muyu.data.processing.strategy.leaves.DataStorageStrategy;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@ -23,14 +26,38 @@ import org.springframework.stereotype.Component;
@Component
public class DataStorageProcessStrategy extends abstractStrategyRouter<HashMap<String, BasicData>, Temporary2>
implements StrategyHandler<HashMap<String, BasicData>, Temporary2> {
// 必要参数
private final static HashMap<String,String> NECESSARY_PARAM = new HashMap<>();
static {
NECESSARY_PARAM.put("vin","VIN码");
NECESSARY_PARAM.put("timestamp","时间戳");
NECESSARY_PARAM.put("longitude","经度");
NECESSARY_PARAM.put("latitude","纬度");
}
@Override
protected StrategyMapper<HashMap<String, BasicData>, Temporary2> registerStrategy() {
return param-> new DataStorageStrategy();
return param-> {
// 判断是否存在问题
if (!param.containsKey("DataStorageProcessStrategy")) {
log.error("持久化流程错误,缺少必要参数: {}", param.get("DataStorageProcessStrategy").getKey());
param.remove("DataStorageProcessStrategy");
return new EndStrategy();
}
log.info("持久化数据处理节点已通过。。。");
return new DataStorageStrategy();
};
}
@Override
public Temporary2 apply(HashMap<String, BasicData> basicDataMap) {
log.info("持久化数据处理节点已通过。。。");
log.info("持久化数据处理节点开始处理。。。");
// 判断是否缺少必要参数,如果有,记录
NECESSARY_PARAM.keySet().forEach(key->{
if (!basicDataMap.containsKey(key)) {
basicDataMap.put("DataStorageProcessStrategy", BasicData.builder().key(NECESSARY_PARAM.get(key)).build());
}
});
return applyStrategy(basicDataMap);
}
}