From fcb6f81a12c52b7e84f231c0eccdbaf832187e31 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B0=91=E5=B9=B4=E6=A2=A6=E4=B8=8E=E7=A0=96?= <2847127106@qq.com> Date: Fri, 4 Oct 2024 15:13:03 +0800 Subject: [PATCH] =?UTF-8?q?test:=20=E5=9F=BA=E7=A1=80=E6=B5=8B=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../common/caffeine/enums/CacheNameEnums.java | 6 ++--- .../muyu/common/iotdb/config/IotDBConfig.java | 17 +++++++++---- .../kafka/constants/KafkaConstants.java | 4 +-- .../controller/DataProcessingController.java | 10 +++++++- .../processing/controller/TestController.java | 2 +- .../muyu/data/processing/domain/CarData.java | 25 +++++++++++++++++++ .../mapper/DataProcessingMapper.java | 3 +++ .../rebbit/DownlineRabbitConsumer.java | 1 + .../rebbit/GoOnlineRabbitConsumer.java | 6 ++--- .../service/DataProcessingService.java | 3 +++ .../impl/DataProcessingServiceImpl.java | 21 +++++++++++++++- .../strategy/core/BasicStrategy.java | 6 ++--- .../processing/DataProcessingMapper.xml | 5 +++- 13 files changed, 88 insertions(+), 21 deletions(-) create mode 100644 cloud-data-processing/src/main/java/com/muyu/data/processing/domain/CarData.java diff --git a/cloud-common/cloud-common-caffeine/src/main/java/com/muyu/common/caffeine/enums/CacheNameEnums.java b/cloud-common/cloud-common-caffeine/src/main/java/com/muyu/common/caffeine/enums/CacheNameEnums.java index 87c10c8..3d3116b 100644 --- a/cloud-common/cloud-common-caffeine/src/main/java/com/muyu/common/caffeine/enums/CacheNameEnums.java +++ b/cloud-common/cloud-common-caffeine/src/main/java/com/muyu/common/caffeine/enums/CacheNameEnums.java @@ -17,11 +17,11 @@ import java.util.List; @Getter public enum CacheNameEnums { + STORAGE("storage", "持久化"), FAULT("fault", "故障"), FENCE("fence", "围栏"), - STORAGE("storage", "持久化"), - REALTIME("realTime", "实时信息"), - WARMING("warming", "预警"); + WARMING("warming", "预警"), + REALTIME("realTime", "实时信息"); private final String code; private final String info; diff --git a/cloud-common/cloud-common-iotdb/src/main/java/com/muyu/common/iotdb/config/IotDBConfig.java b/cloud-common/cloud-common-iotdb/src/main/java/com/muyu/common/iotdb/config/IotDBConfig.java index 76bfa49..23912ae 100644 --- a/cloud-common/cloud-common-iotdb/src/main/java/com/muyu/common/iotdb/config/IotDBConfig.java +++ b/cloud-common/cloud-common-iotdb/src/main/java/com/muyu/common/iotdb/config/IotDBConfig.java @@ -1,6 +1,7 @@ package com.muyu.common.iotdb.config; import org.apache.iotdb.rpc.IoTDBConnectionException; +import org.apache.iotdb.rpc.StatementExecutionException; import org.apache.iotdb.session.Session; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; @@ -34,13 +35,19 @@ public class IotDBConfig { @Value("${spring.iotdb.fetchSize}") private int fetchSize; + private static Session session; @Bean public Session iotSession(){ - Session session = new Session(ip, port, user, password, fetchSize); - try { - session.open(); - } catch (IoTDBConnectionException e) { - throw new RuntimeException(e); + if (session == null) { + session = new Session(ip, port, user, password, fetchSize); + try { + session.open(); + session.setTimeZone("+08:00"); + } catch (IoTDBConnectionException e) { + throw new RuntimeException(e); + } catch (StatementExecutionException e) { + throw new RuntimeException(e); + } } return session; } diff --git a/cloud-common/cloud-common-kafka/src/main/java/com/muyu/common/kafka/constants/KafkaConstants.java b/cloud-common/cloud-common-kafka/src/main/java/com/muyu/common/kafka/constants/KafkaConstants.java index 071a7c5..d4c3d13 100644 --- a/cloud-common/cloud-common-kafka/src/main/java/com/muyu/common/kafka/constants/KafkaConstants.java +++ b/cloud-common/cloud-common-kafka/src/main/java/com/muyu/common/kafka/constants/KafkaConstants.java @@ -8,7 +8,7 @@ package com.muyu.common.kafka.constants; */ public class KafkaConstants { - public final static String KafkaTopic = "kafka_topic2"; + public final static String KafkaTopic = "kafka_topic"; - public final static String KafkaGrop = "kafka_grop2"; + public final static String KafkaGrop = "kafka_grop"; } diff --git a/cloud-data-processing/src/main/java/com/muyu/data/processing/controller/DataProcessingController.java b/cloud-data-processing/src/main/java/com/muyu/data/processing/controller/DataProcessingController.java index 657d84a..60679f1 100644 --- a/cloud-data-processing/src/main/java/com/muyu/data/processing/controller/DataProcessingController.java +++ b/cloud-data-processing/src/main/java/com/muyu/data/processing/controller/DataProcessingController.java @@ -1,6 +1,7 @@ package com.muyu.data.processing.controller; import com.muyu.common.core.domain.Result; +import com.muyu.common.security.utils.SecurityUtils; import com.muyu.data.processing.domain.IotDbData; import com.muyu.data.processing.service.DataProcessingService; @@ -34,7 +35,7 @@ public class DataProcessingController { * * @return */ - @RequestMapping(value = "/selectStorageGroup", method = RequestMethod.GET) + @GetMapping("/selectStorageGroup") public Result selectStorageGroup() { List v = service.selectStorageGroup();if (v.size() > 0) {v.forEach(x -> { System.out.println("group------------------" + x.toString()); @@ -45,5 +46,12 @@ public class DataProcessingController { } } + @GetMapping("/selectCarData") + public Result selectCarData(@RequestParam("vin") String vin) { +// String firmCode = SecurityUtils.getSaasKey(); + String firmCode = "firm01"; + return Result.success(service.selectCarData(firmCode,vin)); + } + } diff --git a/cloud-data-processing/src/main/java/com/muyu/data/processing/controller/TestController.java b/cloud-data-processing/src/main/java/com/muyu/data/processing/controller/TestController.java index dec37dc..b4aad1f 100644 --- a/cloud-data-processing/src/main/java/com/muyu/data/processing/controller/TestController.java +++ b/cloud-data-processing/src/main/java/com/muyu/data/processing/controller/TestController.java @@ -43,7 +43,7 @@ public class TestController { private CacheManager cacheManager; @GetMapping("/testKafka") - public void sendMsg(@RequestParam("msg") String msg) { + public void sendMsg() { try { // 测试数据 String jsonString = """ diff --git a/cloud-data-processing/src/main/java/com/muyu/data/processing/domain/CarData.java b/cloud-data-processing/src/main/java/com/muyu/data/processing/domain/CarData.java new file mode 100644 index 0000000..bc275a0 --- /dev/null +++ b/cloud-data-processing/src/main/java/com/muyu/data/processing/domain/CarData.java @@ -0,0 +1,25 @@ +package com.muyu.data.processing.domain; + +import lombok.*; + +/** + * 车辆信息 + * + * @Author: 胡杨 + * @Name: CarData + * @Description: 车辆信息 + * @CreatedDate: 2024/10/2 下午2:34 + * @FilePath: com.muyu.data.processing.domain + */ + +@Data +@Builder +@ToString +@NoArgsConstructor +@AllArgsConstructor +public class CarData { + private String vin; + private String timestamp; + private String latitude; + private String longitude; +} diff --git a/cloud-data-processing/src/main/java/com/muyu/data/processing/mapper/DataProcessingMapper.java b/cloud-data-processing/src/main/java/com/muyu/data/processing/mapper/DataProcessingMapper.java index c54390a..fdd1cfe 100644 --- a/cloud-data-processing/src/main/java/com/muyu/data/processing/mapper/DataProcessingMapper.java +++ b/cloud-data-processing/src/main/java/com/muyu/data/processing/mapper/DataProcessingMapper.java @@ -1,5 +1,6 @@ package com.muyu.data.processing.mapper; +import com.muyu.data.processing.domain.CarData; import com.muyu.data.processing.domain.IotDbData; import com.muyu.data.processing.domain.BasicData; import org.apache.ibatis.annotations.Mapper; @@ -29,4 +30,6 @@ public interface DataProcessingMapper{ void strategyCheck(@Param("dataList") List dataList); Integer insIotDbDataVo(IotDbData build); + + List selectCarData(@Param("tableName") String tableName); } diff --git a/cloud-data-processing/src/main/java/com/muyu/data/processing/rebbit/DownlineRabbitConsumer.java b/cloud-data-processing/src/main/java/com/muyu/data/processing/rebbit/DownlineRabbitConsumer.java index c26966c..c3a38b6 100644 --- a/cloud-data-processing/src/main/java/com/muyu/data/processing/rebbit/DownlineRabbitConsumer.java +++ b/cloud-data-processing/src/main/java/com/muyu/data/processing/rebbit/DownlineRabbitConsumer.java @@ -1,6 +1,7 @@ package com.muyu.data.processing.rebbit; +import com.muyu.common.caffeine.enums.CacheNameEnums; import com.muyu.common.rabbit.constants.RabbitConstants; import com.rabbitmq.client.Channel; import jakarta.annotation.Resource; diff --git a/cloud-data-processing/src/main/java/com/muyu/data/processing/rebbit/GoOnlineRabbitConsumer.java b/cloud-data-processing/src/main/java/com/muyu/data/processing/rebbit/GoOnlineRabbitConsumer.java index 874e325..05bae21 100644 --- a/cloud-data-processing/src/main/java/com/muyu/data/processing/rebbit/GoOnlineRabbitConsumer.java +++ b/cloud-data-processing/src/main/java/com/muyu/data/processing/rebbit/GoOnlineRabbitConsumer.java @@ -1,6 +1,7 @@ package com.muyu.data.processing.rebbit; +import com.muyu.common.caffeine.enums.CacheNameEnums; import com.muyu.common.rabbit.constants.RabbitConstants; import com.rabbitmq.client.Channel; import jakarta.annotation.Resource; @@ -34,9 +35,6 @@ public class GoOnlineRabbitConsumer { @Resource private CacheManager cacheManager; - @Value("#{'${cacheNames}'.split(',')}") - private List cacheNames; - @RabbitListener(queuesToDeclare = {@Queue(RabbitConstants.GO_ONLINE_QUEUE)}) public void goOnline(String vin, Message message, Channel channel){ log.info("车辆 {} 上线, 配置信息准备中。。。",vin); @@ -66,7 +64,7 @@ public class GoOnlineRabbitConsumer { */ public void addCarCache(String vin) { // 从Redis中获取缓存信息 - for (String name : cacheNames) { + for (String name : CacheNameEnums.getCodes()) { String value = redisTemplate.opsForValue().get(name+":"+vin); cacheManager.getCache(name).put(vin, value); log.info("存储缓存, 缓存分区:[{}], 车辆编码:[{}], 存储值:[{}]", name, vin, value); diff --git a/cloud-data-processing/src/main/java/com/muyu/data/processing/service/DataProcessingService.java b/cloud-data-processing/src/main/java/com/muyu/data/processing/service/DataProcessingService.java index 7876553..65cb73b 100644 --- a/cloud-data-processing/src/main/java/com/muyu/data/processing/service/DataProcessingService.java +++ b/cloud-data-processing/src/main/java/com/muyu/data/processing/service/DataProcessingService.java @@ -2,6 +2,7 @@ package com.muyu.data.processing.service; import com.muyu.data.processing.domain.BasicData; +import com.muyu.data.processing.domain.CarData; import java.util.List; @@ -27,4 +28,6 @@ public interface DataProcessingService{ void strategyCheck(List dataList); Integer insIotDbData(String key, String value); + + List selectCarData(String firmCode, String vin); } diff --git a/cloud-data-processing/src/main/java/com/muyu/data/processing/service/impl/DataProcessingServiceImpl.java b/cloud-data-processing/src/main/java/com/muyu/data/processing/service/impl/DataProcessingServiceImpl.java index f2c9bb0..0f9da8e 100644 --- a/cloud-data-processing/src/main/java/com/muyu/data/processing/service/impl/DataProcessingServiceImpl.java +++ b/cloud-data-processing/src/main/java/com/muyu/data/processing/service/impl/DataProcessingServiceImpl.java @@ -3,9 +3,13 @@ package com.muyu.data.processing.service.impl; import javax.annotation.Resource; +import com.muyu.common.iotdb.config.IotDBConfig; +import com.muyu.data.processing.domain.CarData; import com.muyu.data.processing.domain.IotDbData; import com.muyu.data.processing.domain.BasicData; import com.muyu.data.processing.strategy.core.StartStrategy; +import org.apache.iotdb.isession.SessionDataSet;import org.apache.iotdb.rpc.IoTDBConnectionException; +import org.apache.iotdb.rpc.StatementExecutionException; import org.springframework.stereotype.Service; import lombok.extern.slf4j.Slf4j; import com.muyu.data.processing.mapper.DataProcessingMapper; @@ -30,7 +34,7 @@ public class DataProcessingServiceImpl implements DataProcessingService { @Resource private DataProcessingMapper mapper; @Resource - private StartStrategy rootStrategy; + private IotDBConfig iotDBConfig; @Override @@ -61,5 +65,20 @@ public class DataProcessingServiceImpl implements DataProcessingService { return mapper.insIotDbData(key, value); } + @Override + public List selectCarData(String firmCode, String vin) { + String tableName = "root.one."+firmCode+"."+vin; + try { + SessionDataSet sessionDataSet = iotDBConfig.iotSession().executeQueryStatement(tableName); + List columnNames = sessionDataSet.getColumnNames(); + System.out.println(columnNames); + } catch (StatementExecutionException e) { + throw new RuntimeException(e); + } catch (IoTDBConnectionException e) { + throw new RuntimeException(e); + } + return null; + } + } diff --git a/cloud-data-processing/src/main/java/com/muyu/data/processing/strategy/core/BasicStrategy.java b/cloud-data-processing/src/main/java/com/muyu/data/processing/strategy/core/BasicStrategy.java index b9d7f2c..7ca35a7 100644 --- a/cloud-data-processing/src/main/java/com/muyu/data/processing/strategy/core/BasicStrategy.java +++ b/cloud-data-processing/src/main/java/com/muyu/data/processing/strategy/core/BasicStrategy.java @@ -32,9 +32,9 @@ public class BasicStrategy extends abstractStrategyRouter basicDataMap) { log.info("开始执行基础校验节点。。。"); - basicDataMap.put(CacheNameEnums.WARMING.getCode(), BasicData.builder().key("test").build()); - basicDataMap.put(CacheNameEnums.FAULT.getCode(), BasicData.builder().key("test").build()); - basicDataMap.put(CacheNameEnums.REALTIME.getCode(), BasicData.builder().key("test").build()); + basicDataMap.put(CacheNameEnums.WARMING.getCode(), null); + basicDataMap.put(CacheNameEnums.FAULT.getCode(), null); + basicDataMap.put(CacheNameEnums.REALTIME.getCode(), null); log.info("基础校验节点已通过。。。"); return applyStrategy(basicDataMap); } diff --git a/cloud-data-processing/src/main/resources/mapper/processing/DataProcessingMapper.xml b/cloud-data-processing/src/main/resources/mapper/processing/DataProcessingMapper.xml index 2373a13..7766db8 100644 --- a/cloud-data-processing/src/main/resources/mapper/processing/DataProcessingMapper.xml +++ b/cloud-data-processing/src/main/resources/mapper/processing/DataProcessingMapper.xml @@ -6,6 +6,10 @@ show storage group + + insert into root.one.data(${key}) values(${value}); @@ -24,7 +28,6 @@ - insert into root.one.data (timestamp, vin, latitude,longitude)