From 7100bf71bfcd03208d97a5d4d53b8dc7d9ab1cbf Mon Sep 17 00:00:00 2001 From: xinzirun Date: Mon, 30 Sep 2024 00:52:20 +0800 Subject: [PATCH] =?UTF-8?q?feat():=20=E6=B7=BB=E5=8A=A0=E4=BA=8B=E4=BB=B6?= =?UTF-8?q?=E5=A4=84=E7=90=86=E4=B8=9A=E5=8A=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../cloud-modules-event-process/pom.xml | 12 ++- .../muyu/event/process/basic/CustomEvent.java | 37 ++++++++ .../event/process/basic/EventListener.java | 18 ++++ .../event/process/basic/EventPublisher.java | 39 ++++++++ .../process/config/EventListenerConfig.java | 19 ++++ .../event/process/consumer/TestConsumer.java | 52 +++++++++++ .../process/consumer/VehicleConsumer.java | 82 +++++++++++++++++ .../controller/TestEventController.java | 88 +++++++++++++++++++ .../basic/config/IoTDBSessionConfig.java | 2 +- .../basic/service/IService.java} | 10 +-- .../basic/service/impl/ServiceImpl.java} | 20 ++--- .../process/{ => iotdb}/domain/DataJSON.java | 2 +- .../{ => iotdb}/domain/ResultEntity.java | 4 +- .../{ => iotdb}/domain/TestDataType.java | 2 +- .../{ => iotdb}/domain/dto/InsertDataDTO.java | 2 +- .../domain/dto/IoTDbRecordAble.java | 2 +- .../dto/MeasurementSchemaValuesDTO.java | 2 +- .../process/iotdb/service/IoTDBService.java | 11 +++ .../iotdb/service/TestIoTDBService.java | 20 +++++ .../iotdb/service/impl/IoTDBServiceImpl.java | 14 +++ .../service/impl/TestIoTDBServiceImpl.java | 33 +++++++ .../process/listener/AddDatabaseListener.java | 43 +++++++++ .../src/main/resources/bootstrap.yml | 4 +- 23 files changed, 491 insertions(+), 27 deletions(-) create mode 100644 cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/basic/CustomEvent.java create mode 100644 cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/basic/EventListener.java create mode 100644 cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/basic/EventPublisher.java create mode 100644 cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/config/EventListenerConfig.java create mode 100644 cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/consumer/TestConsumer.java create mode 100644 cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/consumer/VehicleConsumer.java create mode 100644 cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/controller/TestEventController.java rename cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/{ => iotdb}/basic/config/IoTDBSessionConfig.java (97%) rename cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/{basic/service/IoTDBService.java => iotdb/basic/service/IService.java} (97%) rename cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/{basic/service/impl/IoTDBServiceImpl.java => iotdb/basic/service/impl/ServiceImpl.java} (98%) rename cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/{ => iotdb}/domain/DataJSON.java (93%) rename cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/{ => iotdb}/domain/ResultEntity.java (81%) rename cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/{ => iotdb}/domain/TestDataType.java (93%) rename cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/{ => iotdb}/domain/dto/InsertDataDTO.java (97%) rename cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/{ => iotdb}/domain/dto/IoTDbRecordAble.java (75%) rename cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/{ => iotdb}/domain/dto/MeasurementSchemaValuesDTO.java (94%) create mode 100644 cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/iotdb/service/IoTDBService.java create mode 100644 cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/iotdb/service/TestIoTDBService.java create mode 100644 cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/iotdb/service/impl/IoTDBServiceImpl.java create mode 100644 cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/iotdb/service/impl/TestIoTDBServiceImpl.java create mode 100644 cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/listener/AddDatabaseListener.java diff --git a/cloud-modules/cloud-modules-event-process/pom.xml b/cloud-modules/cloud-modules-event-process/pom.xml index 20a8257..0c734d7 100644 --- a/cloud-modules/cloud-modules-event-process/pom.xml +++ b/cloud-modules/cloud-modules-event-process/pom.xml @@ -52,6 +52,12 @@ mysql-connector-j + + + org.apache.iotdb + iotdb-session + + com.muyu @@ -82,10 +88,10 @@ cloud-common-core - + - org.apache.iotdb - iotdb-session + com.muyu + cloud-common-kafka \ No newline at end of file diff --git a/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/basic/CustomEvent.java b/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/basic/CustomEvent.java new file mode 100644 index 0000000..2b80585 --- /dev/null +++ b/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/basic/CustomEvent.java @@ -0,0 +1,37 @@ +package com.muyu.event.process.basic; + +import com.alibaba.fastjson2.JSONObject; +import org.springframework.context.ApplicationEvent; + +/** + * @Author: zi run + * @Date 2024/9/29 21:19 + * @Description 自定义事件 + */ +public class CustomEvent extends ApplicationEvent { + + /** + * 存储与事件相关联的数据 + */ + private final JSONObject data; + + /** + * 创建一个新的自定义事件 + * + * @param source 事件源,表示触发此事件的对象 + * @param data 事件携带的数据,以JSON格式存储 + */ + public CustomEvent(Object source, JSONObject data) { + super(source); + this.data = data; + } + + /** + * 获取与此事件相关联的数据。 + * + * @return 包含事件数据的JSONObject + */ + public JSONObject getData() { + return data; + } +} diff --git a/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/basic/EventListener.java b/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/basic/EventListener.java new file mode 100644 index 0000000..32115a6 --- /dev/null +++ b/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/basic/EventListener.java @@ -0,0 +1,18 @@ +package com.muyu.event.process.basic; + +import org.springframework.context.ApplicationListener; + +/** + * @Author: zi run + * @Date 2024/9/29 21:29 + * @Description 事件监听基准 + */ +public interface EventListener extends ApplicationListener { + + /** + * 处理接收到的自定义事件。 + * + * @param event 已发布的自定义事件实例,包含事件的源和相关数据 + */ + void onEvent(CustomEvent event); +} diff --git a/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/basic/EventPublisher.java b/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/basic/EventPublisher.java new file mode 100644 index 0000000..c23acf5 --- /dev/null +++ b/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/basic/EventPublisher.java @@ -0,0 +1,39 @@ +package com.muyu.event.process.basic; + +import com.alibaba.fastjson2.JSONObject; +import org.springframework.context.ApplicationEventPublisher; +import org.springframework.context.ApplicationEventPublisherAware; +import org.springframework.stereotype.Component; + +/** + * @Author: zi run + * @Date 2024/9/29 22:01 + * @Description 事件发布者 + */ +@Component +public class EventPublisher implements ApplicationEventPublisherAware { + + /** + * 应用程序事件发布者,用于发布事件 + */ + private ApplicationEventPublisher applicationEventPublisher; + + /** + * 设置应用程序事件发布者。 + * + * @param applicationEventPublisher 应用程序事件发布者实例 + */ + @Override + public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) { + this.applicationEventPublisher = applicationEventPublisher; + } + + /** + * 发布自定义事件。 + * + * @param messages 事件携带的数据,以JSON格式传递 + */ + public void publishEvent(JSONObject messages) { + applicationEventPublisher.publishEvent(new CustomEvent(this, messages)); + } +} diff --git a/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/config/EventListenerConfig.java b/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/config/EventListenerConfig.java new file mode 100644 index 0000000..7b7913f --- /dev/null +++ b/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/config/EventListenerConfig.java @@ -0,0 +1,19 @@ +package com.muyu.event.process.config; + +import com.muyu.event.process.listener.AddDatabaseListener; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * @Author: zi run + * @Date 2024/9/29 21:29 + * @Description 事件监听配置 + */ +@Configuration +public class EventListenerConfig { + + @Bean + public AddDatabaseListener addDatabaseListener() { + return new AddDatabaseListener(); + } +} diff --git a/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/consumer/TestConsumer.java b/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/consumer/TestConsumer.java new file mode 100644 index 0000000..cd7dbe2 --- /dev/null +++ b/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/consumer/TestConsumer.java @@ -0,0 +1,52 @@ +package com.muyu.event.process.consumer; + +import cn.hutool.core.thread.ThreadUtil; +import com.alibaba.nacos.shaded.com.google.common.collect.Lists; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.stereotype.Component; + +import java.time.Duration; +import java.util.Collection; + +/** + * @Author: zi run + * @Date 2024/9/29 16:53 + * @Description 测试消费者 + */ +@Slf4j +//@Component +@RequiredArgsConstructor +public class TestConsumer implements InitializingBean { + + /** + * kafka消费者 + */ + private final KafkaConsumer kafkaConsumer; + + /** + * kafka主题名称 + */ + private static final String topicName = "test-topic"; + + + @Override + public void afterPropertiesSet() throws Exception { + new Thread(() -> { + log.info("启动线程监听Topic: {}", topicName); + ThreadUtil.sleep(1000); + Collection topics = Lists.newArrayList(topicName); + kafkaConsumer.subscribe(topics); + while (true) { + ConsumerRecords consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000)); + consumerRecords.forEach(record -> { + String value = record.value(); + log.info("从Kafka中消费的原始数据: {}", value); + }); + } + }).start(); + } +} diff --git a/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/consumer/VehicleConsumer.java b/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/consumer/VehicleConsumer.java new file mode 100644 index 0000000..46aaeea --- /dev/null +++ b/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/consumer/VehicleConsumer.java @@ -0,0 +1,82 @@ +package com.muyu.event.process.consumer; + +import cn.hutool.core.thread.ThreadUtil; +import com.alibaba.fastjson2.JSONObject; +import com.alibaba.nacos.shaded.com.google.common.collect.Lists; +import com.muyu.event.process.basic.EventPublisher; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.boot.ApplicationArguments; +import org.springframework.boot.ApplicationRunner; +import org.springframework.scheduling.annotation.Async; +import org.springframework.stereotype.Component; + +import java.time.Duration; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +/** + * @Author: zi run + * @Date 2024/9/29 23:23 + * @Description 车辆消费者 + */ +@Slf4j +@Component +@RequiredArgsConstructor +public class VehicleConsumer implements InitializingBean { + + /** + * kafka消费者 + */ + private final KafkaConsumer kafkaConsumer; + + /** + * 事件发布者 + */ + private final EventPublisher eventPublisher; + + /** + * 协议解析报文传递数据(队列名称) + */ + public final static String MESSAGE_PARSING = "test-topic"; + +// @Override +// public void run(ApplicationArguments args) throws Exception { +// log.info("开始监听kafka-topic:{}", MESSAGE_PARSING); +// List topicList = Collections.singletonList(MESSAGE_PARSING); +// kafkaConsumer.subscribe(topicList); +// +// while (true) { +// ConsumerRecords consumerRecords = kafkaConsumer.poll(Duration.ofMillis(100)); +// consumerRecords.forEach(record -> { +// String value = record.value(); +// log.info("接收到车辆报文数据,内容:{}", value); +// eventPublisher.publishEvent(JSONObject.parseObject(value)); +// }); +// } +// } + + @Async + @Override + public void afterPropertiesSet() throws Exception { + new Thread(() -> { + log.info("启动线程监听Topic: {}", MESSAGE_PARSING); + ThreadUtil.sleep(100); + Collection topics = Lists.newArrayList(MESSAGE_PARSING); + kafkaConsumer.subscribe(topics); + while (true) { + ConsumerRecords consumerRecords = kafkaConsumer.poll(Duration.ofMillis(100)); + consumerRecords.forEach(consumerRecord -> { + String message = consumerRecord.value(); + log.info("接收到车辆报文数据,内容:{}", message); + eventPublisher.publishEvent(JSONObject.parseObject(message)); + }); + } + }).start(); + } +} diff --git a/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/controller/TestEventController.java b/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/controller/TestEventController.java new file mode 100644 index 0000000..c32feb1 --- /dev/null +++ b/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/controller/TestEventController.java @@ -0,0 +1,88 @@ +package com.muyu.event.process.controller; + +import com.alibaba.fastjson2.JSONObject; +import com.muyu.common.core.constant.Constants; +import com.muyu.common.core.domain.Result; +import com.muyu.common.core.web.controller.BaseController; +import com.muyu.event.process.iotdb.service.TestIoTDBService; +import lombok.RequiredArgsConstructor; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +/** + * @Author: zi run + * @Date 2024/9/29 16:24 + * @Description 测试事件控制层 + */ +@RestController +@RequiredArgsConstructor +@RequestMapping(value = "/test-event") +public class TestEventController extends BaseController { + + /** + * kafka生产者 + */ + private final KafkaProducer kafkaProducer; + + /** + * kafka主题名称 + */ + private static final String kafkaTopicName = "test-topic"; + + /** + * 测试IoTDB业务层 + */ + private final TestIoTDBService testIoTDBService; + + /** + * 发送Kafka测试消息 + * + * @return 响应结果 + */ + @GetMapping(value = "/sendKafka") + public Result senKafka() { + JSONObject jsonObject = new JSONObject(); + jsonObject.put("id",1); + jsonObject.put("name","张三"); + jsonObject.put("age",18); + jsonObject.put("sex","男"); + ProducerRecord producerRecord = new ProducerRecord<>(kafkaTopicName, jsonObject.toJSONString()); + kafkaProducer.send(producerRecord); + return Result.success(null, Constants.SUCCESS_MESSAGE); + } + + /** + * 查询IoTDB数据列表 + * @return 响应结果 + */ + @GetMapping(value = "/list") + public Result>> list() { + return Result.success(testIoTDBService.list(), Constants.SUCCESS_MESSAGE); + } + + /** + * 向IoTDB添加数据 + * + * @return 响应结果 + */ + @PostMapping(value = "/save") + public Result save() { + String deviceId = "root.test"; + ArrayList keyList = new ArrayList<>(); + ArrayList valueList = new ArrayList<>(); + keyList.add("car_vin"); + keyList.add("car_name"); + valueList.add("VIN123456"); + valueList.add("宝马"); + testIoTDBService.insertStringRecord(deviceId, System.currentTimeMillis(), keyList, valueList); + return Result.success(null, Constants.SUCCESS_MESSAGE); + } +} diff --git a/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/basic/config/IoTDBSessionConfig.java b/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/iotdb/basic/config/IoTDBSessionConfig.java similarity index 97% rename from cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/basic/config/IoTDBSessionConfig.java rename to cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/iotdb/basic/config/IoTDBSessionConfig.java index 01a498c..18e7feb 100644 --- a/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/basic/config/IoTDBSessionConfig.java +++ b/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/iotdb/basic/config/IoTDBSessionConfig.java @@ -1,4 +1,4 @@ -package com.muyu.event.process.basic.config; +package com.muyu.event.process.iotdb.basic.config; import lombok.extern.slf4j.Slf4j; import org.apache.iotdb.session.pool.SessionPool; diff --git a/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/basic/service/IoTDBService.java b/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/iotdb/basic/service/IService.java similarity index 97% rename from cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/basic/service/IoTDBService.java rename to cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/iotdb/basic/service/IService.java index 5c4ac01..fd5d3bf 100644 --- a/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/basic/service/IoTDBService.java +++ b/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/iotdb/basic/service/IService.java @@ -1,7 +1,7 @@ -package com.muyu.event.process.basic.service; +package com.muyu.event.process.iotdb.basic.service; -import com.muyu.event.process.domain.dto.IoTDbRecordAble; -import com.muyu.event.process.domain.dto.MeasurementSchemaValuesDTO; +import com.muyu.event.process.iotdb.domain.dto.IoTDbRecordAble; +import com.muyu.event.process.iotdb.domain.dto.MeasurementSchemaValuesDTO; import org.apache.iotdb.common.rpc.thrift.TAggregationType; import org.apache.iotdb.isession.SessionDataSet; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; @@ -14,9 +14,9 @@ import java.util.Map; /** * @Author: zi run * @Date 2024/9/28 23:37 - * @Description IoTDB业务层 + * @Description IoTDB基准业务层 */ -public interface IoTDBService { +public interface IService { /** * 插入一个 Tablet 对象到 IoTDB 数据库 diff --git a/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/basic/service/impl/IoTDBServiceImpl.java b/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/iotdb/basic/service/impl/ServiceImpl.java similarity index 98% rename from cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/basic/service/impl/IoTDBServiceImpl.java rename to cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/iotdb/basic/service/impl/ServiceImpl.java index 73e67e2..bc355d9 100644 --- a/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/basic/service/impl/IoTDBServiceImpl.java +++ b/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/iotdb/basic/service/impl/ServiceImpl.java @@ -1,11 +1,10 @@ -package com.muyu.event.process.basic.service.impl; +package com.muyu.event.process.iotdb.basic.service.impl; import com.alibaba.fastjson.JSON; -import com.muyu.event.process.basic.config.IoTDBSessionConfig; -import com.muyu.event.process.basic.service.IoTDBService; -import com.muyu.event.process.domain.dto.IoTDbRecordAble; -import com.muyu.event.process.domain.dto.MeasurementSchemaValuesDTO; -import lombok.RequiredArgsConstructor; +import com.muyu.event.process.iotdb.basic.config.IoTDBSessionConfig; +import com.muyu.event.process.iotdb.basic.service.IService; +import com.muyu.event.process.iotdb.domain.dto.IoTDbRecordAble; +import com.muyu.event.process.iotdb.domain.dto.MeasurementSchemaValuesDTO; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.apache.iotdb.common.rpc.thrift.TAggregationType; @@ -17,6 +16,7 @@ import org.apache.iotdb.tsfile.read.common.RowRecord; import org.apache.iotdb.tsfile.write.record.Tablet; import org.apache.iotdb.isession.SessionDataSet; import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.lang.reflect.Type; @@ -26,17 +26,17 @@ import java.util.stream.Collectors; /** * @Author: zi run * @Date 2024/9/28 23:38 - * @Description IoTDB业务实现层 + * @Description IoTDB基准业务实现层 */ @Slf4j @Service -@RequiredArgsConstructor -public class IoTDBServiceImpl implements IoTDBService { +public class ServiceImpl implements IService { /** * IoTDB会话配置 */ - private final IoTDBSessionConfig ioTDBSessionConfig; + @Autowired + private IoTDBSessionConfig ioTDBSessionConfig; /** * 插入一个 Tablet 对象到 IoTDB 数据库 diff --git a/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/domain/DataJSON.java b/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/iotdb/domain/DataJSON.java similarity index 93% rename from cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/domain/DataJSON.java rename to cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/iotdb/domain/DataJSON.java index 16853f3..fcd4aac 100644 --- a/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/domain/DataJSON.java +++ b/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/iotdb/domain/DataJSON.java @@ -1,4 +1,4 @@ -package com.muyu.event.process.domain; +package com.muyu.event.process.iotdb.domain; import io.swagger.v3.oas.annotations.media.Schema; import io.swagger.v3.oas.annotations.tags.Tag; diff --git a/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/domain/ResultEntity.java b/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/iotdb/domain/ResultEntity.java similarity index 81% rename from cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/domain/ResultEntity.java rename to cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/iotdb/domain/ResultEntity.java index 02d7d96..ad159a6 100644 --- a/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/domain/ResultEntity.java +++ b/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/iotdb/domain/ResultEntity.java @@ -1,6 +1,6 @@ -package com.muyu.event.process.domain; +package com.muyu.event.process.iotdb.domain; -import com.muyu.event.process.domain.dto.IoTDbRecordAble; +import com.muyu.event.process.iotdb.domain.dto.IoTDbRecordAble; import lombok.Data; import lombok.EqualsAndHashCode; diff --git a/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/domain/TestDataType.java b/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/iotdb/domain/TestDataType.java similarity index 93% rename from cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/domain/TestDataType.java rename to cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/iotdb/domain/TestDataType.java index dd75553..eef2853 100644 --- a/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/domain/TestDataType.java +++ b/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/iotdb/domain/TestDataType.java @@ -1,4 +1,4 @@ -package com.muyu.event.process.domain; +package com.muyu.event.process.iotdb.domain; import lombok.AllArgsConstructor; import lombok.Builder; diff --git a/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/domain/dto/InsertDataDTO.java b/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/iotdb/domain/dto/InsertDataDTO.java similarity index 97% rename from cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/domain/dto/InsertDataDTO.java rename to cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/iotdb/domain/dto/InsertDataDTO.java index d213bd8..15b6cc2 100644 --- a/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/domain/dto/InsertDataDTO.java +++ b/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/iotdb/domain/dto/InsertDataDTO.java @@ -1,4 +1,4 @@ -package com.muyu.event.process.domain.dto; +package com.muyu.event.process.iotdb.domain.dto; import lombok.AllArgsConstructor; import lombok.Builder; diff --git a/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/domain/dto/IoTDbRecordAble.java b/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/iotdb/domain/dto/IoTDbRecordAble.java similarity index 75% rename from cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/domain/dto/IoTDbRecordAble.java rename to cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/iotdb/domain/dto/IoTDbRecordAble.java index 957dfb5..5b8ad62 100644 --- a/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/domain/dto/IoTDbRecordAble.java +++ b/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/iotdb/domain/dto/IoTDbRecordAble.java @@ -1,4 +1,4 @@ -package com.muyu.event.process.domain.dto; +package com.muyu.event.process.iotdb.domain.dto; import lombok.Data; diff --git a/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/domain/dto/MeasurementSchemaValuesDTO.java b/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/iotdb/domain/dto/MeasurementSchemaValuesDTO.java similarity index 94% rename from cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/domain/dto/MeasurementSchemaValuesDTO.java rename to cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/iotdb/domain/dto/MeasurementSchemaValuesDTO.java index 8681f19..f04a72b 100644 --- a/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/domain/dto/MeasurementSchemaValuesDTO.java +++ b/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/iotdb/domain/dto/MeasurementSchemaValuesDTO.java @@ -1,4 +1,4 @@ -package com.muyu.event.process.domain.dto; +package com.muyu.event.process.iotdb.domain.dto; import lombok.AllArgsConstructor; import lombok.Data; diff --git a/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/iotdb/service/IoTDBService.java b/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/iotdb/service/IoTDBService.java new file mode 100644 index 0000000..b473acf --- /dev/null +++ b/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/iotdb/service/IoTDBService.java @@ -0,0 +1,11 @@ +package com.muyu.event.process.iotdb.service; + +import com.muyu.event.process.iotdb.basic.service.IService; + +/** + * @Author: zi run + * @Date 2024/9/29 22:38 + * @Description IoTDB业务层 + */ +public interface IoTDBService extends IService { +} diff --git a/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/iotdb/service/TestIoTDBService.java b/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/iotdb/service/TestIoTDBService.java new file mode 100644 index 0000000..84302bd --- /dev/null +++ b/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/iotdb/service/TestIoTDBService.java @@ -0,0 +1,20 @@ +package com.muyu.event.process.iotdb.service; + +import com.muyu.event.process.iotdb.basic.service.IService; + +import java.util.List; +import java.util.Map; + +/** + * @Author: zi run + * @Date 2024/9/29 17:23 + * @Description 测试IoTDB业务层 + */ +public interface TestIoTDBService extends IService { + + /** + * 查询IoTDB数据列表 + * @return 返回结果 + */ + List> list(); +} diff --git a/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/iotdb/service/impl/IoTDBServiceImpl.java b/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/iotdb/service/impl/IoTDBServiceImpl.java new file mode 100644 index 0000000..3167618 --- /dev/null +++ b/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/iotdb/service/impl/IoTDBServiceImpl.java @@ -0,0 +1,14 @@ +package com.muyu.event.process.iotdb.service.impl; + +import com.muyu.event.process.iotdb.basic.service.impl.ServiceImpl; +import com.muyu.event.process.iotdb.service.IoTDBService; +import org.springframework.stereotype.Service; + +/** + * @Author: zi run + * @Date 2024/9/29 22:39 + * @Description IoTDB业务实现层 + */ +@Service +public class IoTDBServiceImpl extends ServiceImpl implements IoTDBService { +} diff --git a/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/iotdb/service/impl/TestIoTDBServiceImpl.java b/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/iotdb/service/impl/TestIoTDBServiceImpl.java new file mode 100644 index 0000000..21dde1d --- /dev/null +++ b/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/iotdb/service/impl/TestIoTDBServiceImpl.java @@ -0,0 +1,33 @@ +package com.muyu.event.process.iotdb.service.impl; + +import com.muyu.event.process.iotdb.basic.service.impl.ServiceImpl; +import com.muyu.event.process.iotdb.service.TestIoTDBService; +import lombok.extern.slf4j.Slf4j; +import org.apache.iotdb.isession.SessionDataSet; +import org.springframework.stereotype.Service; + +import java.util.List; +import java.util.Map; + +/** + * @Author: zi run + * @Date 2024/9/29 17:24 + * @Description 测试IoTDB业务实现层 + */ +@Slf4j +@Service +public class TestIoTDBServiceImpl extends ServiceImpl implements TestIoTDBService { + + /** + * 查询IoTDB数据列表 + * @return 返回结果 + */ + @Override + public List> list() { + String sql = "select * from root.test"; + SessionDataSet sessionDataSet = this.executeQueryStatement(sql); + List> list = this.packagingMapData(sessionDataSet, sessionDataSet.getColumnTypes()); + log.info("查询IoTDB数据为:{}", list.toString()); + return list; + } +} diff --git a/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/listener/AddDatabaseListener.java b/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/listener/AddDatabaseListener.java new file mode 100644 index 0000000..1e0206a --- /dev/null +++ b/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/listener/AddDatabaseListener.java @@ -0,0 +1,43 @@ +package com.muyu.event.process.listener; + +import com.alibaba.fastjson2.JSONObject; +import com.muyu.event.process.basic.CustomEvent; +import com.muyu.event.process.basic.EventListener; +import com.muyu.event.process.iotdb.basic.service.IService; +import com.muyu.event.process.iotdb.service.IoTDBService; +import lombok.RequiredArgsConstructor; +import org.springframework.beans.factory.annotation.Autowired; + +import java.util.ArrayList; +import java.util.List; + +/** + * @Author: zi run + * @Date 2024/9/29 22:12 + * @Description 添加数据库事件监听器 + */ +public class AddDatabaseListener implements EventListener { + + /** + * IoTDB业务层 + */ + @Autowired + private IoTDBService ioTDBService; + + @Override + public void onEvent(CustomEvent event) { + JSONObject data = event.getData(); + List keyList = new ArrayList<>(); + List valueList = new ArrayList<>(); + data.forEach((key, value) -> { + keyList.add(key); + valueList.add((String) value); + }); + ioTDBService.insertStringRecord("root.vehicle", System.currentTimeMillis(), keyList, valueList); + } + + @Override + public void onApplicationEvent(CustomEvent event) { + onEvent(event); + } +} diff --git a/cloud-modules/cloud-modules-event-process/src/main/resources/bootstrap.yml b/cloud-modules/cloud-modules-event-process/src/main/resources/bootstrap.yml index 47bf553..f880532 100644 --- a/cloud-modules/cloud-modules-event-process/src/main/resources/bootstrap.yml +++ b/cloud-modules/cloud-modules-event-process/src/main/resources/bootstrap.yml @@ -42,4 +42,6 @@ spring: # 系统共享配置 - application-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension} # 系统环境Config共享配置 - - application-config-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension} \ No newline at end of file + - application-config-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension} + # kafka共享配置 + - application-kafka-config-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension} \ No newline at end of file