diff --git a/cloud-common/cloud-common-iotdb/src/main/java/com/muyu/common/iotdb/config/IotDBConfig.java b/cloud-common/cloud-common-iotdb/src/main/java/com/muyu/common/iotdb/config/IotDBSessionConfig.java similarity index 68% rename from cloud-common/cloud-common-iotdb/src/main/java/com/muyu/common/iotdb/config/IotDBConfig.java rename to cloud-common/cloud-common-iotdb/src/main/java/com/muyu/common/iotdb/config/IotDBSessionConfig.java index 23912ae..b83bbad 100644 --- a/cloud-common/cloud-common-iotdb/src/main/java/com/muyu/common/iotdb/config/IotDBConfig.java +++ b/cloud-common/cloud-common-iotdb/src/main/java/com/muyu/common/iotdb/config/IotDBSessionConfig.java @@ -3,6 +3,7 @@ package com.muyu.common.iotdb.config; import org.apache.iotdb.rpc.IoTDBConnectionException; import org.apache.iotdb.rpc.StatementExecutionException; import org.apache.iotdb.session.Session; +import org.apache.iotdb.session.pool.SessionPool; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -18,7 +19,7 @@ import org.springframework.context.annotation.Configuration; */ @Configuration -public class IotDBConfig { +public class IotDBSessionConfig { @Value("${spring.iotdb.ip}") private String ip; @@ -35,21 +36,18 @@ public class IotDBConfig { @Value("${spring.iotdb.fetchSize}") private int fetchSize; - private static Session session; + private static SessionPool sessionPool; @Bean - public Session iotSession(){ - if (session == null) { - session = new Session(ip, port, user, password, fetchSize); + public SessionPool getSessionPool(){ + if (sessionPool == null) { + sessionPool = new SessionPool(ip, port, user, password, fetchSize); try { - session.open(); - session.setTimeZone("+08:00"); - } catch (IoTDBConnectionException e) { - throw new RuntimeException(e); - } catch (StatementExecutionException e) { + sessionPool.setTimeZone("+08:00"); + } catch (Exception e) { throw new RuntimeException(e); } } - return session; + return sessionPool; } } diff --git a/cloud-common/cloud-common-iotdb/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/cloud-common/cloud-common-iotdb/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports index 2cabfb0..6ef70f8 100644 --- a/cloud-common/cloud-common-iotdb/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports +++ b/cloud-common/cloud-common-iotdb/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports @@ -1 +1 @@ -com.muyu.common.iotdb.config.IotDBConfig +com.muyu.common.iotdb.config.IotDBSessionConfig diff --git a/cloud-data-processing/src/main/java/com/muyu/data/processing/controller/DataProcessingController.java b/cloud-data-processing/src/main/java/com/muyu/data/processing/controller/DataProcessingController.java index 60679f1..8ab8328 100644 --- a/cloud-data-processing/src/main/java/com/muyu/data/processing/controller/DataProcessingController.java +++ b/cloud-data-processing/src/main/java/com/muyu/data/processing/controller/DataProcessingController.java @@ -2,6 +2,7 @@ package com.muyu.data.processing.controller; import com.muyu.common.core.domain.Result; import com.muyu.common.security.utils.SecurityUtils; +import com.muyu.data.processing.domain.BasicData; import com.muyu.data.processing.domain.IotDbData; import com.muyu.data.processing.service.DataProcessingService; @@ -11,6 +12,7 @@ import org.springframework.web.bind.annotation.*; import lombok.extern.slf4j.Slf4j; import java.util.Date; +import java.util.HashMap; import java.util.List; /** @@ -53,5 +55,46 @@ public class DataProcessingController { return Result.success(service.selectCarData(firmCode,vin)); } + @PostMapping("/addCarData") + public Result addCarData(@RequestBody IotDbData data) { + HashMap hashMap = new HashMap<>(); + hashMap.put("timestamp", BasicData + .builder() + .key("timestamp") + .label("时间戳") + .value(String.valueOf(data.getTimestamp())) + .type("string") + .build()); + hashMap.put("vin", BasicData + .builder() + .key("vin") + .label("VIN码") + .value(data.getVin()) + .type("string") + .build()); + hashMap.put("latitude", BasicData + .builder() + .key("latitude") + .label("纬度") + .value(data.getLatitude()) + .type("long") + .build()); + hashMap.put("longitude", BasicData + .builder() + .key("longitude") + .label("经度") + .value(data.getLongitude()) + .type("long") + .build()); + hashMap.put("firmCode", BasicData + .builder() + .key("firmCode") + .label("企业编码") + .value("firm01") + .type("string") + .build()); + return Result.success(service.addCarData(hashMap)); + } + } diff --git a/cloud-data-processing/src/main/java/com/muyu/data/processing/controller/TestController.java b/cloud-data-processing/src/main/java/com/muyu/data/processing/controller/TestController.java index 24bf4d9..762975f 100644 --- a/cloud-data-processing/src/main/java/com/muyu/data/processing/controller/TestController.java +++ b/cloud-data-processing/src/main/java/com/muyu/data/processing/controller/TestController.java @@ -4,7 +4,7 @@ package com.muyu.data.processing.controller; import com.github.benmanes.caffeine.cache.Caffeine; import com.muyu.common.caffeine.enums.CacheNameEnums; import com.muyu.common.core.utils.uuid.UUID; -import com.muyu.common.iotdb.config.IotDBConfig; +import com.muyu.common.iotdb.config.IotDBSessionConfig; import com.muyu.common.kafka.constants.KafkaConstants; import com.muyu.common.rabbit.constants.RabbitConstants; import jakarta.annotation.Resource; @@ -12,7 +12,6 @@ import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.cache.Cache; -import org.springframework.cache.CacheManager; import org.springframework.cache.caffeine.CaffeineCache; import org.springframework.cache.support.SimpleCacheManager; import org.springframework.data.redis.core.RedisTemplate; @@ -38,7 +37,7 @@ public class TestController { @Resource private RabbitTemplate rabbitTemplate; @Resource - private IotDBConfig iotDBConfig; + private IotDBSessionConfig iotDBSessionConfig; @Resource private RedisTemplate redisTemplate; // @Resource @@ -101,7 +100,7 @@ public class TestController { @GetMapping("/insertData") public void insertData(@RequestParam("deviceId") String deviceId, @RequestParam("time") long time, @RequestParam("value") double value) throws Exception { String sql = String.format("insert into root.one.%s(timestamp, temperature) values (%d, %f)", deviceId, time, value); - iotDBConfig.iotSession().executeNonQueryStatement(sql); + iotDBSessionConfig.getSessionPool().executeNonQueryStatement(sql); } @GetMapping("/testSetRedis") diff --git a/cloud-data-processing/src/main/java/com/muyu/data/processing/service/DataProcessingService.java b/cloud-data-processing/src/main/java/com/muyu/data/processing/service/DataProcessingService.java index 65cb73b..67cacf3 100644 --- a/cloud-data-processing/src/main/java/com/muyu/data/processing/service/DataProcessingService.java +++ b/cloud-data-processing/src/main/java/com/muyu/data/processing/service/DataProcessingService.java @@ -4,6 +4,7 @@ package com.muyu.data.processing.service; import com.muyu.data.processing.domain.BasicData; import com.muyu.data.processing.domain.CarData; +import java.util.HashMap; import java.util.List; /** @@ -30,4 +31,6 @@ public interface DataProcessingService{ Integer insIotDbData(String key, String value); List selectCarData(String firmCode, String vin); + + Object addCarData(HashMap hashMap); } diff --git a/cloud-data-processing/src/main/java/com/muyu/data/processing/service/impl/DataProcessingServiceImpl.java b/cloud-data-processing/src/main/java/com/muyu/data/processing/service/impl/DataProcessingServiceImpl.java index 5f3b23c..001ba6e 100644 --- a/cloud-data-processing/src/main/java/com/muyu/data/processing/service/impl/DataProcessingServiceImpl.java +++ b/cloud-data-processing/src/main/java/com/muyu/data/processing/service/impl/DataProcessingServiceImpl.java @@ -3,13 +3,15 @@ package com.muyu.data.processing.service.impl; import javax.annotation.Resource; -import com.muyu.common.iotdb.config.IotDBConfig; +import com.muyu.common.iotdb.config.IotDBSessionConfig; import com.muyu.data.processing.domain.CarData; import com.muyu.data.processing.domain.IotDbData; import com.muyu.data.processing.domain.BasicData; -import com.muyu.data.processing.strategy.core.StartStrategy; -import org.apache.iotdb.isession.SessionDataSet;import org.apache.iotdb.rpc.IoTDBConnectionException; +import org.apache.iotdb.isession.SessionDataSet; +import org.apache.iotdb.isession.pool.SessionDataSetWrapper; +import org.apache.iotdb.rpc.IoTDBConnectionException; import org.apache.iotdb.rpc.StatementExecutionException; +import org.apache.iotdb.session.pool.SessionPool; import org.apache.iotdb.tsfile.read.common.Field; import org.apache.iotdb.tsfile.read.common.RowRecord; import org.springframework.stereotype.Service; @@ -37,7 +39,7 @@ public class DataProcessingServiceImpl implements DataProcessingService { @Resource private DataProcessingMapper mapper; @Resource - private IotDBConfig iotDBConfig; + private SessionPool sessionPool; @Override @@ -73,10 +75,10 @@ public class DataProcessingServiceImpl implements DataProcessingService { ArrayList carDataList = new ArrayList<>(); String sql = "select * from root.one."+firmCode+"."+vin; try { - SessionDataSet sessionDataSet = iotDBConfig.iotSession().executeQueryStatement(sql); - List columnNames = sessionDataSet.getColumnNames(); - while (sessionDataSet.hasNext()){ - RowRecord next = sessionDataSet.next(); + SessionDataSetWrapper dataSetWrapper = sessionPool.executeQueryStatement(sql); + List columnNames = dataSetWrapper.getColumnNames(); + while (dataSetWrapper.hasNext()){ + RowRecord next = dataSetWrapper.next(); CarData data = getCarData(vin, next, columnNames); carDataList.add(data); } @@ -86,6 +88,43 @@ public class DataProcessingServiceImpl implements DataProcessingService { return carDataList; } + @Override + public Object addCarData(HashMap hashMap) { + StringBuilder sql = new StringBuilder("insert into root.one."); + sql.append(hashMap.get("firmCode").getValue()) + .append(".") + .append(hashMap.get("vin").getValue()) + .append("("); + hashMap.remove("firmCode"); + hashMap.remove("vin"); + StringBuilder keys = new StringBuilder(); + StringBuilder values = new StringBuilder(); + hashMap.keySet().forEach(key -> { + keys.append(key).append(","); + if ("String".equals(hashMap.get(key).getType())) { + values.append("'") + .append(hashMap.get(key).getValue()) + .append("'") + .append(","); + }else { + values.append(hashMap.get(key).getValue()) + .append(","); + } + }); + sql.append(keys.substring(0, keys.length() - 1)) + .append(") values (") + .append(values.substring(0, values.length() - 1)) + .append(")"); + try { + sessionPool.executeNonQueryStatement(sql.toString()); + } catch (StatementExecutionException e) { + throw new RuntimeException(e); + } catch (IoTDBConnectionException e) { + throw new RuntimeException(e); + } + return sql; + } + private static CarData getCarData(String vin, RowRecord next, List columnNames) { List fields = next.getFields(); CarData data = new CarData(); diff --git a/cloud-data-processing/src/main/java/com/muyu/data/processing/strategy/branch/DataStorageProcessStrategy.java b/cloud-data-processing/src/main/java/com/muyu/data/processing/strategy/branch/DataStorageProcessStrategy.java index f6f45e0..3072a47 100644 --- a/cloud-data-processing/src/main/java/com/muyu/data/processing/strategy/branch/DataStorageProcessStrategy.java +++ b/cloud-data-processing/src/main/java/com/muyu/data/processing/strategy/branch/DataStorageProcessStrategy.java @@ -2,9 +2,12 @@ package com.muyu.data.processing.strategy.branch; import com.muyu.data.processing.domain.BasicData; import java.util.HashMap; +import java.util.List; + import com.muyu.data.processing.domain.Temporary2; import com.muyu.data.processing.strategy.StrategyHandler; import com.muyu.data.processing.strategy.abstractStrategyRouter; +import com.muyu.data.processing.strategy.core.EndStrategy; import com.muyu.data.processing.strategy.leaves.DataStorageStrategy; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; @@ -23,14 +26,38 @@ import org.springframework.stereotype.Component; @Component public class DataStorageProcessStrategy extends abstractStrategyRouter, Temporary2> implements StrategyHandler, Temporary2> { + // 必要参数 + private final static HashMap NECESSARY_PARAM = new HashMap<>(); + static { + NECESSARY_PARAM.put("vin","VIN码"); + NECESSARY_PARAM.put("timestamp","时间戳"); + NECESSARY_PARAM.put("longitude","经度"); + NECESSARY_PARAM.put("latitude","纬度"); + } + @Override protected StrategyMapper, Temporary2> registerStrategy() { - return param-> new DataStorageStrategy(); + return param-> { + // 判断是否存在问题 + if (!param.containsKey("DataStorageProcessStrategy")) { + log.error("持久化流程错误,缺少必要参数: {}", param.get("DataStorageProcessStrategy").getKey()); + param.remove("DataStorageProcessStrategy"); + return new EndStrategy(); + } + log.info("持久化数据处理节点已通过。。。"); + return new DataStorageStrategy(); + }; } @Override public Temporary2 apply(HashMap basicDataMap) { - log.info("持久化数据处理节点已通过。。。"); + log.info("持久化数据处理节点开始处理。。。"); + // 判断是否缺少必要参数,如果有,记录 + NECESSARY_PARAM.keySet().forEach(key->{ + if (!basicDataMap.containsKey(key)) { + basicDataMap.put("DataStorageProcessStrategy", BasicData.builder().key(NECESSARY_PARAM.get(key)).build()); + } + }); return applyStrategy(basicDataMap); } }