diff --git a/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/basic/BasicEvent.java b/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/basic/BasicEvent.java new file mode 100644 index 0000000..1701cba --- /dev/null +++ b/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/basic/BasicEvent.java @@ -0,0 +1,37 @@ +package com.muyu.event.process.basic; + +import org.springframework.context.ApplicationEvent; + +/** + * @Author: zi run + * @Date 2024/9/30 15:11 + * @Description 基础事件 + */ +public class BasicEvent extends ApplicationEvent { + + /** + * 事件携带的数据 + */ + private final T data; + + /** + * 构造函数,初始化事件源和数据 + * + * @param source 事件源对象 + * @param data 事件携带的数据 + */ + public BasicEvent(Object source, T data) { + super(source); + this.data = data; + } + + + /** + * 获取事件携带的数据 + * + * @return 事件数据 + */ + public T getData() { + return data; + } +} diff --git a/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/basic/BasicEventHandler.java b/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/basic/BasicEventHandler.java new file mode 100644 index 0000000..d9571d2 --- /dev/null +++ b/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/basic/BasicEventHandler.java @@ -0,0 +1,37 @@ +package com.muyu.event.process.basic; + +import org.springframework.context.ApplicationListener; +import org.springframework.stereotype.Component; + +/** + * @Author: zi run + * @Date 2024/9/30 15:37 + * @Description 基础事件处理器 + */ +@Component +public class BasicEventHandler implements ApplicationListener> { + + /** + * 具体事件监听器 + */ + private final BasicEventListener listener; + + /** + * 构造函数,用于注入具体事件监听器 + * + * @param listener 具体事件监听器 + */ + public BasicEventHandler(BasicEventListener listener) { + this.listener = listener; + } + + /** + * 处理应用事件 + * + * @param event 事件对象 + */ + @Override + public void onApplicationEvent(BasicEvent event) { + listener.onEvent(event); + } +} diff --git a/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/basic/BasicEventListener.java b/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/basic/BasicEventListener.java new file mode 100644 index 0000000..7fecd6e --- /dev/null +++ b/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/basic/BasicEventListener.java @@ -0,0 +1,16 @@ +package com.muyu.event.process.basic; + +/** + * @Author: zi run + * @Date 2024/9/30 15:35 + * @Description 基础事件监听器 + */ +public interface BasicEventListener { + + /** + * 处理事件的方法 + * + * @param event 事件对象 + */ + void onEvent(BasicEvent event); +} diff --git a/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/basic/CustomEvent.java b/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/basic/CustomEvent.java deleted file mode 100644 index 2b80585..0000000 --- a/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/basic/CustomEvent.java +++ /dev/null @@ -1,37 +0,0 @@ -package com.muyu.event.process.basic; - -import com.alibaba.fastjson2.JSONObject; -import org.springframework.context.ApplicationEvent; - -/** - * @Author: zi run - * @Date 2024/9/29 21:19 - * @Description 自定义事件 - */ -public class CustomEvent extends ApplicationEvent { - - /** - * 存储与事件相关联的数据 - */ - private final JSONObject data; - - /** - * 创建一个新的自定义事件 - * - * @param source 事件源,表示触发此事件的对象 - * @param data 事件携带的数据,以JSON格式存储 - */ - public CustomEvent(Object source, JSONObject data) { - super(source); - this.data = data; - } - - /** - * 获取与此事件相关联的数据。 - * - * @return 包含事件数据的JSONObject - */ - public JSONObject getData() { - return data; - } -} diff --git a/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/basic/EventListener.java b/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/basic/EventListener.java deleted file mode 100644 index 32115a6..0000000 --- a/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/basic/EventListener.java +++ /dev/null @@ -1,18 +0,0 @@ -package com.muyu.event.process.basic; - -import org.springframework.context.ApplicationListener; - -/** - * @Author: zi run - * @Date 2024/9/29 21:29 - * @Description 事件监听基准 - */ -public interface EventListener extends ApplicationListener { - - /** - * 处理接收到的自定义事件。 - * - * @param event 已发布的自定义事件实例,包含事件的源和相关数据 - */ - void onEvent(CustomEvent event); -} diff --git a/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/basic/EventPublisher.java b/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/basic/EventPublisher.java index c23acf5..b037cd2 100644 --- a/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/basic/EventPublisher.java +++ b/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/basic/EventPublisher.java @@ -1,6 +1,5 @@ package com.muyu.event.process.basic; -import com.alibaba.fastjson2.JSONObject; import org.springframework.context.ApplicationEventPublisher; import org.springframework.context.ApplicationEventPublisherAware; import org.springframework.stereotype.Component; @@ -16,24 +15,24 @@ public class EventPublisher implements ApplicationEventPublisherAware { /** * 应用程序事件发布者,用于发布事件 */ - private ApplicationEventPublisher applicationEventPublisher; + private ApplicationEventPublisher publisher; /** - * 设置应用程序事件发布者。 + * 设置应用程序事件发布者 * - * @param applicationEventPublisher 应用程序事件发布者实例 + * @param publisher 应用程序事件发布者实例 */ @Override - public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) { - this.applicationEventPublisher = applicationEventPublisher; + public void setApplicationEventPublisher(ApplicationEventPublisher publisher) { + this.publisher = publisher; } /** - * 发布自定义事件。 - * - * @param messages 事件携带的数据,以JSON格式传递 + * 发布事件 + * @param event 要发布的事件 + * @param 事件数据类型 */ - public void publishEvent(JSONObject messages) { - applicationEventPublisher.publishEvent(new CustomEvent(this, messages)); + public void publish(BasicEvent event) { + publisher.publishEvent(event); } } diff --git a/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/config/EventListenerConfig.java b/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/config/EventListenerConfig.java deleted file mode 100644 index 7b7913f..0000000 --- a/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/config/EventListenerConfig.java +++ /dev/null @@ -1,19 +0,0 @@ -package com.muyu.event.process.config; - -import com.muyu.event.process.listener.AddDatabaseListener; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; - -/** - * @Author: zi run - * @Date 2024/9/29 21:29 - * @Description 事件监听配置 - */ -@Configuration -public class EventListenerConfig { - - @Bean - public AddDatabaseListener addDatabaseListener() { - return new AddDatabaseListener(); - } -} diff --git a/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/consumer/VehicleConsumer.java b/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/consumer/VehicleConsumer.java index 46aaeea..9a688b4 100644 --- a/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/consumer/VehicleConsumer.java +++ b/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/consumer/VehicleConsumer.java @@ -1,24 +1,24 @@ package com.muyu.event.process.consumer; -import cn.hutool.core.thread.ThreadUtil; -import com.alibaba.fastjson2.JSONObject; -import com.alibaba.nacos.shaded.com.google.common.collect.Lists; import com.muyu.event.process.basic.EventPublisher; +import com.muyu.event.process.event.IoTDBInsertDataEvent; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.springframework.beans.factory.InitializingBean; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; -import org.springframework.scheduling.annotation.Async; +import org.springframework.context.ApplicationListener; +import org.springframework.context.event.ContextClosedEvent; import org.springframework.stereotype.Component; import java.time.Duration; -import java.util.Collection; import java.util.Collections; import java.util.List; -import java.util.stream.Collectors; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; /** * @Author: zi run @@ -28,7 +28,7 @@ import java.util.stream.Collectors; @Slf4j @Component @RequiredArgsConstructor -public class VehicleConsumer implements InitializingBean { +public class VehicleConsumer implements ApplicationRunner, ApplicationListener { /** * kafka消费者 @@ -43,40 +43,51 @@ public class VehicleConsumer implements InitializingBean { /** * 协议解析报文传递数据(队列名称) */ - public final static String MESSAGE_PARSING = "test-topic"; + public final static String MESSAGE_PARSING = "MessageParsing"; -// @Override -// public void run(ApplicationArguments args) throws Exception { -// log.info("开始监听kafka-topic:{}", MESSAGE_PARSING); -// List topicList = Collections.singletonList(MESSAGE_PARSING); -// kafkaConsumer.subscribe(topicList); -// -// while (true) { -// ConsumerRecords consumerRecords = kafkaConsumer.poll(Duration.ofMillis(100)); -// consumerRecords.forEach(record -> { -// String value = record.value(); -// log.info("接收到车辆报文数据,内容:{}", value); -// eventPublisher.publishEvent(JSONObject.parseObject(value)); -// }); -// } -// } + /** + * 设定固定大小的线程池,线程数量与当前可用的处理器核心数相同 + */ + private final ExecutorService executorService = + Executors.newFixedThreadPool(10); - @Async @Override - public void afterPropertiesSet() throws Exception { - new Thread(() -> { - log.info("启动线程监听Topic: {}", MESSAGE_PARSING); - ThreadUtil.sleep(100); - Collection topics = Lists.newArrayList(MESSAGE_PARSING); - kafkaConsumer.subscribe(topics); - while (true) { - ConsumerRecords consumerRecords = kafkaConsumer.poll(Duration.ofMillis(100)); - consumerRecords.forEach(consumerRecord -> { - String message = consumerRecord.value(); - log.info("接收到车辆报文数据,内容:{}", message); - eventPublisher.publishEvent(JSONObject.parseObject(message)); - }); + public void run(ApplicationArguments args) throws Exception { + log.info("启动线程监听Topic: {}", MESSAGE_PARSING); + List topics = Collections.singletonList(MESSAGE_PARSING); + kafkaConsumer.subscribe(topics); + while (true) { + ConsumerRecords consumerRecords = kafkaConsumer.poll(Duration.ofMillis(100)); + consumerRecords.forEach(consumerRecord -> executorService.submit(() -> handleRecord(consumerRecord))); + } + } + + private void handleRecord(ConsumerRecord consumerRecord) { + String message = consumerRecord.value(); + log.info("接收到车辆报文数据,内容:{}", message); + log.info("------------------------------------------------"); + eventPublisher.publish(new IoTDBInsertDataEvent(this, message)); + } + + @Override + public void onApplicationEvent(ContextClosedEvent event) { + log.info("关闭线程池和Kafka消费者"); + + try { + executorService.shutdown(); + if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) { + executorService.shutdownNow(); } - }).start(); + } catch (InterruptedException e) { + log.error("线程池关闭被中断,强制关闭", e); + executorService.shutdownNow(); + Thread.currentThread().interrupt(); + } + + try { + kafkaConsumer.close(); // 关闭Kafka消费者 + } catch (Exception e) { + log.error("关闭Kafka消费者时发生错误", e); + } } } diff --git a/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/controller/TestEventController.java b/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/controller/TestEventController.java index c32feb1..bfc0dc3 100644 --- a/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/controller/TestEventController.java +++ b/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/controller/TestEventController.java @@ -50,9 +50,9 @@ public class TestEventController extends BaseController { @GetMapping(value = "/sendKafka") public Result senKafka() { JSONObject jsonObject = new JSONObject(); - jsonObject.put("id",1); + jsonObject.put("id","1"); jsonObject.put("name","张三"); - jsonObject.put("age",18); + jsonObject.put("age","18"); jsonObject.put("sex","男"); ProducerRecord producerRecord = new ProducerRecord<>(kafkaTopicName, jsonObject.toJSONString()); kafkaProducer.send(producerRecord); diff --git a/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/event/IoTDBInsertDataEvent.java b/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/event/IoTDBInsertDataEvent.java new file mode 100644 index 0000000..91d7d6e --- /dev/null +++ b/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/event/IoTDBInsertDataEvent.java @@ -0,0 +1,20 @@ +package com.muyu.event.process.event; + +import com.muyu.event.process.basic.BasicEvent; + +/** + * @Author: zi run + * @Date 2024/9/29 21:19 + * @Description 向IoTDB插入数据事件 + */ +public class IoTDBInsertDataEvent extends BasicEvent { + + /** + * 构造函数,向IoTDB插入数据创建事件 + * + * @param messsge 消息 + */ + public IoTDBInsertDataEvent(Object source, String messsge) { + super(source, messsge); + } +} diff --git a/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/listener/AddDatabaseListener.java b/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/listener/AddDatabaseListener.java deleted file mode 100644 index 1e0206a..0000000 --- a/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/listener/AddDatabaseListener.java +++ /dev/null @@ -1,43 +0,0 @@ -package com.muyu.event.process.listener; - -import com.alibaba.fastjson2.JSONObject; -import com.muyu.event.process.basic.CustomEvent; -import com.muyu.event.process.basic.EventListener; -import com.muyu.event.process.iotdb.basic.service.IService; -import com.muyu.event.process.iotdb.service.IoTDBService; -import lombok.RequiredArgsConstructor; -import org.springframework.beans.factory.annotation.Autowired; - -import java.util.ArrayList; -import java.util.List; - -/** - * @Author: zi run - * @Date 2024/9/29 22:12 - * @Description 添加数据库事件监听器 - */ -public class AddDatabaseListener implements EventListener { - - /** - * IoTDB业务层 - */ - @Autowired - private IoTDBService ioTDBService; - - @Override - public void onEvent(CustomEvent event) { - JSONObject data = event.getData(); - List keyList = new ArrayList<>(); - List valueList = new ArrayList<>(); - data.forEach((key, value) -> { - keyList.add(key); - valueList.add((String) value); - }); - ioTDBService.insertStringRecord("root.vehicle", System.currentTimeMillis(), keyList, valueList); - } - - @Override - public void onApplicationEvent(CustomEvent event) { - onEvent(event); - } -} diff --git a/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/listener/IoTDBInsertDataListener.java b/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/listener/IoTDBInsertDataListener.java new file mode 100644 index 0000000..0c0f96c --- /dev/null +++ b/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/listener/IoTDBInsertDataListener.java @@ -0,0 +1,68 @@ +package com.muyu.event.process.listener; + +import com.alibaba.fastjson2.JSONObject; +import com.muyu.event.process.basic.BasicEvent; +import com.muyu.event.process.basic.BasicEventListener; +import com.muyu.event.process.event.IoTDBInsertDataEvent; +import com.muyu.event.process.iotdb.service.IoTDBService; +import lombok.RequiredArgsConstructor; +import org.springframework.stereotype.Component; + +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; + +/** + * @Author: zi run + * @Date 2024/9/29 22:12 + * @Description 向IoTDB插入数据事件监听器 + */ +@Component +@RequiredArgsConstructor +public class IoTDBInsertDataListener implements BasicEventListener { + + /** + * IoTDB业务层 + */ + private final IoTDBService ioTDBService; + + /** + * 设备名(表名) + */ + private static final String DEVICE_ID = "root.vehicle"; + + /** + * 处理接收到的事件,将数据插入到 IoTDB + * + * @param event 接收到的事件,包含需要插入的数据 + */ + @Override + public void onEvent(BasicEvent event) { + JSONObject data = JSONObject.parseObject(event.getData()); + List keyList = extractKeys(data); + List valueList = extractValues(data); + ioTDBService.insertStringRecord(DEVICE_ID, System.currentTimeMillis(), keyList, valueList); + } + + /** + * 从给定的JSONObject中提取所有的键 + * + * @param data 要提取键的JSONObject + * @return 键的列表 + */ + private List extractKeys(JSONObject data) { + return data.keySet().stream().collect(Collectors.toList()); + } + + /** + * 从给定的 JSONObject 中提取所有的值,并将其转换为字符串 + * + * @param data 要提取值的JSONObject + * @return 值的列表,以字符串形式表示 + */ + private List extractValues(JSONObject data) { + return data.values().stream() + .map(Object::toString) + .collect(Collectors.toList()); + } +}