diff --git a/JavaSample-tcp1061513671883/.lck b/JavaSample-tcp1061513671883/.lck deleted file mode 100644 index e69de29..0000000 diff --git a/cloud-modules/cloud-modules-event-process/pom.xml b/cloud-modules/cloud-modules-event-process/pom.xml index 0c734d7..4844c85 100644 --- a/cloud-modules/cloud-modules-event-process/pom.xml +++ b/cloud-modules/cloud-modules-event-process/pom.xml @@ -46,24 +46,12 @@ spring-boot-starter-actuator - - - com.mysql - mysql-connector-j - - org.apache.iotdb iotdb-session - - - com.muyu - cloud-common-datasource - - com.muyu diff --git a/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/CloudEventProcessApplication.java b/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/CloudEventProcessApplication.java index 46b5858..b3d9116 100644 --- a/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/CloudEventProcessApplication.java +++ b/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/CloudEventProcessApplication.java @@ -4,6 +4,7 @@ import com.muyu.common.security.annotation.EnableCustomConfig; import com.muyu.common.security.annotation.EnableMyFeignClients; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration; /** * @Author: zi run @@ -12,7 +13,7 @@ import org.springframework.boot.autoconfigure.SpringBootApplication; */ @EnableCustomConfig @EnableMyFeignClients -@SpringBootApplication +@SpringBootApplication(exclude = {DataSourceAutoConfiguration.class}) public class CloudEventProcessApplication { public static void main(String[] args) { SpringApplication.run(CloudEventProcessApplication.class, args); diff --git a/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/consumer/TestConsumer.java b/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/consumer/TestConsumer.java index cd7dbe2..2e1bc55 100644 --- a/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/consumer/TestConsumer.java +++ b/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/consumer/TestConsumer.java @@ -2,6 +2,7 @@ package com.muyu.event.process.consumer; import cn.hutool.core.thread.ThreadUtil; import com.alibaba.nacos.shaded.com.google.common.collect.Lists; +import com.muyu.common.core.constant.KafkaConstants; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecords; @@ -18,7 +19,7 @@ import java.util.Collection; * @Description 测试消费者 */ @Slf4j -//@Component +@Component @RequiredArgsConstructor public class TestConsumer implements InitializingBean { @@ -27,18 +28,12 @@ public class TestConsumer implements InitializingBean { */ private final KafkaConsumer kafkaConsumer; - /** - * kafka主题名称 - */ - private static final String topicName = "test-topic"; - - @Override public void afterPropertiesSet() throws Exception { new Thread(() -> { - log.info("启动线程监听Topic: {}", topicName); + log.info("启动线程监听Topic: {}", KafkaConstants.MESSAGE_PARSING); ThreadUtil.sleep(1000); - Collection topics = Lists.newArrayList(topicName); + Collection topics = Lists.newArrayList(KafkaConstants.MESSAGE_PARSING); kafkaConsumer.subscribe(topics); while (true) { ConsumerRecords consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000)); diff --git a/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/consumer/VehicleConsumer.java b/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/consumer/VehicleConsumer.java index 9a688b4..b4e9cd6 100644 --- a/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/consumer/VehicleConsumer.java +++ b/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/consumer/VehicleConsumer.java @@ -1,5 +1,6 @@ package com.muyu.event.process.consumer; +import com.muyu.common.core.constant.KafkaConstants; import com.muyu.event.process.basic.EventPublisher; import com.muyu.event.process.event.IoTDBInsertDataEvent; import lombok.RequiredArgsConstructor; @@ -40,21 +41,21 @@ public class VehicleConsumer implements ApplicationRunner, ApplicationListener topics = Collections.singletonList(MESSAGE_PARSING); + log.info("启动线程监听Topic: {}", KafkaConstants.MESSAGE_PARSING); + List topics = Collections.singletonList(KafkaConstants.MESSAGE_PARSING); kafkaConsumer.subscribe(topics); while (true) { ConsumerRecords consumerRecords = kafkaConsumer.poll(Duration.ofMillis(100)); @@ -62,6 +63,10 @@ public class VehicleConsumer implements ApplicationRunner, ApplicationListener consumerRecord) { String message = consumerRecord.value(); log.info("接收到车辆报文数据,内容:{}", message); @@ -69,6 +74,10 @@ public class VehicleConsumer implements ApplicationRunner, ApplicationListener extends ApplicationEvent { + + /** + * 事件携带的数据 + */ + private final T data; + + /** + * 构造函数,初始化事件源和数据 + * + * @param source 事件源对象 + * @param data 事件携带的数据 + */ + public BasicEvent(Object source, T data) { + super(source); + this.data = data; + } + + + /** + * 获取事件携带的数据 + * + * @return 事件数据 + */ + public T getData() { + return data; + } +} diff --git a/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/basic/BasicEventHandler.java b/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/basic/BasicEventHandler.java new file mode 100644 index 0000000..d9571d2 --- /dev/null +++ b/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/basic/BasicEventHandler.java @@ -0,0 +1,37 @@ +package com.muyu.event.process.basic; + +import org.springframework.context.ApplicationListener; +import org.springframework.stereotype.Component; + +/** + * @Author: zi run + * @Date 2024/9/30 15:37 + * @Description 基础事件处理器 + */ +@Component +public class BasicEventHandler implements ApplicationListener> { + + /** + * 具体事件监听器 + */ + private final BasicEventListener listener; + + /** + * 构造函数,用于注入具体事件监听器 + * + * @param listener 具体事件监听器 + */ + public BasicEventHandler(BasicEventListener listener) { + this.listener = listener; + } + + /** + * 处理应用事件 + * + * @param event 事件对象 + */ + @Override + public void onApplicationEvent(BasicEvent event) { + listener.onEvent(event); + } +} diff --git a/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/basic/BasicEventListener.java b/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/basic/BasicEventListener.java new file mode 100644 index 0000000..7fecd6e --- /dev/null +++ b/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/basic/BasicEventListener.java @@ -0,0 +1,16 @@ +package com.muyu.event.process.basic; + +/** + * @Author: zi run + * @Date 2024/9/30 15:35 + * @Description 基础事件监听器 + */ +public interface BasicEventListener { + + /** + * 处理事件的方法 + * + * @param event 事件对象 + */ + void onEvent(BasicEvent event); +} diff --git a/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/basic/EventPublisher.java b/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/basic/EventPublisher.java new file mode 100644 index 0000000..b037cd2 --- /dev/null +++ b/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/basic/EventPublisher.java @@ -0,0 +1,38 @@ +package com.muyu.event.process.basic; + +import org.springframework.context.ApplicationEventPublisher; +import org.springframework.context.ApplicationEventPublisherAware; +import org.springframework.stereotype.Component; + +/** + * @Author: zi run + * @Date 2024/9/29 22:01 + * @Description 事件发布者 + */ +@Component +public class EventPublisher implements ApplicationEventPublisherAware { + + /** + * 应用程序事件发布者,用于发布事件 + */ + private ApplicationEventPublisher publisher; + + /** + * 设置应用程序事件发布者 + * + * @param publisher 应用程序事件发布者实例 + */ + @Override + public void setApplicationEventPublisher(ApplicationEventPublisher publisher) { + this.publisher = publisher; + } + + /** + * 发布事件 + * @param event 要发布的事件 + * @param 事件数据类型 + */ + public void publish(BasicEvent event) { + publisher.publishEvent(event); + } +} diff --git a/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/consumer/TestConsumer.java b/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/consumer/TestConsumer.java new file mode 100644 index 0000000..cd7dbe2 --- /dev/null +++ b/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/consumer/TestConsumer.java @@ -0,0 +1,52 @@ +package com.muyu.event.process.consumer; + +import cn.hutool.core.thread.ThreadUtil; +import com.alibaba.nacos.shaded.com.google.common.collect.Lists; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.stereotype.Component; + +import java.time.Duration; +import java.util.Collection; + +/** + * @Author: zi run + * @Date 2024/9/29 16:53 + * @Description 测试消费者 + */ +@Slf4j +//@Component +@RequiredArgsConstructor +public class TestConsumer implements InitializingBean { + + /** + * kafka消费者 + */ + private final KafkaConsumer kafkaConsumer; + + /** + * kafka主题名称 + */ + private static final String topicName = "test-topic"; + + + @Override + public void afterPropertiesSet() throws Exception { + new Thread(() -> { + log.info("启动线程监听Topic: {}", topicName); + ThreadUtil.sleep(1000); + Collection topics = Lists.newArrayList(topicName); + kafkaConsumer.subscribe(topics); + while (true) { + ConsumerRecords consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000)); + consumerRecords.forEach(record -> { + String value = record.value(); + log.info("从Kafka中消费的原始数据: {}", value); + }); + } + }).start(); + } +} diff --git a/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/consumer/VehicleConsumer.java b/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/consumer/VehicleConsumer.java new file mode 100644 index 0000000..9a688b4 --- /dev/null +++ b/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/consumer/VehicleConsumer.java @@ -0,0 +1,93 @@ +package com.muyu.event.process.consumer; + +import com.muyu.event.process.basic.EventPublisher; +import com.muyu.event.process.event.IoTDBInsertDataEvent; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.springframework.boot.ApplicationArguments; +import org.springframework.boot.ApplicationRunner; +import org.springframework.context.ApplicationListener; +import org.springframework.context.event.ContextClosedEvent; +import org.springframework.stereotype.Component; + +import java.time.Duration; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +/** + * @Author: zi run + * @Date 2024/9/29 23:23 + * @Description 车辆消费者 + */ +@Slf4j +@Component +@RequiredArgsConstructor +public class VehicleConsumer implements ApplicationRunner, ApplicationListener { + + /** + * kafka消费者 + */ + private final KafkaConsumer kafkaConsumer; + + /** + * 事件发布者 + */ + private final EventPublisher eventPublisher; + + /** + * 协议解析报文传递数据(队列名称) + */ + public final static String MESSAGE_PARSING = "MessageParsing"; + + /** + * 设定固定大小的线程池,线程数量与当前可用的处理器核心数相同 + */ + private final ExecutorService executorService = + Executors.newFixedThreadPool(10); + + @Override + public void run(ApplicationArguments args) throws Exception { + log.info("启动线程监听Topic: {}", MESSAGE_PARSING); + List topics = Collections.singletonList(MESSAGE_PARSING); + kafkaConsumer.subscribe(topics); + while (true) { + ConsumerRecords consumerRecords = kafkaConsumer.poll(Duration.ofMillis(100)); + consumerRecords.forEach(consumerRecord -> executorService.submit(() -> handleRecord(consumerRecord))); + } + } + + private void handleRecord(ConsumerRecord consumerRecord) { + String message = consumerRecord.value(); + log.info("接收到车辆报文数据,内容:{}", message); + log.info("------------------------------------------------"); + eventPublisher.publish(new IoTDBInsertDataEvent(this, message)); + } + + @Override + public void onApplicationEvent(ContextClosedEvent event) { + log.info("关闭线程池和Kafka消费者"); + + try { + executorService.shutdown(); + if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) { + executorService.shutdownNow(); + } + } catch (InterruptedException e) { + log.error("线程池关闭被中断,强制关闭", e); + executorService.shutdownNow(); + Thread.currentThread().interrupt(); + } + + try { + kafkaConsumer.close(); // 关闭Kafka消费者 + } catch (Exception e) { + log.error("关闭Kafka消费者时发生错误", e); + } + } +} diff --git a/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/controller/TestEventController.java b/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/controller/TestEventController.java new file mode 100644 index 0000000..bfc0dc3 --- /dev/null +++ b/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/controller/TestEventController.java @@ -0,0 +1,88 @@ +package com.muyu.event.process.controller; + +import com.alibaba.fastjson2.JSONObject; +import com.muyu.common.core.constant.Constants; +import com.muyu.common.core.domain.Result; +import com.muyu.common.core.web.controller.BaseController; +import com.muyu.event.process.iotdb.service.TestIoTDBService; +import lombok.RequiredArgsConstructor; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +/** + * @Author: zi run + * @Date 2024/9/29 16:24 + * @Description 测试事件控制层 + */ +@RestController +@RequiredArgsConstructor +@RequestMapping(value = "/test-event") +public class TestEventController extends BaseController { + + /** + * kafka生产者 + */ + private final KafkaProducer kafkaProducer; + + /** + * kafka主题名称 + */ + private static final String kafkaTopicName = "test-topic"; + + /** + * 测试IoTDB业务层 + */ + private final TestIoTDBService testIoTDBService; + + /** + * 发送Kafka测试消息 + * + * @return 响应结果 + */ + @GetMapping(value = "/sendKafka") + public Result senKafka() { + JSONObject jsonObject = new JSONObject(); + jsonObject.put("id","1"); + jsonObject.put("name","张三"); + jsonObject.put("age","18"); + jsonObject.put("sex","男"); + ProducerRecord producerRecord = new ProducerRecord<>(kafkaTopicName, jsonObject.toJSONString()); + kafkaProducer.send(producerRecord); + return Result.success(null, Constants.SUCCESS_MESSAGE); + } + + /** + * 查询IoTDB数据列表 + * @return 响应结果 + */ + @GetMapping(value = "/list") + public Result>> list() { + return Result.success(testIoTDBService.list(), Constants.SUCCESS_MESSAGE); + } + + /** + * 向IoTDB添加数据 + * + * @return 响应结果 + */ + @PostMapping(value = "/save") + public Result save() { + String deviceId = "root.test"; + ArrayList keyList = new ArrayList<>(); + ArrayList valueList = new ArrayList<>(); + keyList.add("car_vin"); + keyList.add("car_name"); + valueList.add("VIN123456"); + valueList.add("宝马"); + testIoTDBService.insertStringRecord(deviceId, System.currentTimeMillis(), keyList, valueList); + return Result.success(null, Constants.SUCCESS_MESSAGE); + } +} diff --git a/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/event/IoTDBInsertDataEvent.java b/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/event/IoTDBInsertDataEvent.java new file mode 100644 index 0000000..91d7d6e --- /dev/null +++ b/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/event/IoTDBInsertDataEvent.java @@ -0,0 +1,20 @@ +package com.muyu.event.process.event; + +import com.muyu.event.process.basic.BasicEvent; + +/** + * @Author: zi run + * @Date 2024/9/29 21:19 + * @Description 向IoTDB插入数据事件 + */ +public class IoTDBInsertDataEvent extends BasicEvent { + + /** + * 构造函数,向IoTDB插入数据创建事件 + * + * @param messsge 消息 + */ + public IoTDBInsertDataEvent(Object source, String messsge) { + super(source, messsge); + } +} diff --git a/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/iotdb/basic/config/IoTDBSessionConfig.java b/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/iotdb/basic/config/IoTDBSessionConfig.java new file mode 100644 index 0000000..18e7feb --- /dev/null +++ b/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/iotdb/basic/config/IoTDBSessionConfig.java @@ -0,0 +1,72 @@ +package com.muyu.event.process.iotdb.basic.config; + +import lombok.extern.slf4j.Slf4j; +import org.apache.iotdb.session.pool.SessionPool; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Configuration; +import org.springframework.stereotype.Component; +import java.util.List; + +/** + * @Author: zi run + * @Date 2024/9/28 22:41 + * @Description IoTDB会话配置 + */ +@Slf4j +@Component +@Configuration +public class IoTDBSessionConfig { + + @Value("${spring.iotdb.username}") + private String username; + + @Value("${spring.iotdb.password}") + private String password; + + @Value("${spring.iotdb.ip}") + private String ip; + + @Value("${spring.iotdb.port}") + private int port; + + @Value("${spring.iotdb.maxSize}") + private int maxSize; + + /** + * IoTDB会话池 + */ + private static SessionPool sessionPool = null; + + /** + * 获取IoTDB会话对象 + * @return ioTDB会话对象 + */ + public SessionPool getSessionPool() { + if (sessionPool == null) { + sessionPool = new SessionPool(ip, port, username, password, maxSize); + } + return sessionPool; + } + + /** + * 向IoTDB中插入特定设备的记录 + * + * @param deviceId 设备的唯一标识符 + * @param time 记录的时间戳,以毫秒为单位 + * @param measurements 与记录关联的测量名称列表 + * @param values 每个测量对应的值列表。值的顺序必须与测量名称一一对应 + * + * 该方法从会话池中获取一个会话,并尝试将指定的记录插入到 IoTDB 中。 + * 如果插入失败,将记录错误信息,便于后续排查。 + */ + public void insertRecord(String deviceId, long time, List measurements, List values) { + getSessionPool(); + try { + log.info("iotdb数据入库:device_id:[{}], measurements:[{}], values:[{}]", deviceId, measurements, values); + sessionPool.insertRecord(deviceId, time, measurements, values); + } catch (Exception e) { + log.error("IotDBSession insertRecord失败: deviceId={}, time={}, measurements={}, values={}, error={}", + deviceId, time, measurements, values, e.getMessage()); + } + } +} diff --git a/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/iotdb/basic/service/IService.java b/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/iotdb/basic/service/IService.java new file mode 100644 index 0000000..fd5d3bf --- /dev/null +++ b/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/iotdb/basic/service/IService.java @@ -0,0 +1,290 @@ +package com.muyu.event.process.iotdb.basic.service; + +import com.muyu.event.process.iotdb.domain.dto.IoTDbRecordAble; +import com.muyu.event.process.iotdb.domain.dto.MeasurementSchemaValuesDTO; +import org.apache.iotdb.common.rpc.thrift.TAggregationType; +import org.apache.iotdb.isession.SessionDataSet; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.write.record.Tablet; +import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; + +import java.util.List; +import java.util.Map; + +/** + * @Author: zi run + * @Date 2024/9/28 23:37 + * @Description IoTDB基准业务层 + */ +public interface IService { + + /** + * 插入一个 Tablet 对象到 IoTDB 数据库 + * + * @param tablet 要插入的 Tablet 对象,包含待写入的数据 + */ + void insertTablet(Tablet tablet); + + /** + * 将给定的 Tablets 插入到 IoTDB 数据库中。 + * + * @param tablets 一个 Map,包含要插入的 Tablets + */ + void insertTablets(Map tablets); + + /** + * 单条数据插入(string类型数据项) + * + * @param deviceId 设备名(表名)root.ln.wf01.wt01 + * @param time 时间戳 + * @param measurements 数据项列表 + * @param values 数据项对应值列表 + */ + void insertStringRecord(String deviceId, long time, List measurements, List values); + + /** + * 单条数据插入(不同类型数据项) + * + * @param deviceId 设备名(表名)root.ln.wf01.wt01 + * @param time 时间戳 + * @param measurements 数据项列表 + * @param types 数据项对应类型列表 + * @param values 数据项对应值列表 + */ + void insertRecord(String deviceId, long time, List measurements, + List types, List values); + + /** + * 多个设备多条数据插入(string类型数据项) + * + * @param deviceIds 多个设备名(表名)root.ln.wf01.wt01 + * @param times 时间戳的列表 + * @param measurementsList 数据项列表的列表 + * @param valuesList 数据项对应值列表的列表 + */ + void insertStringRecords(List deviceIds, List times, + List> measurementsList, List> valuesList); + + /** + * 多个设备多条数据插入(不同类型数据项) + * + * @param deviceIds 多个设备名(表名))root.ln.wf01.wt01 + * @param times 时间戳的列表 + * @param measurementsList 数据项列表的列表 + * @param typesList 数据项对应类型列表的列表 + * @param valuesList 数据项对应值列表的列表 + */ + void insertRecords(List deviceIds, List times, List> measurementsList, + List> typesList, List> valuesList); + + /** + * 单个设备多条数据插入(string类型数据项) + * + * @param deviceId 单个设备名(表名))root.ln.wf01.wt01 + * @param times 时间戳的列表 + * @param measurementsList 数据项列表的列表 + * @param valuesList 数据项对应值列表的列表 + */ + void insertStringRecordsOfOneDevice(String deviceId, List times, + List> measurementsList, List> valuesList); + + /** + * 单个设备多条数据插入(不同类型数据项) + * + * @param deviceId 单个设备名(表名))root.ln.wf01.wt01 + * @param times 时间戳的列表 + * @param measurementsList 数据项列表的列表 + * @param typesList 数据项对应类型列表的列表 + * @param valuesList 数据项对应值列表的列表 + */ + void insertRecordsOfOneDevice(String deviceId, List times, List> measurementsList, + List> typesList, List> valuesList); + + /** + * 删除数据(删除一个时间序列在某个时间点前或这个时间点的数据) + * + * @param path 单个字段 root.ln.wf01.wt01.temperature + * @param endTime 删除时间点 + */ + void deleteData(String path, long endTime); + + /** + * 删除数据(删除多个时间序列在某个时间点前或这个时间点的数据) + * + * @param paths 多个字段(表名)) root.ln.wf01.wt01.temperature + * @param endTime 删除时间点 + */ + void deleteData(List paths, long endTime); + + /** + * 数据查询(时间序列原始数据范围查询,时间范围为左闭右开区间,包含开始时间但不包含结束时间) + * + * @param paths 多个字段(表名)) root.ln.wf01.wt01.temperature + * @param startTime 开始时间 + * @param endTime 结束时间 + * @param outTime 超时时间 + * @return SessionDataSet (Time,paths) + */ + SessionDataSet executeRawDataQuery(List paths, long startTime, long endTime, long outTime); + + /** + * 数据查询(时间序列原始数据范围查询,时间范围为左闭右开区间,包含开始时间但不包含结束时间) + * + * @param paths 多个字段(表名),例如:"root.ln.wf01.wt01.temperature" + * @param startTime 查询数据的起始时间(包含该时间点) + * @param endTime 查询数据的结束时间(不包含该时间点) + * @param outTime 超时时间,单位为毫秒,表示查询的最长等待时间 + * @param clazz 返回数据对应的对象类型,要求对象属性与数据库字段名一致 + * @param 返回数据的对象类型泛型 + * @return 查询结果的对象列表,如果查询失败则返回 null + */ + List executeRawDataQuery(List paths, long startTime, long endTime, long outTime, + Class extends IoTDbRecordAble> clazz); + + /** + * 最新点查询(查询最后一条时间戳大于等于某个时间点的数据) + * + * @param paths 多个字段(表名)) root.ln.wf01.wt01.temperature + * @param lastTime 结束时间 + * @return SessionDataSet + */ + SessionDataSet executeLastDataQuery(List paths, long lastTime); + + /** + * 最新点查询(查询最后一条时间戳大于等于某个时间点的数据) + * + * @param paths 多个字段(表名)) root.ln.wf01.wt01.temperature + * @param lastTime 结束时间 + * @param clazz 返回数据对应的对象(对象属性必须与字段名对应) + * @return 查询结果的对象列表,如果查询失败则返回 null + * @param 返回数据的对象类型泛型 + */ + List executeLastDataQuery(List paths, long lastTime, Class extends IoTDbRecordAble> clazz); + + /** + * 最新点查询(快速查询单设备下指定序列最新点) + * + * @param db root.ln.wf01 + * @param device root.ln.wf01.wt01 + * @param sensors temperature,status(字段名) + * @param isLegalPathNodes true(避免路径校验) + * @return SessionDataSet + */ + SessionDataSet executeLastDataQueryForOneDevice(String db, String device, + List sensors, boolean isLegalPathNodes); + + /** + * 查询单个设备的最新数据(获取指定设备的最新传感器数据) + * + * @param db root.ln.wf01 + * @param device root.ln.wf01.wt01 + * @param sensors temperature,status(字段名) + * @param isLegalPathNodes true(避免路径校验) + * @param clazz 返回数据对应的对象(对象属性必须与字段名对应) + * @return 查询结果的对象列表,如果查询失败则返回 null + * @param 返回数据的对象类型泛型 + */ + List executeLastDataQueryForOneDevice(String db, String device, List sensors, + boolean isLegalPathNodes, Class extends IoTDbRecordAble> clazz); + + /** + * 聚合查询 + * + * @param paths 多个字段(表名)) root.ln.wf01.wt01.temperature + * @param aggregations 聚合操作 TAggregationType.SUM,TAggregationType.COUNT + * @return SessionDataSet + */ + SessionDataSet executeAggregationQuery(List paths, List aggregations); + + /** + * 聚合查询(时间序列原始数据范围查询,时间范围为左闭右开区间,包含开始时间但不包含结束时间) + * + * @param paths 多个字段(表名)) root.ln.wf01.wt01.temperature + * @param aggregations 聚合操作 TAggregationType.SUM,TAggregationType.COUNT + * @param startTime 开始时间(包含) + * @param endTime 结束时间 + * @return SessionDataSet + */ + SessionDataSet executeAggregationQuery(List paths, List aggregations, + long startTime, long endTime); + + /** + * 聚合查询(支持按照时间区间分段查询) + * + * @param paths 多个字段(表名)) root.ln.wf01.wt01.temperature + * @param aggregations 聚合操作 TAggregationType.SUM,TAggregationType.COUNT + * @param startTime 开始时间(包含) + * @param endTime 结束时间 + * @param interval 查询的时间间隔(单位为毫秒) + * @return SessionDataSet + */ + SessionDataSet executeAggregationQuery(List paths, List aggregations, + long startTime, long endTime, long interval); + + /** + * 聚合查询(支持按照时间区间分段查询) + * + * @param paths 多个字段(表名)) root.ln.wf01.wt01.temperature + * @param aggregations 聚合操作 TAggregationType.SUM,TAggregationType.COUNT + * @param startTime 开始时间(包含) + * @param endTime 结束时间 + * @param interval 查询的时间间隔(单位为毫秒) + * @param slidingStep 滑动步长(单位为毫秒) + * @return SessionDataSet + */ + SessionDataSet executeAggregationQuery(List paths, List aggregations, + long startTime, long endTime, long interval, long slidingStep); + + /** + * SQL查询 + * + * @param sql SQL查询语句,支持IotDB的查询语法 + * @return 返回查询结果的 SessionDataSet,如果执行失败则返回 null + */ + SessionDataSet executeQueryStatement(String sql); + + + /** + * SQL非查询 + * + * @param sql SQL查询语句,支持IotDB的查询语法 + */ + void executeNonQueryStatement(String sql); + + /** + * 封装处理数据 + * + * @param sessionDataSet 包含查询结果的SessionDataSet对象 + * @param titleList 列标题列表,用于映射字段名称 + * @return 返回封装后的数据列表,每个 Map 代表一行数据,键为列名,值为对应的字段值 + */ + List> packagingMapData(SessionDataSet sessionDataSet, List titleList); + + /** + * 封装处理数据(不支持聚合查询) + * + * @param sessionDataSet 查询返回的结果集 + * @param titleList 查询返回的结果集内的字段名 + * @param clazz 返回数据对应的对象(对象属性必须与字段名对应) + * @return 返回封装后的对象列表,每个对象对应一行结果集数据 + * @param 返回对象的类型 + */ + List packagingObjectData(SessionDataSet sessionDataSet, List titleList, + Class extends IoTDbRecordAble> clazz); + + /** + * 根据对象构建MeasurementSchemas + * + * @param obj 要从中提取字段信息的对象 + * @return 返回一个包含 MeasurementSchema 的列表 + */ + List buildMeasurementSchemas(Object obj); + + /** + * 根据对象构建MeasurementSchemaValuesDTO + * + * @param obj 要从中提取字段信息和对应值的对象 + * @return MeasurementSchemaValuesDTO 对象 + */ + MeasurementSchemaValuesDTO buildMeasurementSchemasAndValues(Object obj); +} diff --git a/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/iotdb/basic/service/impl/ServiceImpl.java b/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/iotdb/basic/service/impl/ServiceImpl.java new file mode 100644 index 0000000..bc355d9 --- /dev/null +++ b/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/iotdb/basic/service/impl/ServiceImpl.java @@ -0,0 +1,765 @@ +package com.muyu.event.process.iotdb.basic.service.impl; + +import com.alibaba.fastjson.JSON; +import com.muyu.event.process.iotdb.basic.config.IoTDBSessionConfig; +import com.muyu.event.process.iotdb.basic.service.IService; +import com.muyu.event.process.iotdb.domain.dto.IoTDbRecordAble; +import com.muyu.event.process.iotdb.domain.dto.MeasurementSchemaValuesDTO; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.apache.iotdb.common.rpc.thrift.TAggregationType; +import org.apache.iotdb.isession.pool.SessionDataSetWrapper; +import org.apache.iotdb.session.pool.SessionPool; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.read.common.Field; +import org.apache.iotdb.tsfile.read.common.RowRecord; +import org.apache.iotdb.tsfile.write.record.Tablet; +import org.apache.iotdb.isession.SessionDataSet; +import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.lang.reflect.Type; +import java.util.*; +import java.util.stream.Collectors; + +/** + * @Author: zi run + * @Date 2024/9/28 23:38 + * @Description IoTDB基准业务实现层 + */ +@Slf4j +@Service +public class ServiceImpl implements IService { + + /** + * IoTDB会话配置 + */ + @Autowired + private IoTDBSessionConfig ioTDBSessionConfig; + + /** + * 插入一个 Tablet 对象到 IoTDB 数据库 + * + * @param tablet 要插入的 Tablet 对象,包含待写入的数据 + */ + @Override + public void insertTablet(Tablet tablet) { + SessionPool sessionPool = ioTDBSessionConfig.getSessionPool(); + try { + log.info("iotdb数据入库:tablet:[{}]", tablet); + sessionPool.insertTablet(tablet); + } catch (Exception e) { + log.error("IotDBSession insertTablet失败: tablet={}, error={}", tablet, e.getMessage()); + } + } + + /** + * 将给定的 Tablets 插入到 IoTDB 数据库中。 + * + * @param tablets 一个 Map,包含要插入的 Tablets + */ + @Override + public void insertTablets(Map tablets) { + SessionPool sessionPool = ioTDBSessionConfig.getSessionPool(); + try { + log.info("iotdb数据入库:tablets:[{}]", tablets); + sessionPool.insertTablets(tablets); + } catch (Exception e) { + log.error("IotDBSession insertTablets失败: tablets={}, error={}", tablets, e.getMessage()); + } + } + + /** + * 单条数据插入(string类型数据项) + * + * @param deviceId 设备名(表名)root.ln.wf01.wt01 + * @param time 时间戳 + * @param measurements 数据项列表 + * @param values 数据项对应值列表 + */ + @Override + public void insertStringRecord(String deviceId, long time, List measurements, List values) { + SessionPool sessionPool = ioTDBSessionConfig.getSessionPool(); + try { + log.info("iotdb数据入库:device_id:[{}], measurements:[{}], values:[{}]", deviceId, measurements, values); + sessionPool.insertRecord(deviceId, time, measurements, values); + } catch (Exception e) { + log.error("IotDBSession insertRecord失败: deviceId={}, time={}, measurements={}, values={}, error={}", + deviceId, time, measurements, values, e.getMessage()); + } + } + + /** + * 单条数据插入(不同类型数据项) + * + * @param deviceId 设备名(表名)root.ln.wf01.wt01 + * @param time 时间戳 + * @param measurements 数据项列表 + * @param types 数据项对应类型列表 + * @param values 数据项对应值列表 + */ + @Override + public void insertRecord(String deviceId, long time, List measurements, + List types, List values) { + SessionPool sessionPool = ioTDBSessionConfig.getSessionPool(); + try { + log.info("iotdb数据入库:device_id:[{}], measurements:[{}], types:[{}], values:[{}]", + deviceId, measurements, types, values); + sessionPool.insertRecord(deviceId, time, measurements, types, values); + } catch (Exception e) { + log.error("IotDBSession insertRecordHasTypes失败: deviceId={}, time={}, measurements={}, types={}, " + + "values={}, error={}", deviceId, time, measurements, types, values, e.getMessage()); + } + } + + + /** + * 多个设备多条数据插入(string类型数据项) + * + * @param deviceIds 多个设备名(表名)root.ln.wf01.wt01 + * @param times 时间戳的列表 + * @param measurementsList 数据项列表的列表 + * @param valuesList 数据项对应值列表的列表 + */ + @Override + public void insertStringRecords(List deviceIds, List times, List> measurementsList, + List> valuesList) { + SessionPool sessionPool = ioTDBSessionConfig.getSessionPool(); + try { + log.info("iotdb数据入库:deviceIds:[{}], measurementsList:[{}], valuesList:[{}]", + deviceIds, measurementsList, valuesList); + sessionPool.insertRecords(deviceIds, times, measurementsList, valuesList); + } catch (Exception e) { + log.error("IotDBSession insertRecords失败: deviceIds={}, times={}, measurementsList={}, " + + "valuesList={}, error={}", deviceIds, times, measurementsList, valuesList, e.getMessage()); + } + } + + /** + * 多个设备多条数据插入(不同类型数据项) + * + * @param deviceIds 多个设备名(表名))root.ln.wf01.wt01 + * @param times 时间戳的列表 + * @param measurementsList 数据项列表的列表 + * @param typesList 数据项对应类型列表的列表 + * @param valuesList 数据项对应值列表的列表 + */ + @Override + public void insertRecords(List deviceIds, List times, List> measurementsList, + List> typesList, List> valuesList) { + SessionPool sessionPool = ioTDBSessionConfig.getSessionPool(); + try { + log.info("iotdb数据入库:deviceIds:[{}], measurementsList:[{}], typesList:[{}], valuesList:[{}]", + deviceIds, measurementsList, typesList, valuesList); + sessionPool.insertRecords(deviceIds, times, measurementsList, typesList, valuesList); + } catch (Exception e) { + log.error("IotDBSession insertRecords失败: deviceIds={}, times={}, measurementsList={}, typesList={}, " + + "valuesList={}, error={}", + deviceIds, times, measurementsList, typesList, valuesList, e.getMessage()); + } + } + + /** + * 单个设备多条数据插入(string类型数据项) + * + * @param deviceId 单个设备名(表名))root.ln.wf01.wt01 + * @param times 时间戳的列表 + * @param measurementsList 数据项列表的列表 + * @param valuesList 数据项对应值列表的列表 + */ + @Override + public void insertStringRecordsOfOneDevice(String deviceId, List times, List> measurementsList, + List> valuesList) { + SessionPool sessionPool = ioTDBSessionConfig.getSessionPool(); + try { + log.info("iotdb数据入库:deviceId:[{}], measurementsList:[{}], valuesList:[{}]", + deviceId, measurementsList, valuesList); + sessionPool.insertStringRecordsOfOneDevice(deviceId, times, measurementsList, valuesList); + } catch (Exception e) { + log.error("IotDBSession insertStringRecordsOfOneDevice失败: deviceId={}, times={}, " + + "measurementsList={}, valuesList={}, error={}", + deviceId, times, measurementsList, valuesList, e.getMessage()); + } + } + + /** + * 单个设备多条数据插入(不同类型数据项) + * + * @param deviceId 单个设备名(表名))root.ln.wf01.wt01 + * @param times 时间戳的列表 + * @param measurementsList 数据项列表的列表 + * @param typesList 数据项对应类型列表的列表 + * @param valuesList 数据项对应值列表的列表 + */ + @Override + public void insertRecordsOfOneDevice(String deviceId, List times, List> measurementsList, + List> typesList, List> valuesList) { + SessionPool sessionPool = ioTDBSessionConfig.getSessionPool(); + try { + log.info("iotdb数据入库:deviceId:[{}], measurementsList:[{}], typesList:[{}], valuesList:[{}]", + deviceId, measurementsList, typesList, valuesList); + sessionPool.insertRecordsOfOneDevice(deviceId, times, measurementsList, typesList, valuesList); + } catch (Exception e) { + log.error("IotDBSession insertRecordsOfOneDevice失败: deviceId={}, times={}, " + + "measurementsList={},typesList={},valuesList={}, error={}", + deviceId, times, measurementsList, typesList, valuesList, e.getMessage()); + } + } + + /** + * 删除数据(删除一个时间序列在某个时间点前或这个时间点的数据) + * + * @param path 单个字段 root.ln.wf01.wt01.temperature + * @param endTime 删除时间点 + */ + @Override + public void deleteData(String path, long endTime) { + SessionPool sessionPool = ioTDBSessionConfig.getSessionPool(); + try { + log.info("iotdb数据删除:path:[{}], endTime:[{}]", path, endTime); + sessionPool.deleteData(path, endTime); + } catch (Exception e) { + log.error("IotDBSession deleteData失败: deviceId={}, times={},error={}", path, endTime, e.getMessage()); + } + } + + /** + * 删除数据(删除多个时间序列在某个时间点前或这个时间点的数据) + * + * @param paths 多个字段(表名)) root.ln.wf01.wt01.temperature + * @param endTime 删除时间点 + */ + @Override + public void deleteData(List paths, long endTime) { + SessionPool sessionPool = ioTDBSessionConfig.getSessionPool(); + try { + log.info("iotdb数据删除:paths:[{}], endTime:[{}]", paths, endTime); + sessionPool.deleteData(paths, endTime); + } catch (Exception e) { + log.error("IotDBSession deleteData失败: paths={}, times={},error={}", paths, endTime, e.getMessage()); + } + } + + /** + * 数据查询(时间序列原始数据范围查询,时间范围为左闭右开区间,包含开始时间但不包含结束时间) + * + * @param paths 多个字段(表名)) root.ln.wf01.wt01.temperature + * @param startTime 开始时间 + * @param endTime 结束时间 + * @param outTime 超时时间 + * @return SessionDataSet (Time,paths) + */ + @Override + public SessionDataSet executeRawDataQuery(List paths, long startTime, long endTime, long outTime) { + SessionPool sessionPool = ioTDBSessionConfig.getSessionPool(); + SessionDataSetWrapper sessionDataSetWrapper = null; + try { + log.info("iotdb数据查询:paths:[{}], startTime:[{}], endTime:[{}],outTime:[{}]", + paths, startTime, endTime, outTime); + sessionDataSetWrapper = sessionPool.executeRawDataQuery(paths, startTime, endTime, outTime); + return sessionDataSetWrapper.getSessionDataSet(); + } catch (Exception e) { + log.error("IotDBSession executeRawDataQuery失败: paths={}, startTime:[{}], endTime:[{}], " + + "outTime:[{}], error={}", paths, startTime, endTime, outTime, e.getMessage()); + } finally { + sessionPool.closeResultSet(sessionDataSetWrapper); + } + return null; + } + + /** + * 数据查询(时间序列原始数据范围查询,时间范围为左闭右开区间,包含开始时间但不包含结束时间) + * + * @param paths 多个字段(表名),例如:"root.ln.wf01.wt01.temperature" + * @param startTime 查询数据的起始时间(包含该时间点) + * @param endTime 查询数据的结束时间(不包含该时间点) + * @param outTime 超时时间,单位为毫秒,表示查询的最长等待时间 + * @param clazz 返回数据对应的对象类型,要求对象属性与数据库字段名一致 + * @param 返回数据的对象类型泛型 + * @return 查询结果的对象列表,如果查询失败则返回 null + */ + @Override + public List executeRawDataQuery(List paths, long startTime, long endTime, long outTime, + Class extends IoTDbRecordAble> clazz) { + SessionDataSet sessionDataSet = executeRawDataQuery(paths, startTime, endTime, outTime); + List columnNames = sessionDataSet.getColumnNames(); + List resultEntities = null; + try { + resultEntities = packagingObjectData(sessionDataSet, columnNames, clazz); + } catch (Exception e) { + log.error("IotDBSession executeRawDataQuery失败: paths={}, startTime:[{}], endTime:[{}], " + + "outTime:[{}], error={}", paths, startTime, endTime, outTime, e.getMessage()); + } + return resultEntities; + } + + /** + * 最新点查询(查询最后一条时间戳大于等于某个时间点的数据) + * + * @param paths 多个字段(表名)) root.ln.wf01.wt01.temperature + * @param lastTime 结束时间 + * @return SessionDataSet + */ + @Override + public SessionDataSet executeLastDataQuery(List paths, long lastTime) { + SessionPool sessionPool = ioTDBSessionConfig.getSessionPool(); + SessionDataSetWrapper sessionDataSetWrapper = null; + try { + log.info("iotdb数据查询:paths:[{}], lastTime:[{}]", paths, lastTime); + sessionDataSetWrapper = sessionPool.executeLastDataQuery(paths, lastTime); + return sessionDataSetWrapper.getSessionDataSet(); + } catch (Exception e) { + log.error("IotDBSession executeLastDataQuery失败: paths={}, lastTime:[{}], error={}", + paths, lastTime, e.getMessage()); + } finally { + sessionPool.closeResultSet(sessionDataSetWrapper); + } + return null; + } + + /** + * 最新点查询(查询最后一条时间戳大于等于某个时间点的数据) + * + * @param paths 多个字段(表名)) root.ln.wf01.wt01.temperature + * @param lastTime 结束时间 + * @param clazz 返回数据对应的对象(对象属性必须与字段名对应) + * @return 查询结果的对象列表,如果查询失败则返回 null + * @param 返回数据的对象类型泛型 + */ + @Override + public List executeLastDataQuery(List paths, long lastTime, Class extends IoTDbRecordAble> clazz) { + SessionDataSet sessionDataSet = executeLastDataQuery(paths, lastTime); + List columnNames = sessionDataSet.getColumnNames(); + List resultEntities = null; + try { + resultEntities = packagingObjectData(sessionDataSet, columnNames, clazz); + } catch (Exception e) { + log.error("IotDBSession executeLastDataQuery失败: paths={}, lastTime:[{}], error={}", + paths, lastTime, e.getMessage()); + } + return resultEntities; + } + + /** + * 最新点查询(快速查询单设备下指定序列最新点) + * + * @param db root.ln.wf01 + * @param device root.ln.wf01.wt01 + * @param sensors temperature,status(字段名) + * @param isLegalPathNodes true(避免路径校验) + * @return SessionDataSet + */ + @Override + public SessionDataSet executeLastDataQueryForOneDevice(String db, String device, List sensors, + boolean isLegalPathNodes) { + SessionPool sessionPool = ioTDBSessionConfig.getSessionPool(); + SessionDataSetWrapper sessionDataSetWrapper = null; + try { + log.info("iotdb数据查询:db:[{}], device:[{}],sensors:[{}], isLegalPathNodes:[{}]", + db, device, sensors, isLegalPathNodes); + sessionDataSetWrapper = sessionPool.executeLastDataQueryForOneDevice(db, device, sensors, isLegalPathNodes); + return sessionDataSetWrapper.getSessionDataSet(); + } catch (Exception e) { + log.error("IotDBSession executeLastDataQueryForOneDevice失败: db:[{}], device:[{}], sensors:[{}], " + + "isLegalPathNodes:[{}], error={}", db, device, sensors, isLegalPathNodes, e.getMessage()); + } finally { + sessionPool.closeResultSet(sessionDataSetWrapper); + } + return null; + } + + /** + * 查询单个设备的最新数据(获取指定设备的最新传感器数据) + * + * @param db root.ln.wf01 + * @param device root.ln.wf01.wt01 + * @param sensors temperature,status(字段名) + * @param isLegalPathNodes true(避免路径校验) + * @param clazz 返回数据对应的对象(对象属性必须与字段名对应) + * @return 查询结果的对象列表,如果查询失败则返回 null + * @param 返回数据的对象类型泛型 + */ + @Override + public List executeLastDataQueryForOneDevice(String db, String device, List sensors, + boolean isLegalPathNodes, + Class extends IoTDbRecordAble> clazz) { + SessionDataSet sessionDataSet = executeLastDataQueryForOneDevice(db, device, sensors, isLegalPathNodes); + List columnNames = sessionDataSet.getColumnNames(); + List resultEntities = null; + try { + resultEntities = packagingObjectData(sessionDataSet, columnNames, clazz); + } catch (Exception e) { + log.error("IotDBSession executeLastDataQueryForOneDevice失败: db:[{}], device:[{}],sensors:[{}], " + + "isLegalPathNodes:[{}], error={}", db, device, sensors, isLegalPathNodes, e.getMessage()); + } + return resultEntities; + } + + /** + * 聚合查询 + * + * @param paths 多个字段(表名)) root.ln.wf01.wt01.temperature + * @param aggregations 聚合操作 TAggregationType.SUM,TAggregationType.COUNT + * @return SessionDataSet + */ + @Override + public SessionDataSet executeAggregationQuery(List paths, List aggregations) { + SessionPool sessionPool = ioTDBSessionConfig.getSessionPool(); + SessionDataSetWrapper sessionDataSetWrapper = null; + try { + log.info("iotdb聚合查询:paths:[{}], aggregations:[{}]", paths, aggregations); + sessionDataSetWrapper = sessionPool.executeAggregationQuery(paths, aggregations); + return sessionDataSetWrapper.getSessionDataSet(); + } catch (Exception e) { + log.error("IotDBSession executeAggregationQuery失败: paths:[{}], aggregations:[{}] ,error={}", + paths, aggregations, e.getMessage()); + } finally { + sessionPool.closeResultSet(sessionDataSetWrapper); + } + return null; + } + + /** + * 聚合查询(时间序列原始数据范围查询,时间范围为左闭右开区间,包含开始时间但不包含结束时间) + * + * @param paths 多个字段(表名)) root.ln.wf01.wt01.temperature + * @param aggregations 聚合操作 TAggregationType.SUM,TAggregationType.COUNT + * @param startTime 开始时间(包含) + * @param endTime 结束时间 + * @return SessionDataSet + */ + @Override + public SessionDataSet executeAggregationQuery(List paths, List aggregations, + long startTime, long endTime) { + SessionPool sessionPool = ioTDBSessionConfig.getSessionPool(); + SessionDataSetWrapper sessionDataSetWrapper = null; + try { + log.info("iotdb聚合查询:paths:[{}], aggregations:[{}],startTime:[{}], endTime:[{}]", + paths, aggregations, startTime, endTime); + sessionDataSetWrapper = sessionPool.executeAggregationQuery(paths, aggregations, startTime, endTime); + return sessionDataSetWrapper.getSessionDataSet(); + } catch (Exception e) { + log.error("IotDBSession executeAggregationQuery失败: paths:[{}], aggregations:[{}], " + + "startTime:[{}], endTime:[{}],error={}", paths, aggregations, startTime, endTime, e.getMessage()); + } finally { + sessionPool.closeResultSet(sessionDataSetWrapper); + } + return null; + } + + /** + * 聚合查询(支持按照时间区间分段查询) + * + * @param paths 多个字段(表名)) root.ln.wf01.wt01.temperature + * @param aggregations 聚合操作 TAggregationType.SUM,TAggregationType.COUNT + * @param startTime 开始时间(包含) + * @param endTime 结束时间 + * @param interval 查询的时间间隔(单位为毫秒) + * @return SessionDataSet + */ + @Override + public SessionDataSet executeAggregationQuery(List paths, List aggregations, + long startTime, long endTime, long interval) { + SessionPool sessionPool = ioTDBSessionConfig.getSessionPool(); + SessionDataSetWrapper sessionDataSetWrapper = null; + try { + log.info("iotdb聚合查询:paths:[{}], aggregations:[{}],startTime:[{}], endTime:[{}] ,interval:[{}]", + paths, aggregations, startTime, endTime, interval); + sessionDataSetWrapper = sessionPool.executeAggregationQuery( + paths, aggregations, startTime, endTime, interval + ); + return sessionDataSetWrapper.getSessionDataSet(); + } catch (Exception e) { + log.error("IotDBSession executeAggregationQuery失败: paths:[{}], aggregations:[{}] , " + + "startTime:[{}], endTime:[{}], interval:[{}], error={}", + paths, aggregations, startTime, endTime, interval, e.getMessage()); + } finally { + sessionPool.closeResultSet(sessionDataSetWrapper); + } + return null; + } + + /** + * 聚合查询(支持按照时间区间分段查询) + * + * @param paths 多个字段(表名)) root.ln.wf01.wt01.temperature + * @param aggregations 聚合操作 TAggregationType.SUM,TAggregationType.COUNT + * @param startTime 开始时间(包含) + * @param endTime 结束时间 + * @param interval 查询的时间间隔(单位为毫秒) + * @param slidingStep 滑动步长(单位为毫秒) + * @return SessionDataSet + */ + @Override + public SessionDataSet executeAggregationQuery(List paths, List aggregations, + long startTime, long endTime, long interval, long slidingStep) { + SessionPool sessionPool = ioTDBSessionConfig.getSessionPool(); + SessionDataSetWrapper sessionDataSetWrapper = null; + + try { + log.info("iotdb聚合查询:paths:[{}], aggregations:[{}],startTime:[{}], endTime:[{}] ,interval:[{}], " + + "slidingStep:[{}]", paths, aggregations, startTime, endTime, interval, slidingStep); + sessionDataSetWrapper = sessionPool.executeAggregationQuery(paths, aggregations, startTime, endTime, + interval, slidingStep); + return sessionDataSetWrapper.getSessionDataSet(); + } catch (Exception e) { + log.error("IotDBSession executeAggregationQuery失败: paths:[{}], aggregations:[{}] , " + + "startTime:[{}], endTime:[{}], interval:[{}], slidingStep:[{}] ,error={}", + paths, aggregations, startTime, endTime, interval, slidingStep, e.getMessage()); + } finally { + sessionPool.closeResultSet(sessionDataSetWrapper); + } + return null; + } + + /** + * SQL查询 + * + * @param sql SQL查询语句,支持IotDB的查询语法 + * @return 返回查询结果的 SessionDataSet,如果执行失败则返回 null + */ + @Override + public SessionDataSet executeQueryStatement(String sql) { + SessionPool sessionPool = ioTDBSessionConfig.getSessionPool(); + SessionDataSetWrapper sessionDataSetWrapper = null; + + try { + log.info("iotdb SQL查询:sql:[{}]", sql); + sessionDataSetWrapper = sessionPool.executeQueryStatement(sql); + return sessionDataSetWrapper.getSessionDataSet(); + } catch (Exception e) { + log.error("IotDBSession executeQueryStatement失败:sql:[{}],error={}", sql, e.getMessage()); + } finally { + sessionPool.closeResultSet(sessionDataSetWrapper); + } + return null; + } + + /** + * SQL非查询 + * + * @param sql SQL查询语句,支持IotDB的查询语法 + */ + @Override + public void executeNonQueryStatement(String sql) { + SessionPool sessionPool = ioTDBSessionConfig.getSessionPool(); + try { + log.info("iotdb SQL无查询:sql:[{}]", sql); + sessionPool.executeNonQueryStatement(sql); + } catch (Exception e) { + log.error("IotDBSession executeNonQueryStatement失败:sql:[{}],error={}", sql, e.getMessage()); + } + } + + /** + * 封装处理数据 + * + * @param sessionDataSet 包含查询结果的SessionDataSet对象 + * @param titleList 列标题列表,用于映射字段名称 + * @return 返回封装后的数据列表,每个 Map 代表一行数据,键为列名,值为对应的字段值 + */ + @SneakyThrows + @Override + public List> packagingMapData(SessionDataSet sessionDataSet, List titleList) { + int fetchSize = sessionDataSet.getFetchSize(); + List> resultList = new ArrayList<>(); + titleList.remove("Time"); + if (fetchSize > 0) { + while (sessionDataSet.hasNext()) { + Map resultMap = new HashMap<>(); + RowRecord next = sessionDataSet.next(); + List fields = next.getFields(); + String timeString = new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss") + .format(next.getTimestamp()); + resultMap.put("time", timeString); + for (int i = 0; i < fields.size(); i++) { + Field field = fields.get(i); + if (field.getDataType() == null || field.getObjectValue(field.getDataType()) == null) { + resultMap.put(splitString(titleList.get(i)), null); + } else { + resultMap.put(splitString(titleList.get(i)), + field.getObjectValue(field.getDataType()).toString()); + } + } + resultList.add(resultMap); + } + } + return resultList; + } + + /** + * 封装处理数据(不支持聚合查询) + * + * @param sessionDataSet 查询返回的结果集 + * @param titleList 查询返回的结果集内的字段名 + * @param clazz 返回数据对应的对象(对象属性必须与字段名对应) + * @return 返回封装后的对象列表,每个对象对应一行结果集数据 + * @param 返回对象的类型 + */ + @SneakyThrows + @Override + public List packagingObjectData(SessionDataSet sessionDataSet, List titleList, + Class extends IoTDbRecordAble> clazz) { + int fetchSize = sessionDataSet.getFetchSize(); + List resultList = new ArrayList<>(); + titleList.remove("Time"); + if (fetchSize > 0) { + while (sessionDataSet.hasNext()) { + Map resultMap = new HashMap<>(); + RowRecord next = sessionDataSet.next(); + List fields = next.getFields(); + String timeString = new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss") + .format(next.getTimestamp()); + resultMap.put("time", timeString); + if (titleList.stream().anyMatch(str -> str.contains("."))) { + for (int i = 0; i < fields.size(); i++) { + Field field = fields.get(i); + String title = titleList.get(i); + if (field.getDataType() == null || field.getObjectValue(field.getDataType()) == null) { + resultMap.put(splitString(title), null); + } else { + resultMap.put(splitString(title), field.getObjectValue(field.getDataType()).toString()); + } + } + } else { + Field fieldName = fields.get(0); + Field fieldValue = fields.get(1); + Field fieldDataType = fields.get(2); + if (fieldName.getDataType() != null && fieldName.getObjectValue(fieldName.getDataType()) != null) { + String mapKey = fieldName.getObjectValue(fieldName.getDataType()).toString(); + Object mapValue = convertStringToType( + fieldValue.getObjectValue(fieldValue.getDataType()).toString(), + fieldDataType.getObjectValue(fieldDataType.getDataType()).toString() + ); + resultMap.put(splitString(mapKey), mapValue); + } + } + + String jsonString = JSON.toJSONString(resultMap); + resultList.add(JSON.parseObject(jsonString, (Type) clazz)); + } + } + return resultList; + } + + /** + * 分割获取字段名 + * + * @param str 输入的字符串 + * @return 字段名 + */ + public static String splitString(String str) { + String[] parts = str.split("\\."); + if (parts.length <= 0) { + return str; + } else { + return parts[parts.length - 1]; + } + } + + /** + * 根据数据值和数据类型返回对应数据类型数据 + * + * @param value 数据值 + * @param typeName 数据类型 + * @return 转换后的数据值 + */ + public static Object convertStringToType(String value, String typeName) { + String type = typeName.toLowerCase(); + if (type.isEmpty()) { + return value; + } + if ("boolean".equals(type)) { + return Boolean.parseBoolean(value); + } else if ("double".equals(type)) { + return Double.parseDouble(value); + } else if ("int32".equals(type)) { + return Integer.parseInt(value); + } else if ("int64".equals(type)) { + return Long.parseLong(value); + } else if ("float".equals(type)) { + return Float.parseFloat(value); + } else if ("text".equals(type)) { + return value; + } else { + return value; + } + } + + /** + * 根据对象属性的数据类型返回对应的TSDataType + * + * @param type 属性的数据类型 + * @return TSDataType + */ + public static TSDataType getTsDataTypeByString(String type) { + String typeName = splitString(type).toLowerCase(); + if ("boolean".equals(typeName)) { + return TSDataType.BOOLEAN; + } else if ("double".equals(typeName)) { + return TSDataType.DOUBLE; + } else if ("int".equals(typeName) || "integer".equals(typeName)) { + return TSDataType.INT32; + } else if ("long".equals(typeName)) { + return TSDataType.INT64; + } else if ("float".equals(typeName)) { + return TSDataType.FLOAT; + } else if ("text".equals(typeName)) { + return TSDataType.TEXT; + } else if ("string".equals(typeName)) { + return TSDataType.TEXT; + } else { + return TSDataType.UNKNOWN; + } + } + + /** + * 根据对象构建MeasurementSchemas + * + * @param obj 要从中提取字段信息的对象 + * @return 返回一个包含 MeasurementSchema 的列表 + */ + @Override + public List buildMeasurementSchemas(Object obj) { + java.lang.reflect.Field[] fields = obj.getClass().getDeclaredFields(); + List schemaList = Arrays.stream(fields).map(field -> + new MeasurementSchema(field.getName(), + getTsDataTypeByString( + field.getType().getName() + ))). + collect(Collectors.toList()); + return schemaList; + } + + /** + * 根据对象构建MeasurementSchemaValuesDTO + * + * @param obj 要从中提取字段信息和对应值的对象 + * @return MeasurementSchemaValuesDTO 对象 + */ + @SneakyThrows + @Override + public MeasurementSchemaValuesDTO buildMeasurementSchemasAndValues(Object obj) { + MeasurementSchemaValuesDTO measurementSchemaValuesDTO = new MeasurementSchemaValuesDTO(); + java.lang.reflect.Field[] fields = obj.getClass().getDeclaredFields(); + List schemaList = new ArrayList<>(); + List values = new ArrayList<>(); + List valuesIsNullIndex = new ArrayList<>(); + int valueIndex = 0; + for (java.lang.reflect.Field field : fields) { + MeasurementSchema measurementSchema = new MeasurementSchema(field.getName(), + getTsDataTypeByString(field.getType().getName())); + schemaList.add(measurementSchema); + Object value = field.get(obj); + if (value == null) { + valuesIsNullIndex.add(valueIndex); + } + values.add(value); + valueIndex++; + } + measurementSchemaValuesDTO.setSchemaList(schemaList); + measurementSchemaValuesDTO.setValues(values); + return measurementSchemaValuesDTO; + } +} diff --git a/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/iotdb/domain/DataJSON.java b/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/iotdb/domain/DataJSON.java new file mode 100644 index 0000000..fcd4aac --- /dev/null +++ b/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/iotdb/domain/DataJSON.java @@ -0,0 +1,33 @@ +package com.muyu.event.process.iotdb.domain; + +import io.swagger.v3.oas.annotations.media.Schema; +import io.swagger.v3.oas.annotations.tags.Tag; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.experimental.SuperBuilder; + +/** + * @Author: zi run + * @Date 2024/9/29 0:11 + * @Description JSON数据对象 + */ +@Data +@SuperBuilder +@AllArgsConstructor +@NoArgsConstructor +@Tag(name = "ionDB数据源对象") +public class DataJSON { + + /** + * 时间戳 + */ + @Schema(name = "时间戳") + private Long timestamp; + + /** + * 车辆JSON数据 + */ + @Schema(name = "车辆JSON数据") + private String datasource; +} diff --git a/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/iotdb/domain/ResultEntity.java b/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/iotdb/domain/ResultEntity.java new file mode 100644 index 0000000..ad159a6 --- /dev/null +++ b/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/iotdb/domain/ResultEntity.java @@ -0,0 +1,36 @@ +package com.muyu.event.process.iotdb.domain; + +import com.muyu.event.process.iotdb.domain.dto.IoTDbRecordAble; +import lombok.Data; +import lombok.EqualsAndHashCode; + +/** + * @Author: zi run + * @Date 2024/9/29 0:18 + * @Description 结果实体 + */ +@Data +@EqualsAndHashCode(callSuper = true) +public class ResultEntity extends IoTDbRecordAble { + + /** + * 温度值 + */ + private Float temperature; + + /** + * 硬件标识 + */ + private String hardware; + + /** + * 状态标识 + */ + private Boolean status; + + /** + * 时间 + */ + private String time; + +} diff --git a/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/iotdb/domain/TestDataType.java b/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/iotdb/domain/TestDataType.java new file mode 100644 index 0000000..eef2853 --- /dev/null +++ b/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/iotdb/domain/TestDataType.java @@ -0,0 +1,43 @@ +package com.muyu.event.process.iotdb.domain; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * @Author: zi run + * @Date 2024/9/29 0:22 + * @Description 测试数据类型 + */ +@Data +@Builder +@AllArgsConstructor +@NoArgsConstructor +public class TestDataType { + + /** + * 温度值 + */ + private Float temperature; + + /** + * 硬件标识 + */ + private String hardware; + + /** + * 状态标识 + */ + private Boolean status; + + /** + * 测试Double类型 + */ + private Double testDouble; + + /** + * 测试Long类型 + */ + private Long testLong; +} diff --git a/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/iotdb/domain/dto/InsertDataDTO.java b/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/iotdb/domain/dto/InsertDataDTO.java new file mode 100644 index 0000000..15b6cc2 --- /dev/null +++ b/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/iotdb/domain/dto/InsertDataDTO.java @@ -0,0 +1,67 @@ +package com.muyu.event.process.iotdb.domain.dto; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.util.ArrayList; +import java.util.List; + +/** + * @Author: zi run + * @Date 2024/9/29 0:12 + * @Description 插入数据 数据转换对象 + */ +@Data +@Builder +@AllArgsConstructor +@NoArgsConstructor +public class InsertDataDTO { + + /** + * 温度值 + */ + private Float temperature; + + /** + * 硬件标识 + */ + private String hardware; + + /** + * 状态标识 + */ + private Boolean status; + + /** + * 创建一个单一的InsertDataDTO实例,并设置默认值。 + * + * @return 一个配置好的InsertDataDTO对象 + */ + public InsertDataDTO buildOne() { + InsertDataDTO insertDataDTO = new InsertDataDTO(); + insertDataDTO.setHardware("ss"); + insertDataDTO.setStatus(true); + insertDataDTO.setTemperature(12.0F); + return insertDataDTO; + } + + /** + * 创建一个单一的InsertDataDTO实例,并设置默认值。 + * + * @return 一个配置好的InsertDataDTO对象 + */ + public List buildList() { + List insertDataDTOS = new ArrayList<>(); + int buildNum = 10; + for (int i = 0; i < buildNum; i++) { + InsertDataDTO insertDataDTO = new InsertDataDTO(); + insertDataDTO.setHardware(i % 2 == 0 ? "pp" + i : null); + insertDataDTO.setStatus(i % 2 == 0); + insertDataDTO.setTemperature(12.0F + i); + insertDataDTOS.add(insertDataDTO); + } + return insertDataDTOS; + } +} diff --git a/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/iotdb/domain/dto/IoTDbRecordAble.java b/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/iotdb/domain/dto/IoTDbRecordAble.java new file mode 100644 index 0000000..5b8ad62 --- /dev/null +++ b/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/iotdb/domain/dto/IoTDbRecordAble.java @@ -0,0 +1,12 @@ +package com.muyu.event.process.iotdb.domain.dto; + +import lombok.Data; + +/** + * @Author: zi run + * @Date 2024/9/29 0:23 + * @Description IoTDB数据库记录对象 + */ +@Data +public class IoTDbRecordAble { +} diff --git a/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/iotdb/domain/dto/MeasurementSchemaValuesDTO.java b/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/iotdb/domain/dto/MeasurementSchemaValuesDTO.java new file mode 100644 index 0000000..f04a72b --- /dev/null +++ b/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/iotdb/domain/dto/MeasurementSchemaValuesDTO.java @@ -0,0 +1,36 @@ +package com.muyu.event.process.iotdb.domain.dto; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.experimental.SuperBuilder; +import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; + +import java.util.List; + +/** + * @Author: zi run + * @Date 2024/9/29 0:26 + * @Description 测量模式及其对应的值 数据传输对象 + */ +@Data +@SuperBuilder +@AllArgsConstructor +@NoArgsConstructor +public class MeasurementSchemaValuesDTO { + + /** + * 测量模式列表,每个元素表示一个测量的定义,包括名称、数据类型等信息。 + */ + private List schemaList; + + /** + * 对应于测量模式的实际值列表,存储与 schemaList 中每个测量相对应的值。 + */ + private List values; + + /** + * 存储值为空的索引列表 + */ + private List valueIsNullIndex; +} diff --git a/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/iotdb/service/IoTDBService.java b/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/iotdb/service/IoTDBService.java new file mode 100644 index 0000000..b473acf --- /dev/null +++ b/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/iotdb/service/IoTDBService.java @@ -0,0 +1,11 @@ +package com.muyu.event.process.iotdb.service; + +import com.muyu.event.process.iotdb.basic.service.IService; + +/** + * @Author: zi run + * @Date 2024/9/29 22:38 + * @Description IoTDB业务层 + */ +public interface IoTDBService extends IService { +} diff --git a/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/iotdb/service/TestIoTDBService.java b/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/iotdb/service/TestIoTDBService.java new file mode 100644 index 0000000..84302bd --- /dev/null +++ b/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/iotdb/service/TestIoTDBService.java @@ -0,0 +1,20 @@ +package com.muyu.event.process.iotdb.service; + +import com.muyu.event.process.iotdb.basic.service.IService; + +import java.util.List; +import java.util.Map; + +/** + * @Author: zi run + * @Date 2024/9/29 17:23 + * @Description 测试IoTDB业务层 + */ +public interface TestIoTDBService extends IService { + + /** + * 查询IoTDB数据列表 + * @return 返回结果 + */ + List> list(); +} diff --git a/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/iotdb/service/impl/IoTDBServiceImpl.java b/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/iotdb/service/impl/IoTDBServiceImpl.java new file mode 100644 index 0000000..3167618 --- /dev/null +++ b/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/iotdb/service/impl/IoTDBServiceImpl.java @@ -0,0 +1,14 @@ +package com.muyu.event.process.iotdb.service.impl; + +import com.muyu.event.process.iotdb.basic.service.impl.ServiceImpl; +import com.muyu.event.process.iotdb.service.IoTDBService; +import org.springframework.stereotype.Service; + +/** + * @Author: zi run + * @Date 2024/9/29 22:39 + * @Description IoTDB业务实现层 + */ +@Service +public class IoTDBServiceImpl extends ServiceImpl implements IoTDBService { +} diff --git a/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/iotdb/service/impl/TestIoTDBServiceImpl.java b/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/iotdb/service/impl/TestIoTDBServiceImpl.java new file mode 100644 index 0000000..21dde1d --- /dev/null +++ b/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/iotdb/service/impl/TestIoTDBServiceImpl.java @@ -0,0 +1,33 @@ +package com.muyu.event.process.iotdb.service.impl; + +import com.muyu.event.process.iotdb.basic.service.impl.ServiceImpl; +import com.muyu.event.process.iotdb.service.TestIoTDBService; +import lombok.extern.slf4j.Slf4j; +import org.apache.iotdb.isession.SessionDataSet; +import org.springframework.stereotype.Service; + +import java.util.List; +import java.util.Map; + +/** + * @Author: zi run + * @Date 2024/9/29 17:24 + * @Description 测试IoTDB业务实现层 + */ +@Slf4j +@Service +public class TestIoTDBServiceImpl extends ServiceImpl implements TestIoTDBService { + + /** + * 查询IoTDB数据列表 + * @return 返回结果 + */ + @Override + public List> list() { + String sql = "select * from root.test"; + SessionDataSet sessionDataSet = this.executeQueryStatement(sql); + List> list = this.packagingMapData(sessionDataSet, sessionDataSet.getColumnTypes()); + log.info("查询IoTDB数据为:{}", list.toString()); + return list; + } +} diff --git a/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/listener/IoTDBInsertDataListener.java b/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/listener/IoTDBInsertDataListener.java new file mode 100644 index 0000000..0c0f96c --- /dev/null +++ b/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/listener/IoTDBInsertDataListener.java @@ -0,0 +1,68 @@ +package com.muyu.event.process.listener; + +import com.alibaba.fastjson2.JSONObject; +import com.muyu.event.process.basic.BasicEvent; +import com.muyu.event.process.basic.BasicEventListener; +import com.muyu.event.process.event.IoTDBInsertDataEvent; +import com.muyu.event.process.iotdb.service.IoTDBService; +import lombok.RequiredArgsConstructor; +import org.springframework.stereotype.Component; + +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; + +/** + * @Author: zi run + * @Date 2024/9/29 22:12 + * @Description 向IoTDB插入数据事件监听器 + */ +@Component +@RequiredArgsConstructor +public class IoTDBInsertDataListener implements BasicEventListener { + + /** + * IoTDB业务层 + */ + private final IoTDBService ioTDBService; + + /** + * 设备名(表名) + */ + private static final String DEVICE_ID = "root.vehicle"; + + /** + * 处理接收到的事件,将数据插入到 IoTDB + * + * @param event 接收到的事件,包含需要插入的数据 + */ + @Override + public void onEvent(BasicEvent event) { + JSONObject data = JSONObject.parseObject(event.getData()); + List keyList = extractKeys(data); + List valueList = extractValues(data); + ioTDBService.insertStringRecord(DEVICE_ID, System.currentTimeMillis(), keyList, valueList); + } + + /** + * 从给定的JSONObject中提取所有的键 + * + * @param data 要提取键的JSONObject + * @return 键的列表 + */ + private List extractKeys(JSONObject data) { + return data.keySet().stream().collect(Collectors.toList()); + } + + /** + * 从给定的 JSONObject 中提取所有的值,并将其转换为字符串 + * + * @param data 要提取值的JSONObject + * @return 值的列表,以字符串形式表示 + */ + private List extractValues(JSONObject data) { + return data.values().stream() + .map(Object::toString) + .collect(Collectors.toList()); + } +}
该方法从会话池中获取一个会话,并尝试将指定的记录插入到 IoTDB 中。 + * 如果插入失败,将记录错误信息,便于后续排查。