From 6f95055a6bb34e335e6903520a45c4368efbab10 Mon Sep 17 00:00:00 2001 From: xinzirun Date: Wed, 2 Oct 2024 16:49:39 +0800 Subject: [PATCH] =?UTF-8?q?feat():=20=E6=96=B0=E5=A2=9Erabbitmq=E5=85=AC?= =?UTF-8?q?=E5=85=B1=E6=A8=A1=E5=9D=97=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cloud-auth/src/main/resources/bootstrap.yml | 2 +- .../rabbit/callback/ConfirmCallback.java | 44 + .../rabbit/callback/ReturnsCallback.java | 39 + .../rabbit/config/RabbitAdminConfig.java | 72 ++ .../RabbitListenerConfig.java} | 29 +- .../RabbitMQMessageConverterConfig.java | 26 + ...ot.autoconfigure.AutoConfiguration.imports | 4 +- .../src/main/resources/bootstrap.yml | 2 +- .../com/muyu/enterprise/MQTT/ClientMQTT.java | 74 -- .../enterprise/MQTT/MQTTReceiveCallback.java | 69 -- .../muyu/enterprise/MQTT/MyMqttClient.java | 187 ----- .../muyu/enterprise/MQTT/PushCallback.java | 52 -- .../com/muyu/enterprise/MQTT/ServerMQTT.java | 113 --- .../src/main/resources/bootstrap.yml | 2 +- .../cloud-modules-event-process/pom.xml | 6 + .../process/CloudEventProcessApplication.java | 2 +- .../muyu/event/process/basic/BasicEvent.java | 2 +- .../process/basic/BasicEventHandler.java | 2 +- .../process/basic/BasicEventListener.java | 2 +- .../event/process/basic/EventPublisher.java | 2 +- ...stConsumer.java => TestKafkaConsumer.java} | 6 +- .../consumer/TestRabbitMQConsumer.java | 70 ++ .../process/consumer/VehicleConsumer.java | 6 +- .../controller/TestEventController.java | 28 +- .../process/event/IoTDBInsertDataEvent.java | 2 +- .../basic/config/IoTDBSessionConfig.java | 2 +- .../process/iotdb/basic/service/IService.java | 2 +- .../iotdb/basic/service/impl/ServiceImpl.java | 2 +- .../event/process/iotdb/domain/DataJSON.java | 2 +- .../process/iotdb/domain/ResultEntity.java | 2 +- .../process/iotdb/domain/TestDataType.java | 2 +- .../iotdb/domain/dto/InsertDataDTO.java | 2 +- .../iotdb/domain/dto/IoTDbRecordAble.java | 2 +- .../dto/MeasurementSchemaValuesDTO.java | 2 +- .../process/iotdb/service/IoTDBService.java | 2 +- .../iotdb/service/TestIoTDBService.java | 2 +- .../iotdb/service/impl/IoTDBServiceImpl.java | 2 +- .../service/impl/TestIoTDBServiceImpl.java | 2 +- .../listener/IoTDBInsertDataListener.java | 2 +- .../src/main/resources/bootstrap.yml | 6 +- .../process/CloudEventProcessApplication.java | 20 - .../muyu/event/process/basic/BasicEvent.java | 37 - .../process/basic/BasicEventHandler.java | 37 - .../process/basic/BasicEventListener.java | 16 - .../event/process/basic/EventPublisher.java | 38 - .../event/process/consumer/TestConsumer.java | 52 -- .../process/consumer/VehicleConsumer.java | 93 --- .../controller/TestEventController.java | 88 -- .../process/event/IoTDBInsertDataEvent.java | 20 - .../basic/config/IoTDBSessionConfig.java | 72 -- .../process/iotdb/basic/service/IService.java | 290 ------- .../iotdb/basic/service/impl/ServiceImpl.java | 765 ------------------ .../event/process/iotdb/domain/DataJSON.java | 33 - .../process/iotdb/domain/ResultEntity.java | 36 - .../process/iotdb/domain/TestDataType.java | 43 - .../iotdb/domain/dto/InsertDataDTO.java | 67 -- .../iotdb/domain/dto/IoTDbRecordAble.java | 12 - .../dto/MeasurementSchemaValuesDTO.java | 36 - .../process/iotdb/service/IoTDBService.java | 11 - .../iotdb/service/TestIoTDBService.java | 20 - .../iotdb/service/impl/IoTDBServiceImpl.java | 14 - .../service/impl/TestIoTDBServiceImpl.java | 33 - .../listener/IoTDBInsertDataListener.java | 68 -- .../src/main/resources/bootstrap.yml | 2 +- .../src/main/resources/bootstrap.yml | 2 +- .../cloud-modules-protocol-analysis/pom.xml | 6 + .../src/main/resources/bootstrap.yml | 2 +- .../cloud-modules-vehicle-gateway/pom.xml | 10 +- .../src/main/resources/bootstrap.yml | 2 +- .../src/main/resources/bootstrap.yml | 2 +- pom.xml | 43 + 71 files changed, 406 insertions(+), 2441 deletions(-) create mode 100644 cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/callback/ConfirmCallback.java create mode 100644 cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/callback/ReturnsCallback.java create mode 100644 cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/config/RabbitAdminConfig.java rename cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/{RabbitListenerConfigurer.java => config/RabbitListenerConfig.java} (58%) create mode 100644 cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/config/RabbitMQMessageConverterConfig.java delete mode 100644 cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/java/com/muyu/enterprise/MQTT/ClientMQTT.java delete mode 100644 cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/java/com/muyu/enterprise/MQTT/MQTTReceiveCallback.java delete mode 100644 cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/java/com/muyu/enterprise/MQTT/MyMqttClient.java delete mode 100644 cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/java/com/muyu/enterprise/MQTT/PushCallback.java delete mode 100644 cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/java/com/muyu/enterprise/MQTT/ServerMQTT.java rename cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/consumer/{TestConsumer.java => TestKafkaConsumer.java} (93%) create mode 100644 cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/consumer/TestRabbitMQConsumer.java delete mode 100644 cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/CloudEventProcessApplication.java delete mode 100644 cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/basic/BasicEvent.java delete mode 100644 cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/basic/BasicEventHandler.java delete mode 100644 cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/basic/BasicEventListener.java delete mode 100644 cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/basic/EventPublisher.java delete mode 100644 cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/consumer/TestConsumer.java delete mode 100644 cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/consumer/VehicleConsumer.java delete mode 100644 cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/controller/TestEventController.java delete mode 100644 cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/event/IoTDBInsertDataEvent.java delete mode 100644 cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/iotdb/basic/config/IoTDBSessionConfig.java delete mode 100644 cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/iotdb/basic/service/IService.java delete mode 100644 cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/iotdb/basic/service/impl/ServiceImpl.java delete mode 100644 cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/iotdb/domain/DataJSON.java delete mode 100644 cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/iotdb/domain/ResultEntity.java delete mode 100644 cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/iotdb/domain/TestDataType.java delete mode 100644 cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/iotdb/domain/dto/InsertDataDTO.java delete mode 100644 cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/iotdb/domain/dto/IoTDbRecordAble.java delete mode 100644 cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/iotdb/domain/dto/MeasurementSchemaValuesDTO.java delete mode 100644 cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/iotdb/service/IoTDBService.java delete mode 100644 cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/iotdb/service/TestIoTDBService.java delete mode 100644 cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/iotdb/service/impl/IoTDBServiceImpl.java delete mode 100644 cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/iotdb/service/impl/TestIoTDBServiceImpl.java delete mode 100644 cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/listener/IoTDBInsertDataListener.java diff --git a/cloud-auth/src/main/resources/bootstrap.yml b/cloud-auth/src/main/resources/bootstrap.yml index eeaceee..8581ac4 100644 --- a/cloud-auth/src/main/resources/bootstrap.yml +++ b/cloud-auth/src/main/resources/bootstrap.yml @@ -7,7 +7,7 @@ nacos: addr: 106.15.136.7:8848 user-name: nacos password: nacos - namespace: dev + namespace: xzr # Spring spring: application: diff --git a/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/callback/ConfirmCallback.java b/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/callback/ConfirmCallback.java new file mode 100644 index 0000000..95fa690 --- /dev/null +++ b/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/callback/ConfirmCallback.java @@ -0,0 +1,44 @@ +package com.muyu.common.rabbit.callback; + +import jakarta.annotation.PostConstruct; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.rabbit.connection.CorrelationData; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.stereotype.Component; + +/** + * @Author: zi run + * @Date 2024/10/2 9:26 + * @Description 消息发送到broker确认回调 + */ +@Slf4j +@Component +public class ConfirmCallback implements RabbitTemplate.ConfirmCallback { + + @Resource + private RabbitTemplate rabbitTemplate; + + /** + * 初始化 + */ + @PostConstruct + public void init() { + this.rabbitTemplate.setConfirmCallback(this); + } + + /** + * 回调确认方法(消息发送之后 消息无论发送成功还是失败都会执行这个回调方法) + * @param correlationData 回调的相关数据 + * @param ack 确认结果(true表示消息已经被broker接收,false表示消息未被broker接收) + * @param cause 失败原因(当ack为false时,表示拒绝接收消息的原因;当ack为true时,该值为空) + */ + @Override + public void confirm(CorrelationData correlationData, boolean ack, String cause) { + if (ack) { + log.info("消息发送到broker成功!"); + } else { + log.info("消息发送到broker失败,失败原因:{}", cause); + } + } +} diff --git a/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/callback/ReturnsCallback.java b/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/callback/ReturnsCallback.java new file mode 100644 index 0000000..0bbd94a --- /dev/null +++ b/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/callback/ReturnsCallback.java @@ -0,0 +1,39 @@ +package com.muyu.common.rabbit.callback; + +import jakarta.annotation.PostConstruct; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.core.ReturnedMessage; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.stereotype.Component; + +/** + * @Author: zi run + * @Date 2024/10/2 9:33 + * @Description 消息发送失败时回调 + */ +@Slf4j +@Component +public class ReturnsCallback implements RabbitTemplate.ReturnsCallback { + + @Resource + private RabbitTemplate rabbitTemplate; + + /** + * 初始化 + */ + @PostConstruct + public void init() { + rabbitTemplate.setReturnsCallback(this); + } + + /** + * 消息发送到队列失败时执行 + * @param returnedMessage 返回的消息和元数据 + */ + @Override + public void returnedMessage(ReturnedMessage returnedMessage) { + log.info("消息:{}被交换机:{}回退!回退原因:{}", returnedMessage.getMessage().toString(), + returnedMessage.getExchange(), returnedMessage.getReplyText()); + } +} diff --git a/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/config/RabbitAdminConfig.java b/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/config/RabbitAdminConfig.java new file mode 100644 index 0000000..89afc95 --- /dev/null +++ b/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/config/RabbitAdminConfig.java @@ -0,0 +1,72 @@ +package com.muyu.common.rabbit.config; + +import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; +import org.springframework.amqp.rabbit.connection.ConnectionFactory; +import org.springframework.amqp.rabbit.core.RabbitAdmin; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * @Author: zi run + * @Date 2024/10/2 16:41 + * @Description RabbitMQ连接和管理功能配置 + */ +@Configuration +public class RabbitAdminConfig { + + /** + * RabbitMQ服务器的主机地址 + */ + @Value("${spring.rabbitmq.host}") + private String host; + + /** + * RabbitMQ的用户名 + */ + @Value("${spring.rabbitmq.username}") + private String username; + + /** + * RabbitMQ的密码 + */ + @Value("${spring.rabbitmq.password}") + private String password; + + /** + * RabbitMQ的虚拟主机 + */ + @Value("${spring.rabbitmq.virtualhost}") + private String virtualhost; + + /** + * 创建并初始化RabbitAdmin实例 + * + * @return RabbitAdmin 实例 + */ + @Bean + public RabbitAdmin rabbitAdmin() { + RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory()); + rabbitAdmin.setAutoStartup(true); + return rabbitAdmin; + } + + /** + * 创建RabbitMQ连接工厂 + * + * @return ConnectionFactory 实例 + */ + @Bean + public ConnectionFactory connectionFactory() { + CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); + connectionFactory.setAddresses(host); + connectionFactory.setUsername(username); + connectionFactory.setPassword(password); + connectionFactory.setVirtualHost(virtualhost); + + // 配置发送确认回调时,次配置必须配置,否则即使在RabbitTemplate配置了ConfirmCallback也不会生效 + connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED); + connectionFactory.setPublisherReturns(true); + return connectionFactory; + } +} diff --git a/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/RabbitListenerConfigurer.java b/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/config/RabbitListenerConfig.java similarity index 58% rename from cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/RabbitListenerConfigurer.java rename to cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/config/RabbitListenerConfig.java index 51cb359..859823e 100644 --- a/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/RabbitListenerConfigurer.java +++ b/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/config/RabbitListenerConfig.java @@ -1,28 +1,41 @@ -package com.muyu.common.rabbit; +package com.muyu.common.rabbit.config; +import org.springframework.amqp.rabbit.annotation.RabbitListenerConfigurer; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistrar; +import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.messaging.converter.MappingJackson2MessageConverter; import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory; +/** + * @Author: zi run + * @Date 2024/10/2 10:06 + * @Description rabbitMQ的监听器配置 + */ @Configuration -public class RabbitListenerConfigurer implements org.springframework.amqp.rabbit.annotation.RabbitListenerConfigurer { +public class RabbitListenerConfig implements RabbitListenerConfigurer { static { + // 设置为信任所有类型的反序列化,确保消息能够正确反序列化 System.setProperty("spring.amqp.deserialization.trust.all", "true"); } - //以下配置RabbitMQ消息服务 + /** + * RabbitMQ连接工厂,用于创建连接 + */ @Autowired public ConnectionFactory connectionFactory; + @Autowired + private MessageConverter jsonMessageConverter; /** - * 处理器方法工厂 - * @return + * 创建处理器方法工厂的bean + * + * @return DefaultMessageHandlerMethodFactory 实例,用于处理消息的转换和方法调用 */ @Bean public DefaultMessageHandlerMethodFactory handlerMethodFactory() { @@ -32,8 +45,14 @@ public class RabbitListenerConfigurer implements org.springframework.amqp.rabbit return factory; } + /** + * 配置RabbitMQ监听器的消息处理方法工厂。 + * + * @param rabbitListenerEndpointRegistrar 实例,用于注册监听器的配置 + */ @Override public void configureRabbitListeners(RabbitListenerEndpointRegistrar rabbitListenerEndpointRegistrar) { + // 注册自定义的消息处理方法工厂 rabbitListenerEndpointRegistrar.setMessageHandlerMethodFactory(handlerMethodFactory()); } diff --git a/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/config/RabbitMQMessageConverterConfig.java b/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/config/RabbitMQMessageConverterConfig.java new file mode 100644 index 0000000..953966d --- /dev/null +++ b/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/config/RabbitMQMessageConverterConfig.java @@ -0,0 +1,26 @@ + +package com.muyu.common.rabbit.config; + +import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; +import org.springframework.amqp.support.converter.MessageConverter; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * @Author: zi run + * @Date 2024/10/2 9:17 + * @Description rabbitMQ消息转换器配置 + */ +@Configuration +public class RabbitMQMessageConverterConfig { + + /** + * 消息转换配置 + * + * @return 消息转换器 + */ + @Bean + public MessageConverter jsonMessageConverter() { + return new Jackson2JsonMessageConverter(); + } +} diff --git a/cloud-common/cloud-common-rabbit/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/cloud-common/cloud-common-rabbit/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports index 189ea2c..720eca2 100644 --- a/cloud-common/cloud-common-rabbit/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports +++ b/cloud-common/cloud-common-rabbit/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports @@ -1 +1,3 @@ -com.muyu.common.rabbit.RabbitListenerConfigurer \ No newline at end of file +com.muyu.common.rabbit.config.RabbitAdminConfig +com.muyu.common.rabbit.config.RabbitListenerConfig +com.muyu.common.rabbit.config.RabbitMQMessageConverterConfig \ No newline at end of file diff --git a/cloud-gateway/src/main/resources/bootstrap.yml b/cloud-gateway/src/main/resources/bootstrap.yml index d315c84..9618890 100644 --- a/cloud-gateway/src/main/resources/bootstrap.yml +++ b/cloud-gateway/src/main/resources/bootstrap.yml @@ -7,7 +7,7 @@ nacos: addr: 106.15.136.7:8848 user-name: nacos password: nacos - namespace: dev + namespace: xzr # Spring spring: diff --git a/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/java/com/muyu/enterprise/MQTT/ClientMQTT.java b/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/java/com/muyu/enterprise/MQTT/ClientMQTT.java deleted file mode 100644 index 169edaf..0000000 --- a/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/java/com/muyu/enterprise/MQTT/ClientMQTT.java +++ /dev/null @@ -1,74 +0,0 @@ -package com.muyu.enterprise.MQTT; - -import com.alibaba.nacos.api.remote.PushCallBack; -import org.eclipse.paho.client.mqttv3.MqttClient; -import org.eclipse.paho.client.mqttv3.MqttConnectOptions; -import org.eclipse.paho.client.mqttv3.MqttTopic; -import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; - -/** - * 模拟一个客户端接收消息 - * @Author:李庆帅 - * @Package:com.muyu.enterprise.MQTT - * @Project:cloud-server - * @name:ClientMQTT - * @Date:2024/9/28 12:11 - */ -public class ClientMQTT { - //String topic = "vehicle"; - // String content = "Message from MqttPublishSample"; - // int qos = 2; - // String broker = "tcp://106.15.136.7:1883"; - // String clientId = "JavaSample"; - //MQTT代理服务器地址 - public static final String HOST="tcp://106.15.136.7:1883"; - public static final String TOPIC1="pos_message_all"; - private static final String clientId="12345678"; - private MqttClient client; - private MqttConnectOptions options; - private String userName="mqtt"; //非必须 - private String passWord="mqtt"; //非必须 - - private void start(){ - try{ - // host为主机名,clientid即连接MQTT的客户端ID,一般以唯一标识符表示,MemoryPersistence设置clientid的保存形式,默认为以内存保存 - client= new MqttClient(HOST,clientId,new MemoryPersistence()); - // MQTT的连接设置 - options=new MqttConnectOptions(); - // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,设置为true表示每次连接到服务器都以新的身份连接 - options.setCleanSession(false); - // 设置连接的用户名 - options.setUserName(userName); - // 设置连接的密码 - options.setPassword(passWord.toCharArray()); - // 设置超时时间 单位为秒 - options.setConnectionTimeout(10); - // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制 - options.setKeepAliveInterval(20); - //设置断开后重新连接 - options.setAutomaticReconnect(true); - // 设置回调 - client.setCallback(new PushCallback()); - MqttTopic topic = client.getTopic(TOPIC1); - //setWill方法,如果项目中需要知道客户端是否掉线可以调用该方法。设置最终端口的通知消息 - //遗嘱 - options.setWill(topic,"close".getBytes(),1,true); - client.connect(options); - //订阅消息 - int[] Qos = {1} ; //0:最多一次 、1:最少一次 、2:只有一次 - String[] topics1 = {TOPIC1}; - client.subscribe(topics1,Qos); - - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - public static void main(String[] args){ - ClientMQTT clientMQTT = new ClientMQTT(); - clientMQTT.start(); - } - - - -} diff --git a/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/java/com/muyu/enterprise/MQTT/MQTTReceiveCallback.java b/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/java/com/muyu/enterprise/MQTT/MQTTReceiveCallback.java deleted file mode 100644 index 5f18256..0000000 --- a/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/java/com/muyu/enterprise/MQTT/MQTTReceiveCallback.java +++ /dev/null @@ -1,69 +0,0 @@ -package com.muyu.enterprise.MQTT; - -/** - * @Author:李庆帅 - * @Package:com.muyu.enterprise.MQTT - * @Project:cloud-server - * @name:MQTTReceiveCallback - * @Date:2024/9/27 22:21 - */ - -import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; -import org.eclipse.paho.client.mqttv3.MqttCallback; -import org.eclipse.paho.client.mqttv3.MqttMessage; - -/** - * 发布消息的回调类 - * - * 必须实现MqttCallback的接口并实现对应的相关接口方法CallBack 类将实现 MqttCallBack。 - * 每个客户机标识都需要一个回调实例。在此示例中,构造函数传递客户机标识以另存为实例数据。 - * 在回调中,将它用来标识已经启动了该回调的哪个实例。 - * 必须在回调类中实现三个方法: - * - * (1):public void messageArrived(MqttTopic topic, MqttMessage message)接收已经预订的发布。 - * - * (2):public void connectionLost(Throwable cause)在断开连接时调用。 - * - * (3):public void deliveryComplete(MqttDeliveryToken token)) - * 接收到已经发布的 QoS 1 或 QoS 2 消息的传递令牌时调用。 - * 由 MqttClient.connect 激活此回调。 - * - */ -public class MQTTReceiveCallback implements MqttCallback { -// @Override -// public void connectionLost(Throwable throwable) { -// //连接丢失后,一般在这里面进行重连 -// System.out.println("连接断开,可以做重连"); -// } -// -// @Override -// public void messageArrived(String topic, MqttMessage message) throws Exception { -// //subscribe后得到的消息会执行到这里面 -// System.out.println("接收消息主题:"+topic); -// System.out.println("接收消息Qos:"+message.getQos()); -// System.out.println("接收消息内容:"+new String(message.getPayload())); -// } -// -// @Override -// public void deliveryComplete(IMqttDeliveryToken token) { -// System.out.println("deliveryComplete----------"+token.isComplete()); -// } - @Override - public void connectionLost(Throwable throwable){ - //连接丢失后,一般在这里面进行重连 - System.out.println("连接断开,可以做重连"); - } - @Override - public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception { - //subscribe后得到的消息会执行到这面 - System.out.println("接收消息主题:"+topic); - System.out.println("接收消息Qos:"+mqttMessage.getQos()); - System.out.println("接收消息内容:"+new String(mqttMessage.getPayload())); - } - - @Override - public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { - System.out.println("deliveryComplete---------"+iMqttDeliveryToken.isComplete()); - } - -} diff --git a/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/java/com/muyu/enterprise/MQTT/MyMqttClient.java b/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/java/com/muyu/enterprise/MQTT/MyMqttClient.java deleted file mode 100644 index fb9a4bc..0000000 --- a/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/java/com/muyu/enterprise/MQTT/MyMqttClient.java +++ /dev/null @@ -1,187 +0,0 @@ -package com.muyu.enterprise.MQTT; - - -import org.eclipse.paho.client.mqttv3.*; -import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; - -/** - * 客户端类 - * @Author:李庆帅 - * @Package:com.muyu.enterprise.mqtt - * @Project:cloud-server - * @name:MyMqttClient - * @Date:2024/9/27 22:20 - */ -public class MyMqttClient { - - public static MqttClient mqttClient =null; - private static MemoryPersistence memoryPersistence=null; - private static MqttConnectOptions mqttConectOptions=null; - private static String ClinentName=""; //待填 将在服务端出现的名字 - private static String IP=""; //待填 服务器IP - - - public static void main(String[] args) { - start(ClinentName); - } - - - - public static void start(String clientId){ - //初始化连接设置对象 - mqttConectOptions=new MqttConnectOptions(); - //设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录, - //这里设置为true表示每次连接到服务器都以新的身份连接 - mqttConectOptions.setCleanSession(true); - //设置连接超时时间,单位是秒 - mqttConectOptions.setConnectionTimeout(10); - //设置持久化方式 - memoryPersistence=new MemoryPersistence(); - if(null!=clientId){ - try{ - mqttClient =new MqttClient("tcp://"+IP+":1883", clientId,memoryPersistence); - } catch (Exception e) { - // TODO 自动生成的捕获块 - throw new RuntimeException(e); - } - } - System.out.println("连接状态:"+mqttClient.isConnected()); - //设置连接和回调 - if(null!=mqttClient){ - if(!mqttClient.isConnected()){ - //创建回调函数对象 - MQTTReceiveCallback MQTTReceiveCallback = new MQTTReceiveCallback(); - //客户端添加回调函数 - mqttClient.setCallback(MQTTReceiveCallback); - //创建连接 - try{ - System.out.println("创建连接"); - mqttClient.connect(mqttConectOptions); - } catch (Exception e) { - // TODO 自动生成的捕获块 - throw new RuntimeException(e); - } - } - }else { - System.out.println("mqttClient为空"); - } - System.out.println("连接状态"+mqttClient.isConnected()); - } - // 关闭连接 - public void closeConnect(){ - //关闭储存方式 - if(null!=memoryPersistence){ - try{ - memoryPersistence.close(); - } catch (Exception e) { - // TODO 自动生成的捕获块 - throw new RuntimeException(e); - } - }else { - System.out.println("memoryPersistence为空"); - } - - if(null!=mqttClient){ - if(mqttClient.isConnected()){ - try{ - mqttClient.disconnect(); - mqttClient.close(); - } catch (Exception e) { - // TODO 自动生成的捕获块 - throw new RuntimeException(e); - } - }else { - System.out.println("mqttClient未连接"); - } - }else { - System.out.println("mqttClient为空"); - } - } - - - //发布消息 - public static void publishMessage(String pubTopic,String message,int qos){ - if(null!=mqttClient && mqttClient.isConnected()){ - System.out.println("发布消息"+mqttClient.isConnected()); - System.out.println("id"+mqttClient.isConnected()); - MqttMessage mqttMessage = new MqttMessage(); - mqttMessage.setQos(qos); - - MqttTopic topic = mqttClient.getTopic(pubTopic); - - if(null!=topic){ - try{ - MqttDeliveryToken publish = topic.publish(mqttMessage); - if(!publish.isComplete()){ - System.out.println("消息发布成功"); - } - } catch (Exception e) { - // TODO 自动生成的捕获块 - throw new RuntimeException(e); - } - } - - } - } - - - //重新连接 - public static void reConnect(){ - if(null!=mqttClient&&mqttClient.isConnected()){ - if(!mqttClient.isConnected()){ - if(null!=mqttConectOptions){ - try{ - mqttClient.connect(mqttConectOptions); - } catch (Exception e) { - // TODO 自动生成的捕获块 - throw new RuntimeException(e); - } - }else { - System.out.println("mqttConnectOptions为空"); - } - }else { - System.out.println("mqttClient为空或已连接"); - } - }else { - start(ClinentName); - } - } - - - //订阅主题 - public static void suvTopic(String topic){ - if(null!=mqttClient && mqttClient.isConnected()){ - try{ - mqttClient.subscribe(topic,1); - } catch (MqttException e) { - // TODO 自动生成的捕获块 - throw new RuntimeException(e); - } - }else { - System.out.println("mqttClient出错"); - } - } - - - //清空主题 - public void cleanTopic(String topic){ - if(null!=mqttClient && mqttClient.isConnected()){ - try{ - mqttClient.subscribe(topic); - } catch (Exception e) { - // TODO 自动生成的捕获块 - throw new RuntimeException(e); - } - }else { - System.out.println("mqttClient出错"); - } - } - - - - - - - - -} diff --git a/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/java/com/muyu/enterprise/MQTT/PushCallback.java b/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/java/com/muyu/enterprise/MQTT/PushCallback.java deleted file mode 100644 index 769a3bc..0000000 --- a/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/java/com/muyu/enterprise/MQTT/PushCallback.java +++ /dev/null @@ -1,52 +0,0 @@ -package com.muyu.enterprise.MQTT; - -/** - * 发布消息的回调类 - * @Author:李庆帅 - * @Package:com.muyu.enterprise.MQTT - * @Project:cloud-server - * @name:PushCallback - * @Date:2024/9/28 14:35 - */ - -import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; -import org.eclipse.paho.client.mqttv3.MqttCallback; -import org.eclipse.paho.client.mqttv3.MqttMessage; - -/** - * 发布消息的回调类 - * - * 必须实现MqttCallback的接口并实现对应的相关接口方法CallBack 类将实现 MqttCallBack。 - * 每个客户机标识都需要一个回调实例。在此示例中,构造函数传递客户机标识以另存为实例数据。 - * 在回调中,将它用来标识已经启动了该回调的哪个实例。 - * 必须在回调类中实现三个方法: - * - * public void messageArrived(MqttTopic topic, MqttMessage message)接收已经预订的发布。 - * - * public void connectionLost(Throwable cause)在断开连接时调用。 - * - * public void deliveryComplete(MqttDeliveryToken token)) - * 接收到已经发布的 QoS 1 或 QoS 2 消息的传递令牌时调用。 - * 由 MqttClient.connect 激活此回调。 - * - */ -public class PushCallback implements MqttCallback { - @Override - public void connectionLost(Throwable throwable) { - // 连接丢失后,一般在这里面进行重连 - System.out.println("连接断开,可以做重连"); - } - - @Override - public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception { -// subscribe后得到的消息会执行到这里面 - System.out.println("接收消息主题 : " + topic); - System.out.println("接收消息Qos : " + mqttMessage.getQos()); - System.out.println("接收消息内容 : " + new String(mqttMessage.getPayload())); - } - - @Override - public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { - System.out.println("deliveryComplete---------" + iMqttDeliveryToken.isComplete()); - } -} diff --git a/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/java/com/muyu/enterprise/MQTT/ServerMQTT.java b/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/java/com/muyu/enterprise/MQTT/ServerMQTT.java deleted file mode 100644 index f026d7e..0000000 --- a/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/java/com/muyu/enterprise/MQTT/ServerMQTT.java +++ /dev/null @@ -1,113 +0,0 @@ -package com.muyu.enterprise.MQTT; - -import lombok.extern.log4j.Log4j2; -import org.eclipse.paho.client.mqttv3.*; -import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; - -/** - * 这是发送消息的服务端 - * 服务器向多个客户端推送主题,即不同客户端可向服务器订阅相同主题 - * @Author:李庆帅 - * @Package:com.muyu.enterprise.MQTT - * @Project:cloud-server - * @name:ServerMQTT - * @Date:2024/9/28 14:32 - */ -@Log4j2 -public class ServerMQTT { - - //tcp://MQTT安装的服务器地址:MQTT定义的端口号 - public static final String HOST = "tcp://127.0.0.1:1883"; - //定义一个主题 - public static final String TOPIC = "pos_message_all"; - //定义MQTT的ID,可以在MQTT服务配置中指定 - private static final String clientId = "server11"; - - private MqttClient client; - private static MqttTopic topic11; - - // private String userName = "mqtt"; //非必须 - // private String passWord = "mqtt"; //非必须 - - private static MqttMessage message; - - /** - * 构造方法 - * @throws MqttException - */ - public ServerMQTT() throws MqttException { - // MemoryPersistence设置clientid的保存形式,默认为以内存保存 - client=new MqttClient(HOST, clientId,new MemoryPersistence()); - connect(); - } - - /** - * 用来连接服务器 - */ - private void connect() - { - MqttConnectOptions options = new MqttConnectOptions(); - options.setCleanSession(false); - // options.setUserName(userName); - // options.setPassword(passWord.toCharArray()); - // 设置超时时间 - options.setConnectionTimeout(10); - // 设置会话心跳时间 - options.setKeepAliveInterval(20); - try{ - client.setCallback(new PushCallback()); - client.connect(options); - topic11 = client.getTopic(TOPIC); - - } catch (Exception e) { - e.printStackTrace(); - } - - } - - /** - * - * @param topic - * @param message - * @throws MqttException - */ - public static void publish(MqttTopic topic , MqttMessage message) throws MqttPersistenceException,MqttException{ - MqttDeliveryToken token= topic.publish(message); - - token.waitForCompletion(); - System.out.println("消息已完全发布!"+token.isComplete()); - log.info("消息已完全发布!"+token.isComplete()); - - } - - /** - * - * @param clieId - * @param msg - * @throws Exception - */ - public static void sendMessage(String clieId,String msg)throws Exception{ - ServerMQTT server = new ServerMQTT(); - server.message = new MqttMessage(); - server.message.setQos(1); //保证消息能到达一次 - server.message.setRetained(true); - String str="{\"clienId\":\""+clieId+"\",\"msg\":\""+msg+"\"}"; - try{ - publish(server.topic11 , server.message); - //断开连接 -// server.client.disconnect(); - }catch (Exception e){ - e.printStackTrace(); - } - } - - - public static void main(String[] args) throws Exception { - sendMessage("123444","哈哈哈"); - } - - - - - -} diff --git a/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/resources/bootstrap.yml b/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/resources/bootstrap.yml index 2ab3074..c8f2192 100644 --- a/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/resources/bootstrap.yml +++ b/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/resources/bootstrap.yml @@ -7,7 +7,7 @@ nacos: addr: 106.15.136.7:8848 user-name: nacos password: nacos - namespace: dev + namespace: xzr spring: application: diff --git a/cloud-modules/cloud-modules-event-process/pom.xml b/cloud-modules/cloud-modules-event-process/pom.xml index 4844c85..1bc5a22 100644 --- a/cloud-modules/cloud-modules-event-process/pom.xml +++ b/cloud-modules/cloud-modules-event-process/pom.xml @@ -81,5 +81,11 @@ com.muyu cloud-common-kafka + + + + com.muyu + cloud-common-rabbit + \ No newline at end of file diff --git a/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/CloudEventProcessApplication.java b/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/CloudEventProcessApplication.java index b3d9116..aece3d8 100644 --- a/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/CloudEventProcessApplication.java +++ b/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/CloudEventProcessApplication.java @@ -18,4 +18,4 @@ public class CloudEventProcessApplication { public static void main(String[] args) { SpringApplication.run(CloudEventProcessApplication.class, args); } -} +} \ No newline at end of file diff --git a/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/basic/BasicEvent.java b/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/basic/BasicEvent.java index 1701cba..e3865dc 100644 --- a/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/basic/BasicEvent.java +++ b/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/basic/BasicEvent.java @@ -34,4 +34,4 @@ public class BasicEvent extends ApplicationEvent { public T getData() { return data; } -} +} \ No newline at end of file diff --git a/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/basic/BasicEventHandler.java b/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/basic/BasicEventHandler.java index d9571d2..0b0e7bf 100644 --- a/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/basic/BasicEventHandler.java +++ b/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/basic/BasicEventHandler.java @@ -34,4 +34,4 @@ public class BasicEventHandler implements ApplicationListener> public void onApplicationEvent(BasicEvent event) { listener.onEvent(event); } -} +} \ No newline at end of file diff --git a/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/basic/BasicEventListener.java b/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/basic/BasicEventListener.java index 7fecd6e..bdb2750 100644 --- a/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/basic/BasicEventListener.java +++ b/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/basic/BasicEventListener.java @@ -13,4 +13,4 @@ public interface BasicEventListener { * @param event 事件对象 */ void onEvent(BasicEvent event); -} +} \ No newline at end of file diff --git a/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/basic/EventPublisher.java b/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/basic/EventPublisher.java index b037cd2..c8aa912 100644 --- a/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/basic/EventPublisher.java +++ b/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/basic/EventPublisher.java @@ -35,4 +35,4 @@ public class EventPublisher implements ApplicationEventPublisherAware { public void publish(BasicEvent event) { publisher.publishEvent(event); } -} +} \ No newline at end of file diff --git a/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/consumer/TestConsumer.java b/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/consumer/TestKafkaConsumer.java similarity index 93% rename from cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/consumer/TestConsumer.java rename to cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/consumer/TestKafkaConsumer.java index bacb039..4b231b4 100644 --- a/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/consumer/TestConsumer.java +++ b/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/consumer/TestKafkaConsumer.java @@ -15,11 +15,11 @@ import java.util.Collection; /** * @Author: zi run * @Date 2024/9/29 16:53 - * @Description 测试消费者 + * @Description 测试Kafka消费者 */ @Slf4j @RequiredArgsConstructor -public class TestConsumer implements InitializingBean { +public class TestKafkaConsumer implements InitializingBean { /** * kafka消费者 @@ -48,4 +48,4 @@ public class TestConsumer implements InitializingBean { } }).start(); } -} +} \ No newline at end of file diff --git a/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/consumer/TestRabbitMQConsumer.java b/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/consumer/TestRabbitMQConsumer.java new file mode 100644 index 0000000..c96c687 --- /dev/null +++ b/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/consumer/TestRabbitMQConsumer.java @@ -0,0 +1,70 @@ +package com.muyu.event.process.consumer; + +import com.rabbitmq.client.Channel; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.core.Message; +import org.springframework.amqp.rabbit.annotation.Queue; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.data.redis.core.StringRedisTemplate; +import org.springframework.stereotype.Component; + +import java.io.IOException; + +/** + * @Author: zi run + * @Date 2024/10/2 10:35 + * @Description 测试RabbitMQ消费者 + */ +@Slf4j +@Component +@RequiredArgsConstructor +public class TestRabbitMQConsumer { + + /** + * redis服务实例 + */ + private final StringRedisTemplate redisTemplate; + + + /** + * 队列名称 + */ + private static final String queueName = "test-rabbitmq"; + + /** + * redis缓存键名 + */ + private static final String testRabbitMQKey = "test-rabbitmq"; + + /** + * 消费者方法,用于处理从RabbitMQ队列中接收到的消息 + * + * @param msg 消息内容,类型为String + * @param message 原始消息对象,包含消息的属性信息 + * @param channel RabbitMQ通道,用于进行消息确认和拒绝操作 + */ + @RabbitListener(queuesToDeclare = @Queue(name = queueName)) + public void consumer(String msg, Message message, Channel channel) { + String messageId = message.getMessageProperties().getMessageId(); + try { + Long count = redisTemplate.opsForSet().add(testRabbitMQKey, messageId); + + if (count != null && count.intValue() == 1) { + log.info("测试RabbitMQ消费者获取到消息,消息内容:{}", msg); + channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); + log.info("测试RabbitMQ消费者消费消息成功!"); + } + } catch (Exception e) { + redisTemplate.opsForSet().remove(testRabbitMQKey, messageId); + log.error("测试RabbitMQ消费者消费消息异常,消息内容:{},异常信息:{}", msg, e.getMessage()); + + try { + channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); + } catch (IOException ex) { + log.error("测试RabbitMQ消费者回退消息异常,消息内容:{}, 异常信息:{}", msg, ex.getMessage()); + } + } + } + +} diff --git a/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/consumer/VehicleConsumer.java b/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/consumer/VehicleConsumer.java index b3fe6b7..89eaf89 100644 --- a/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/consumer/VehicleConsumer.java +++ b/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/consumer/VehicleConsumer.java @@ -12,6 +12,7 @@ import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.context.ApplicationListener; import org.springframework.context.event.ContextClosedEvent; +import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Component; import java.time.Duration; @@ -52,6 +53,7 @@ public class VehicleConsumer implements ApplicationRunner, ApplicationListener sendRabbitMQ() { + rabbitTemplate.convertAndSend(rabbitMQQueueName, "测试RabbitMQ队列消息", message -> { + message.getMessageProperties().setMessageId(UUID.randomUUID().toString()); + return message; + }); + return Result.success(null, Constants.SUCCESS_MESSAGE); + } + /** * 查询IoTDB数据列表 * @return 响应结果 @@ -85,4 +111,4 @@ public class TestEventController extends BaseController { testIoTDBService.insertStringRecord(deviceId, System.currentTimeMillis(), keyList, valueList); return Result.success(null, Constants.SUCCESS_MESSAGE); } -} +} \ No newline at end of file diff --git a/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/event/IoTDBInsertDataEvent.java b/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/event/IoTDBInsertDataEvent.java index 91d7d6e..2253da6 100644 --- a/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/event/IoTDBInsertDataEvent.java +++ b/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/event/IoTDBInsertDataEvent.java @@ -17,4 +17,4 @@ public class IoTDBInsertDataEvent extends BasicEvent { public IoTDBInsertDataEvent(Object source, String messsge) { super(source, messsge); } -} +} \ No newline at end of file diff --git a/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/iotdb/basic/config/IoTDBSessionConfig.java b/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/iotdb/basic/config/IoTDBSessionConfig.java index 18e7feb..726ae10 100644 --- a/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/iotdb/basic/config/IoTDBSessionConfig.java +++ b/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/iotdb/basic/config/IoTDBSessionConfig.java @@ -69,4 +69,4 @@ public class IoTDBSessionConfig { deviceId, time, measurements, values, e.getMessage()); } } -} +} \ No newline at end of file diff --git a/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/iotdb/basic/service/IService.java b/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/iotdb/basic/service/IService.java index fd5d3bf..56e1616 100644 --- a/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/iotdb/basic/service/IService.java +++ b/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/iotdb/basic/service/IService.java @@ -287,4 +287,4 @@ public interface IService { * @return MeasurementSchemaValuesDTO 对象 */ MeasurementSchemaValuesDTO buildMeasurementSchemasAndValues(Object obj); -} +} \ No newline at end of file diff --git a/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/iotdb/basic/service/impl/ServiceImpl.java b/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/iotdb/basic/service/impl/ServiceImpl.java index bc355d9..e40509b 100644 --- a/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/iotdb/basic/service/impl/ServiceImpl.java +++ b/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/iotdb/basic/service/impl/ServiceImpl.java @@ -762,4 +762,4 @@ public class ServiceImpl implements IService { measurementSchemaValuesDTO.setValues(values); return measurementSchemaValuesDTO; } -} +} \ No newline at end of file diff --git a/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/iotdb/domain/DataJSON.java b/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/iotdb/domain/DataJSON.java index fcd4aac..fd36fda 100644 --- a/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/iotdb/domain/DataJSON.java +++ b/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/iotdb/domain/DataJSON.java @@ -30,4 +30,4 @@ public class DataJSON { */ @Schema(name = "车辆JSON数据") private String datasource; -} +} \ No newline at end of file diff --git a/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/iotdb/domain/ResultEntity.java b/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/iotdb/domain/ResultEntity.java index ad159a6..68c42a9 100644 --- a/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/iotdb/domain/ResultEntity.java +++ b/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/iotdb/domain/ResultEntity.java @@ -33,4 +33,4 @@ public class ResultEntity extends IoTDbRecordAble { */ private String time; -} +} \ No newline at end of file diff --git a/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/iotdb/domain/TestDataType.java b/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/iotdb/domain/TestDataType.java index eef2853..7026b75 100644 --- a/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/iotdb/domain/TestDataType.java +++ b/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/iotdb/domain/TestDataType.java @@ -40,4 +40,4 @@ public class TestDataType { * 测试Long类型 */ private Long testLong; -} +} \ No newline at end of file diff --git a/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/iotdb/domain/dto/InsertDataDTO.java b/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/iotdb/domain/dto/InsertDataDTO.java index 15b6cc2..56555da 100644 --- a/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/iotdb/domain/dto/InsertDataDTO.java +++ b/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/iotdb/domain/dto/InsertDataDTO.java @@ -64,4 +64,4 @@ public class InsertDataDTO { } return insertDataDTOS; } -} +} \ No newline at end of file diff --git a/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/iotdb/domain/dto/IoTDbRecordAble.java b/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/iotdb/domain/dto/IoTDbRecordAble.java index 5b8ad62..669ee51 100644 --- a/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/iotdb/domain/dto/IoTDbRecordAble.java +++ b/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/iotdb/domain/dto/IoTDbRecordAble.java @@ -9,4 +9,4 @@ import lombok.Data; */ @Data public class IoTDbRecordAble { -} +} \ No newline at end of file diff --git a/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/iotdb/domain/dto/MeasurementSchemaValuesDTO.java b/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/iotdb/domain/dto/MeasurementSchemaValuesDTO.java index f04a72b..356038f 100644 --- a/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/iotdb/domain/dto/MeasurementSchemaValuesDTO.java +++ b/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/iotdb/domain/dto/MeasurementSchemaValuesDTO.java @@ -33,4 +33,4 @@ public class MeasurementSchemaValuesDTO { * 存储值为空的索引列表 */ private List valueIsNullIndex; -} +} \ No newline at end of file diff --git a/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/iotdb/service/IoTDBService.java b/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/iotdb/service/IoTDBService.java index b473acf..c82c13e 100644 --- a/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/iotdb/service/IoTDBService.java +++ b/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/iotdb/service/IoTDBService.java @@ -8,4 +8,4 @@ import com.muyu.event.process.iotdb.basic.service.IService; * @Description IoTDB业务层 */ public interface IoTDBService extends IService { -} +} \ No newline at end of file diff --git a/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/iotdb/service/TestIoTDBService.java b/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/iotdb/service/TestIoTDBService.java index 84302bd..4100355 100644 --- a/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/iotdb/service/TestIoTDBService.java +++ b/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/iotdb/service/TestIoTDBService.java @@ -17,4 +17,4 @@ public interface TestIoTDBService extends IService { * @return 返回结果 */ List> list(); -} +} \ No newline at end of file diff --git a/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/iotdb/service/impl/IoTDBServiceImpl.java b/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/iotdb/service/impl/IoTDBServiceImpl.java index 3167618..b19949a 100644 --- a/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/iotdb/service/impl/IoTDBServiceImpl.java +++ b/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/iotdb/service/impl/IoTDBServiceImpl.java @@ -11,4 +11,4 @@ import org.springframework.stereotype.Service; */ @Service public class IoTDBServiceImpl extends ServiceImpl implements IoTDBService { -} +} \ No newline at end of file diff --git a/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/iotdb/service/impl/TestIoTDBServiceImpl.java b/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/iotdb/service/impl/TestIoTDBServiceImpl.java index 21dde1d..9d4424a 100644 --- a/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/iotdb/service/impl/TestIoTDBServiceImpl.java +++ b/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/iotdb/service/impl/TestIoTDBServiceImpl.java @@ -30,4 +30,4 @@ public class TestIoTDBServiceImpl extends ServiceImpl implements TestIoTDBServic log.info("查询IoTDB数据为:{}", list.toString()); return list; } -} +} \ No newline at end of file diff --git a/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/listener/IoTDBInsertDataListener.java b/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/listener/IoTDBInsertDataListener.java index 39e577d..14d5e47 100644 --- a/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/listener/IoTDBInsertDataListener.java +++ b/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/listener/IoTDBInsertDataListener.java @@ -63,4 +63,4 @@ public class IoTDBInsertDataListener implements BasicEventListener { .map(Object::toString) .collect(Collectors.toList()); } -} +} \ No newline at end of file diff --git a/cloud-modules/cloud-modules-event-process/src/main/resources/bootstrap.yml b/cloud-modules/cloud-modules-event-process/src/main/resources/bootstrap.yml index 0b653df..9cafd40 100644 --- a/cloud-modules/cloud-modules-event-process/src/main/resources/bootstrap.yml +++ b/cloud-modules/cloud-modules-event-process/src/main/resources/bootstrap.yml @@ -6,7 +6,7 @@ nacos: addr: 106.15.136.7:8848 user-name: nacos password: nacos - namespace: dev + namespace: xzr spring: application: @@ -44,4 +44,6 @@ spring: # 系统环境Config共享配置 - application-config-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension} # kafka共享配置 - - application-kafka-config-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension} \ No newline at end of file + - application-kafka-config-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension} + # rabbit共享配置 + - application-rabbit-config-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension} \ No newline at end of file diff --git a/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/CloudEventProcessApplication.java b/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/CloudEventProcessApplication.java deleted file mode 100644 index 46b5858..0000000 --- a/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/CloudEventProcessApplication.java +++ /dev/null @@ -1,20 +0,0 @@ -package com.muyu.event.process; - -import com.muyu.common.security.annotation.EnableCustomConfig; -import com.muyu.common.security.annotation.EnableMyFeignClients; -import org.springframework.boot.SpringApplication; -import org.springframework.boot.autoconfigure.SpringBootApplication; - -/** - * @Author: zi run - * @Date 2024/9/28 22:31 - * @Description 事件处理微服启动类 - */ -@EnableCustomConfig -@EnableMyFeignClients -@SpringBootApplication -public class CloudEventProcessApplication { - public static void main(String[] args) { - SpringApplication.run(CloudEventProcessApplication.class, args); - } -} diff --git a/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/basic/BasicEvent.java b/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/basic/BasicEvent.java deleted file mode 100644 index 1701cba..0000000 --- a/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/basic/BasicEvent.java +++ /dev/null @@ -1,37 +0,0 @@ -package com.muyu.event.process.basic; - -import org.springframework.context.ApplicationEvent; - -/** - * @Author: zi run - * @Date 2024/9/30 15:11 - * @Description 基础事件 - */ -public class BasicEvent extends ApplicationEvent { - - /** - * 事件携带的数据 - */ - private final T data; - - /** - * 构造函数,初始化事件源和数据 - * - * @param source 事件源对象 - * @param data 事件携带的数据 - */ - public BasicEvent(Object source, T data) { - super(source); - this.data = data; - } - - - /** - * 获取事件携带的数据 - * - * @return 事件数据 - */ - public T getData() { - return data; - } -} diff --git a/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/basic/BasicEventHandler.java b/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/basic/BasicEventHandler.java deleted file mode 100644 index d9571d2..0000000 --- a/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/basic/BasicEventHandler.java +++ /dev/null @@ -1,37 +0,0 @@ -package com.muyu.event.process.basic; - -import org.springframework.context.ApplicationListener; -import org.springframework.stereotype.Component; - -/** - * @Author: zi run - * @Date 2024/9/30 15:37 - * @Description 基础事件处理器 - */ -@Component -public class BasicEventHandler implements ApplicationListener> { - - /** - * 具体事件监听器 - */ - private final BasicEventListener listener; - - /** - * 构造函数,用于注入具体事件监听器 - * - * @param listener 具体事件监听器 - */ - public BasicEventHandler(BasicEventListener listener) { - this.listener = listener; - } - - /** - * 处理应用事件 - * - * @param event 事件对象 - */ - @Override - public void onApplicationEvent(BasicEvent event) { - listener.onEvent(event); - } -} diff --git a/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/basic/BasicEventListener.java b/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/basic/BasicEventListener.java deleted file mode 100644 index 7fecd6e..0000000 --- a/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/basic/BasicEventListener.java +++ /dev/null @@ -1,16 +0,0 @@ -package com.muyu.event.process.basic; - -/** - * @Author: zi run - * @Date 2024/9/30 15:35 - * @Description 基础事件监听器 - */ -public interface BasicEventListener { - - /** - * 处理事件的方法 - * - * @param event 事件对象 - */ - void onEvent(BasicEvent event); -} diff --git a/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/basic/EventPublisher.java b/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/basic/EventPublisher.java deleted file mode 100644 index b037cd2..0000000 --- a/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/basic/EventPublisher.java +++ /dev/null @@ -1,38 +0,0 @@ -package com.muyu.event.process.basic; - -import org.springframework.context.ApplicationEventPublisher; -import org.springframework.context.ApplicationEventPublisherAware; -import org.springframework.stereotype.Component; - -/** - * @Author: zi run - * @Date 2024/9/29 22:01 - * @Description 事件发布者 - */ -@Component -public class EventPublisher implements ApplicationEventPublisherAware { - - /** - * 应用程序事件发布者,用于发布事件 - */ - private ApplicationEventPublisher publisher; - - /** - * 设置应用程序事件发布者 - * - * @param publisher 应用程序事件发布者实例 - */ - @Override - public void setApplicationEventPublisher(ApplicationEventPublisher publisher) { - this.publisher = publisher; - } - - /** - * 发布事件 - * @param event 要发布的事件 - * @param 事件数据类型 - */ - public void publish(BasicEvent event) { - publisher.publishEvent(event); - } -} diff --git a/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/consumer/TestConsumer.java b/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/consumer/TestConsumer.java deleted file mode 100644 index cd7dbe2..0000000 --- a/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/consumer/TestConsumer.java +++ /dev/null @@ -1,52 +0,0 @@ -package com.muyu.event.process.consumer; - -import cn.hutool.core.thread.ThreadUtil; -import com.alibaba.nacos.shaded.com.google.common.collect.Lists; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.springframework.beans.factory.InitializingBean; -import org.springframework.stereotype.Component; - -import java.time.Duration; -import java.util.Collection; - -/** - * @Author: zi run - * @Date 2024/9/29 16:53 - * @Description 测试消费者 - */ -@Slf4j -//@Component -@RequiredArgsConstructor -public class TestConsumer implements InitializingBean { - - /** - * kafka消费者 - */ - private final KafkaConsumer kafkaConsumer; - - /** - * kafka主题名称 - */ - private static final String topicName = "test-topic"; - - - @Override - public void afterPropertiesSet() throws Exception { - new Thread(() -> { - log.info("启动线程监听Topic: {}", topicName); - ThreadUtil.sleep(1000); - Collection topics = Lists.newArrayList(topicName); - kafkaConsumer.subscribe(topics); - while (true) { - ConsumerRecords consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000)); - consumerRecords.forEach(record -> { - String value = record.value(); - log.info("从Kafka中消费的原始数据: {}", value); - }); - } - }).start(); - } -} diff --git a/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/consumer/VehicleConsumer.java b/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/consumer/VehicleConsumer.java deleted file mode 100644 index 9a688b4..0000000 --- a/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/consumer/VehicleConsumer.java +++ /dev/null @@ -1,93 +0,0 @@ -package com.muyu.event.process.consumer; - -import com.muyu.event.process.basic.EventPublisher; -import com.muyu.event.process.event.IoTDBInsertDataEvent; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.springframework.boot.ApplicationArguments; -import org.springframework.boot.ApplicationRunner; -import org.springframework.context.ApplicationListener; -import org.springframework.context.event.ContextClosedEvent; -import org.springframework.stereotype.Component; - -import java.time.Duration; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; - -/** - * @Author: zi run - * @Date 2024/9/29 23:23 - * @Description 车辆消费者 - */ -@Slf4j -@Component -@RequiredArgsConstructor -public class VehicleConsumer implements ApplicationRunner, ApplicationListener { - - /** - * kafka消费者 - */ - private final KafkaConsumer kafkaConsumer; - - /** - * 事件发布者 - */ - private final EventPublisher eventPublisher; - - /** - * 协议解析报文传递数据(队列名称) - */ - public final static String MESSAGE_PARSING = "MessageParsing"; - - /** - * 设定固定大小的线程池,线程数量与当前可用的处理器核心数相同 - */ - private final ExecutorService executorService = - Executors.newFixedThreadPool(10); - - @Override - public void run(ApplicationArguments args) throws Exception { - log.info("启动线程监听Topic: {}", MESSAGE_PARSING); - List topics = Collections.singletonList(MESSAGE_PARSING); - kafkaConsumer.subscribe(topics); - while (true) { - ConsumerRecords consumerRecords = kafkaConsumer.poll(Duration.ofMillis(100)); - consumerRecords.forEach(consumerRecord -> executorService.submit(() -> handleRecord(consumerRecord))); - } - } - - private void handleRecord(ConsumerRecord consumerRecord) { - String message = consumerRecord.value(); - log.info("接收到车辆报文数据,内容:{}", message); - log.info("------------------------------------------------"); - eventPublisher.publish(new IoTDBInsertDataEvent(this, message)); - } - - @Override - public void onApplicationEvent(ContextClosedEvent event) { - log.info("关闭线程池和Kafka消费者"); - - try { - executorService.shutdown(); - if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) { - executorService.shutdownNow(); - } - } catch (InterruptedException e) { - log.error("线程池关闭被中断,强制关闭", e); - executorService.shutdownNow(); - Thread.currentThread().interrupt(); - } - - try { - kafkaConsumer.close(); // 关闭Kafka消费者 - } catch (Exception e) { - log.error("关闭Kafka消费者时发生错误", e); - } - } -} diff --git a/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/controller/TestEventController.java b/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/controller/TestEventController.java deleted file mode 100644 index bfc0dc3..0000000 --- a/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/controller/TestEventController.java +++ /dev/null @@ -1,88 +0,0 @@ -package com.muyu.event.process.controller; - -import com.alibaba.fastjson2.JSONObject; -import com.muyu.common.core.constant.Constants; -import com.muyu.common.core.domain.Result; -import com.muyu.common.core.web.controller.BaseController; -import com.muyu.event.process.iotdb.service.TestIoTDBService; -import lombok.RequiredArgsConstructor; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.springframework.web.bind.annotation.GetMapping; -import org.springframework.web.bind.annotation.PostMapping; -import org.springframework.web.bind.annotation.RequestMapping; -import org.springframework.web.bind.annotation.RestController; - -import java.util.ArrayList; -import java.util.List; -import java.util.Map; - -/** - * @Author: zi run - * @Date 2024/9/29 16:24 - * @Description 测试事件控制层 - */ -@RestController -@RequiredArgsConstructor -@RequestMapping(value = "/test-event") -public class TestEventController extends BaseController { - - /** - * kafka生产者 - */ - private final KafkaProducer kafkaProducer; - - /** - * kafka主题名称 - */ - private static final String kafkaTopicName = "test-topic"; - - /** - * 测试IoTDB业务层 - */ - private final TestIoTDBService testIoTDBService; - - /** - * 发送Kafka测试消息 - * - * @return 响应结果 - */ - @GetMapping(value = "/sendKafka") - public Result senKafka() { - JSONObject jsonObject = new JSONObject(); - jsonObject.put("id","1"); - jsonObject.put("name","张三"); - jsonObject.put("age","18"); - jsonObject.put("sex","男"); - ProducerRecord producerRecord = new ProducerRecord<>(kafkaTopicName, jsonObject.toJSONString()); - kafkaProducer.send(producerRecord); - return Result.success(null, Constants.SUCCESS_MESSAGE); - } - - /** - * 查询IoTDB数据列表 - * @return 响应结果 - */ - @GetMapping(value = "/list") - public Result>> list() { - return Result.success(testIoTDBService.list(), Constants.SUCCESS_MESSAGE); - } - - /** - * 向IoTDB添加数据 - * - * @return 响应结果 - */ - @PostMapping(value = "/save") - public Result save() { - String deviceId = "root.test"; - ArrayList keyList = new ArrayList<>(); - ArrayList valueList = new ArrayList<>(); - keyList.add("car_vin"); - keyList.add("car_name"); - valueList.add("VIN123456"); - valueList.add("宝马"); - testIoTDBService.insertStringRecord(deviceId, System.currentTimeMillis(), keyList, valueList); - return Result.success(null, Constants.SUCCESS_MESSAGE); - } -} diff --git a/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/event/IoTDBInsertDataEvent.java b/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/event/IoTDBInsertDataEvent.java deleted file mode 100644 index 91d7d6e..0000000 --- a/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/event/IoTDBInsertDataEvent.java +++ /dev/null @@ -1,20 +0,0 @@ -package com.muyu.event.process.event; - -import com.muyu.event.process.basic.BasicEvent; - -/** - * @Author: zi run - * @Date 2024/9/29 21:19 - * @Description 向IoTDB插入数据事件 - */ -public class IoTDBInsertDataEvent extends BasicEvent { - - /** - * 构造函数,向IoTDB插入数据创建事件 - * - * @param messsge 消息 - */ - public IoTDBInsertDataEvent(Object source, String messsge) { - super(source, messsge); - } -} diff --git a/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/iotdb/basic/config/IoTDBSessionConfig.java b/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/iotdb/basic/config/IoTDBSessionConfig.java deleted file mode 100644 index 18e7feb..0000000 --- a/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/iotdb/basic/config/IoTDBSessionConfig.java +++ /dev/null @@ -1,72 +0,0 @@ -package com.muyu.event.process.iotdb.basic.config; - -import lombok.extern.slf4j.Slf4j; -import org.apache.iotdb.session.pool.SessionPool; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.context.annotation.Configuration; -import org.springframework.stereotype.Component; -import java.util.List; - -/** - * @Author: zi run - * @Date 2024/9/28 22:41 - * @Description IoTDB会话配置 - */ -@Slf4j -@Component -@Configuration -public class IoTDBSessionConfig { - - @Value("${spring.iotdb.username}") - private String username; - - @Value("${spring.iotdb.password}") - private String password; - - @Value("${spring.iotdb.ip}") - private String ip; - - @Value("${spring.iotdb.port}") - private int port; - - @Value("${spring.iotdb.maxSize}") - private int maxSize; - - /** - * IoTDB会话池 - */ - private static SessionPool sessionPool = null; - - /** - * 获取IoTDB会话对象 - * @return ioTDB会话对象 - */ - public SessionPool getSessionPool() { - if (sessionPool == null) { - sessionPool = new SessionPool(ip, port, username, password, maxSize); - } - return sessionPool; - } - - /** - * 向IoTDB中插入特定设备的记录 - * - * @param deviceId 设备的唯一标识符 - * @param time 记录的时间戳,以毫秒为单位 - * @param measurements 与记录关联的测量名称列表 - * @param values 每个测量对应的值列表。值的顺序必须与测量名称一一对应 - * - *

该方法从会话池中获取一个会话,并尝试将指定的记录插入到 IoTDB 中。 - * 如果插入失败,将记录错误信息,便于后续排查。

- */ - public void insertRecord(String deviceId, long time, List measurements, List values) { - getSessionPool(); - try { - log.info("iotdb数据入库:device_id:[{}], measurements:[{}], values:[{}]", deviceId, measurements, values); - sessionPool.insertRecord(deviceId, time, measurements, values); - } catch (Exception e) { - log.error("IotDBSession insertRecord失败: deviceId={}, time={}, measurements={}, values={}, error={}", - deviceId, time, measurements, values, e.getMessage()); - } - } -} diff --git a/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/iotdb/basic/service/IService.java b/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/iotdb/basic/service/IService.java deleted file mode 100644 index fd5d3bf..0000000 --- a/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/iotdb/basic/service/IService.java +++ /dev/null @@ -1,290 +0,0 @@ -package com.muyu.event.process.iotdb.basic.service; - -import com.muyu.event.process.iotdb.domain.dto.IoTDbRecordAble; -import com.muyu.event.process.iotdb.domain.dto.MeasurementSchemaValuesDTO; -import org.apache.iotdb.common.rpc.thrift.TAggregationType; -import org.apache.iotdb.isession.SessionDataSet; -import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; -import org.apache.iotdb.tsfile.write.record.Tablet; -import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; - -import java.util.List; -import java.util.Map; - -/** - * @Author: zi run - * @Date 2024/9/28 23:37 - * @Description IoTDB基准业务层 - */ -public interface IService { - - /** - * 插入一个 Tablet 对象到 IoTDB 数据库 - * - * @param tablet 要插入的 Tablet 对象,包含待写入的数据 - */ - void insertTablet(Tablet tablet); - - /** - * 将给定的 Tablets 插入到 IoTDB 数据库中。 - * - * @param tablets 一个 Map,包含要插入的 Tablets - */ - void insertTablets(Map tablets); - - /** - * 单条数据插入(string类型数据项) - * - * @param deviceId 设备名(表名)root.ln.wf01.wt01 - * @param time 时间戳 - * @param measurements 数据项列表 - * @param values 数据项对应值列表 - */ - void insertStringRecord(String deviceId, long time, List measurements, List values); - - /** - * 单条数据插入(不同类型数据项) - * - * @param deviceId 设备名(表名)root.ln.wf01.wt01 - * @param time 时间戳 - * @param measurements 数据项列表 - * @param types 数据项对应类型列表 - * @param values 数据项对应值列表 - */ - void insertRecord(String deviceId, long time, List measurements, - List types, List values); - - /** - * 多个设备多条数据插入(string类型数据项) - * - * @param deviceIds 多个设备名(表名)root.ln.wf01.wt01 - * @param times 时间戳的列表 - * @param measurementsList 数据项列表的列表 - * @param valuesList 数据项对应值列表的列表 - */ - void insertStringRecords(List deviceIds, List times, - List> measurementsList, List> valuesList); - - /** - * 多个设备多条数据插入(不同类型数据项) - * - * @param deviceIds 多个设备名(表名))root.ln.wf01.wt01 - * @param times 时间戳的列表 - * @param measurementsList 数据项列表的列表 - * @param typesList 数据项对应类型列表的列表 - * @param valuesList 数据项对应值列表的列表 - */ - void insertRecords(List deviceIds, List times, List> measurementsList, - List> typesList, List> valuesList); - - /** - * 单个设备多条数据插入(string类型数据项) - * - * @param deviceId 单个设备名(表名))root.ln.wf01.wt01 - * @param times 时间戳的列表 - * @param measurementsList 数据项列表的列表 - * @param valuesList 数据项对应值列表的列表 - */ - void insertStringRecordsOfOneDevice(String deviceId, List times, - List> measurementsList, List> valuesList); - - /** - * 单个设备多条数据插入(不同类型数据项) - * - * @param deviceId 单个设备名(表名))root.ln.wf01.wt01 - * @param times 时间戳的列表 - * @param measurementsList 数据项列表的列表 - * @param typesList 数据项对应类型列表的列表 - * @param valuesList 数据项对应值列表的列表 - */ - void insertRecordsOfOneDevice(String deviceId, List times, List> measurementsList, - List> typesList, List> valuesList); - - /** - * 删除数据(删除一个时间序列在某个时间点前或这个时间点的数据) - * - * @param path 单个字段 root.ln.wf01.wt01.temperature - * @param endTime 删除时间点 - */ - void deleteData(String path, long endTime); - - /** - * 删除数据(删除多个时间序列在某个时间点前或这个时间点的数据) - * - * @param paths 多个字段(表名)) root.ln.wf01.wt01.temperature - * @param endTime 删除时间点 - */ - void deleteData(List paths, long endTime); - - /** - * 数据查询(时间序列原始数据范围查询,时间范围为左闭右开区间,包含开始时间但不包含结束时间) - * - * @param paths 多个字段(表名)) root.ln.wf01.wt01.temperature - * @param startTime 开始时间 - * @param endTime 结束时间 - * @param outTime 超时时间 - * @return SessionDataSet (Time,paths) - */ - SessionDataSet executeRawDataQuery(List paths, long startTime, long endTime, long outTime); - - /** - * 数据查询(时间序列原始数据范围查询,时间范围为左闭右开区间,包含开始时间但不包含结束时间) - * - * @param paths 多个字段(表名),例如:"root.ln.wf01.wt01.temperature" - * @param startTime 查询数据的起始时间(包含该时间点) - * @param endTime 查询数据的结束时间(不包含该时间点) - * @param outTime 超时时间,单位为毫秒,表示查询的最长等待时间 - * @param clazz 返回数据对应的对象类型,要求对象属性与数据库字段名一致 - * @param 返回数据的对象类型泛型 - * @return 查询结果的对象列表,如果查询失败则返回 null - */ - List executeRawDataQuery(List paths, long startTime, long endTime, long outTime, - Class clazz); - - /** - * 最新点查询(查询最后一条时间戳大于等于某个时间点的数据) - * - * @param paths 多个字段(表名)) root.ln.wf01.wt01.temperature - * @param lastTime 结束时间 - * @return SessionDataSet - */ - SessionDataSet executeLastDataQuery(List paths, long lastTime); - - /** - * 最新点查询(查询最后一条时间戳大于等于某个时间点的数据) - * - * @param paths 多个字段(表名)) root.ln.wf01.wt01.temperature - * @param lastTime 结束时间 - * @param clazz 返回数据对应的对象(对象属性必须与字段名对应) - * @return 查询结果的对象列表,如果查询失败则返回 null - * @param 返回数据的对象类型泛型 - */ - List executeLastDataQuery(List paths, long lastTime, Class clazz); - - /** - * 最新点查询(快速查询单设备下指定序列最新点) - * - * @param db root.ln.wf01 - * @param device root.ln.wf01.wt01 - * @param sensors temperature,status(字段名) - * @param isLegalPathNodes true(避免路径校验) - * @return SessionDataSet - */ - SessionDataSet executeLastDataQueryForOneDevice(String db, String device, - List sensors, boolean isLegalPathNodes); - - /** - * 查询单个设备的最新数据(获取指定设备的最新传感器数据) - * - * @param db root.ln.wf01 - * @param device root.ln.wf01.wt01 - * @param sensors temperature,status(字段名) - * @param isLegalPathNodes true(避免路径校验) - * @param clazz 返回数据对应的对象(对象属性必须与字段名对应) - * @return 查询结果的对象列表,如果查询失败则返回 null - * @param 返回数据的对象类型泛型 - */ - List executeLastDataQueryForOneDevice(String db, String device, List sensors, - boolean isLegalPathNodes, Class clazz); - - /** - * 聚合查询 - * - * @param paths 多个字段(表名)) root.ln.wf01.wt01.temperature - * @param aggregations 聚合操作 TAggregationType.SUM,TAggregationType.COUNT - * @return SessionDataSet - */ - SessionDataSet executeAggregationQuery(List paths, List aggregations); - - /** - * 聚合查询(时间序列原始数据范围查询,时间范围为左闭右开区间,包含开始时间但不包含结束时间) - * - * @param paths 多个字段(表名)) root.ln.wf01.wt01.temperature - * @param aggregations 聚合操作 TAggregationType.SUM,TAggregationType.COUNT - * @param startTime 开始时间(包含) - * @param endTime 结束时间 - * @return SessionDataSet - */ - SessionDataSet executeAggregationQuery(List paths, List aggregations, - long startTime, long endTime); - - /** - * 聚合查询(支持按照时间区间分段查询) - * - * @param paths 多个字段(表名)) root.ln.wf01.wt01.temperature - * @param aggregations 聚合操作 TAggregationType.SUM,TAggregationType.COUNT - * @param startTime 开始时间(包含) - * @param endTime 结束时间 - * @param interval 查询的时间间隔(单位为毫秒) - * @return SessionDataSet - */ - SessionDataSet executeAggregationQuery(List paths, List aggregations, - long startTime, long endTime, long interval); - - /** - * 聚合查询(支持按照时间区间分段查询) - * - * @param paths 多个字段(表名)) root.ln.wf01.wt01.temperature - * @param aggregations 聚合操作 TAggregationType.SUM,TAggregationType.COUNT - * @param startTime 开始时间(包含) - * @param endTime 结束时间 - * @param interval 查询的时间间隔(单位为毫秒) - * @param slidingStep 滑动步长(单位为毫秒) - * @return SessionDataSet - */ - SessionDataSet executeAggregationQuery(List paths, List aggregations, - long startTime, long endTime, long interval, long slidingStep); - - /** - * SQL查询 - * - * @param sql SQL查询语句,支持IotDB的查询语法 - * @return 返回查询结果的 SessionDataSet,如果执行失败则返回 null - */ - SessionDataSet executeQueryStatement(String sql); - - - /** - * SQL非查询 - * - * @param sql SQL查询语句,支持IotDB的查询语法 - */ - void executeNonQueryStatement(String sql); - - /** - * 封装处理数据 - * - * @param sessionDataSet 包含查询结果的SessionDataSet对象 - * @param titleList 列标题列表,用于映射字段名称 - * @return 返回封装后的数据列表,每个 Map 代表一行数据,键为列名,值为对应的字段值 - */ - List> packagingMapData(SessionDataSet sessionDataSet, List titleList); - - /** - * 封装处理数据(不支持聚合查询) - * - * @param sessionDataSet 查询返回的结果集 - * @param titleList 查询返回的结果集内的字段名 - * @param clazz 返回数据对应的对象(对象属性必须与字段名对应) - * @return 返回封装后的对象列表,每个对象对应一行结果集数据 - * @param 返回对象的类型 - */ - List packagingObjectData(SessionDataSet sessionDataSet, List titleList, - Class clazz); - - /** - * 根据对象构建MeasurementSchemas - * - * @param obj 要从中提取字段信息的对象 - * @return 返回一个包含 MeasurementSchema 的列表 - */ - List buildMeasurementSchemas(Object obj); - - /** - * 根据对象构建MeasurementSchemaValuesDTO - * - * @param obj 要从中提取字段信息和对应值的对象 - * @return MeasurementSchemaValuesDTO 对象 - */ - MeasurementSchemaValuesDTO buildMeasurementSchemasAndValues(Object obj); -} diff --git a/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/iotdb/basic/service/impl/ServiceImpl.java b/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/iotdb/basic/service/impl/ServiceImpl.java deleted file mode 100644 index bc355d9..0000000 --- a/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/iotdb/basic/service/impl/ServiceImpl.java +++ /dev/null @@ -1,765 +0,0 @@ -package com.muyu.event.process.iotdb.basic.service.impl; - -import com.alibaba.fastjson.JSON; -import com.muyu.event.process.iotdb.basic.config.IoTDBSessionConfig; -import com.muyu.event.process.iotdb.basic.service.IService; -import com.muyu.event.process.iotdb.domain.dto.IoTDbRecordAble; -import com.muyu.event.process.iotdb.domain.dto.MeasurementSchemaValuesDTO; -import lombok.SneakyThrows; -import lombok.extern.slf4j.Slf4j; -import org.apache.iotdb.common.rpc.thrift.TAggregationType; -import org.apache.iotdb.isession.pool.SessionDataSetWrapper; -import org.apache.iotdb.session.pool.SessionPool; -import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; -import org.apache.iotdb.tsfile.read.common.Field; -import org.apache.iotdb.tsfile.read.common.RowRecord; -import org.apache.iotdb.tsfile.write.record.Tablet; -import org.apache.iotdb.isession.SessionDataSet; -import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Service; - -import java.lang.reflect.Type; -import java.util.*; -import java.util.stream.Collectors; - -/** - * @Author: zi run - * @Date 2024/9/28 23:38 - * @Description IoTDB基准业务实现层 - */ -@Slf4j -@Service -public class ServiceImpl implements IService { - - /** - * IoTDB会话配置 - */ - @Autowired - private IoTDBSessionConfig ioTDBSessionConfig; - - /** - * 插入一个 Tablet 对象到 IoTDB 数据库 - * - * @param tablet 要插入的 Tablet 对象,包含待写入的数据 - */ - @Override - public void insertTablet(Tablet tablet) { - SessionPool sessionPool = ioTDBSessionConfig.getSessionPool(); - try { - log.info("iotdb数据入库:tablet:[{}]", tablet); - sessionPool.insertTablet(tablet); - } catch (Exception e) { - log.error("IotDBSession insertTablet失败: tablet={}, error={}", tablet, e.getMessage()); - } - } - - /** - * 将给定的 Tablets 插入到 IoTDB 数据库中。 - * - * @param tablets 一个 Map,包含要插入的 Tablets - */ - @Override - public void insertTablets(Map tablets) { - SessionPool sessionPool = ioTDBSessionConfig.getSessionPool(); - try { - log.info("iotdb数据入库:tablets:[{}]", tablets); - sessionPool.insertTablets(tablets); - } catch (Exception e) { - log.error("IotDBSession insertTablets失败: tablets={}, error={}", tablets, e.getMessage()); - } - } - - /** - * 单条数据插入(string类型数据项) - * - * @param deviceId 设备名(表名)root.ln.wf01.wt01 - * @param time 时间戳 - * @param measurements 数据项列表 - * @param values 数据项对应值列表 - */ - @Override - public void insertStringRecord(String deviceId, long time, List measurements, List values) { - SessionPool sessionPool = ioTDBSessionConfig.getSessionPool(); - try { - log.info("iotdb数据入库:device_id:[{}], measurements:[{}], values:[{}]", deviceId, measurements, values); - sessionPool.insertRecord(deviceId, time, measurements, values); - } catch (Exception e) { - log.error("IotDBSession insertRecord失败: deviceId={}, time={}, measurements={}, values={}, error={}", - deviceId, time, measurements, values, e.getMessage()); - } - } - - /** - * 单条数据插入(不同类型数据项) - * - * @param deviceId 设备名(表名)root.ln.wf01.wt01 - * @param time 时间戳 - * @param measurements 数据项列表 - * @param types 数据项对应类型列表 - * @param values 数据项对应值列表 - */ - @Override - public void insertRecord(String deviceId, long time, List measurements, - List types, List values) { - SessionPool sessionPool = ioTDBSessionConfig.getSessionPool(); - try { - log.info("iotdb数据入库:device_id:[{}], measurements:[{}], types:[{}], values:[{}]", - deviceId, measurements, types, values); - sessionPool.insertRecord(deviceId, time, measurements, types, values); - } catch (Exception e) { - log.error("IotDBSession insertRecordHasTypes失败: deviceId={}, time={}, measurements={}, types={}, " + - "values={}, error={}", deviceId, time, measurements, types, values, e.getMessage()); - } - } - - - /** - * 多个设备多条数据插入(string类型数据项) - * - * @param deviceIds 多个设备名(表名)root.ln.wf01.wt01 - * @param times 时间戳的列表 - * @param measurementsList 数据项列表的列表 - * @param valuesList 数据项对应值列表的列表 - */ - @Override - public void insertStringRecords(List deviceIds, List times, List> measurementsList, - List> valuesList) { - SessionPool sessionPool = ioTDBSessionConfig.getSessionPool(); - try { - log.info("iotdb数据入库:deviceIds:[{}], measurementsList:[{}], valuesList:[{}]", - deviceIds, measurementsList, valuesList); - sessionPool.insertRecords(deviceIds, times, measurementsList, valuesList); - } catch (Exception e) { - log.error("IotDBSession insertRecords失败: deviceIds={}, times={}, measurementsList={}, " + - "valuesList={}, error={}", deviceIds, times, measurementsList, valuesList, e.getMessage()); - } - } - - /** - * 多个设备多条数据插入(不同类型数据项) - * - * @param deviceIds 多个设备名(表名))root.ln.wf01.wt01 - * @param times 时间戳的列表 - * @param measurementsList 数据项列表的列表 - * @param typesList 数据项对应类型列表的列表 - * @param valuesList 数据项对应值列表的列表 - */ - @Override - public void insertRecords(List deviceIds, List times, List> measurementsList, - List> typesList, List> valuesList) { - SessionPool sessionPool = ioTDBSessionConfig.getSessionPool(); - try { - log.info("iotdb数据入库:deviceIds:[{}], measurementsList:[{}], typesList:[{}], valuesList:[{}]", - deviceIds, measurementsList, typesList, valuesList); - sessionPool.insertRecords(deviceIds, times, measurementsList, typesList, valuesList); - } catch (Exception e) { - log.error("IotDBSession insertRecords失败: deviceIds={}, times={}, measurementsList={}, typesList={}, " + - "valuesList={}, error={}", - deviceIds, times, measurementsList, typesList, valuesList, e.getMessage()); - } - } - - /** - * 单个设备多条数据插入(string类型数据项) - * - * @param deviceId 单个设备名(表名))root.ln.wf01.wt01 - * @param times 时间戳的列表 - * @param measurementsList 数据项列表的列表 - * @param valuesList 数据项对应值列表的列表 - */ - @Override - public void insertStringRecordsOfOneDevice(String deviceId, List times, List> measurementsList, - List> valuesList) { - SessionPool sessionPool = ioTDBSessionConfig.getSessionPool(); - try { - log.info("iotdb数据入库:deviceId:[{}], measurementsList:[{}], valuesList:[{}]", - deviceId, measurementsList, valuesList); - sessionPool.insertStringRecordsOfOneDevice(deviceId, times, measurementsList, valuesList); - } catch (Exception e) { - log.error("IotDBSession insertStringRecordsOfOneDevice失败: deviceId={}, times={}, " + - "measurementsList={}, valuesList={}, error={}", - deviceId, times, measurementsList, valuesList, e.getMessage()); - } - } - - /** - * 单个设备多条数据插入(不同类型数据项) - * - * @param deviceId 单个设备名(表名))root.ln.wf01.wt01 - * @param times 时间戳的列表 - * @param measurementsList 数据项列表的列表 - * @param typesList 数据项对应类型列表的列表 - * @param valuesList 数据项对应值列表的列表 - */ - @Override - public void insertRecordsOfOneDevice(String deviceId, List times, List> measurementsList, - List> typesList, List> valuesList) { - SessionPool sessionPool = ioTDBSessionConfig.getSessionPool(); - try { - log.info("iotdb数据入库:deviceId:[{}], measurementsList:[{}], typesList:[{}], valuesList:[{}]", - deviceId, measurementsList, typesList, valuesList); - sessionPool.insertRecordsOfOneDevice(deviceId, times, measurementsList, typesList, valuesList); - } catch (Exception e) { - log.error("IotDBSession insertRecordsOfOneDevice失败: deviceId={}, times={}, " + - "measurementsList={},typesList={},valuesList={}, error={}", - deviceId, times, measurementsList, typesList, valuesList, e.getMessage()); - } - } - - /** - * 删除数据(删除一个时间序列在某个时间点前或这个时间点的数据) - * - * @param path 单个字段 root.ln.wf01.wt01.temperature - * @param endTime 删除时间点 - */ - @Override - public void deleteData(String path, long endTime) { - SessionPool sessionPool = ioTDBSessionConfig.getSessionPool(); - try { - log.info("iotdb数据删除:path:[{}], endTime:[{}]", path, endTime); - sessionPool.deleteData(path, endTime); - } catch (Exception e) { - log.error("IotDBSession deleteData失败: deviceId={}, times={},error={}", path, endTime, e.getMessage()); - } - } - - /** - * 删除数据(删除多个时间序列在某个时间点前或这个时间点的数据) - * - * @param paths 多个字段(表名)) root.ln.wf01.wt01.temperature - * @param endTime 删除时间点 - */ - @Override - public void deleteData(List paths, long endTime) { - SessionPool sessionPool = ioTDBSessionConfig.getSessionPool(); - try { - log.info("iotdb数据删除:paths:[{}], endTime:[{}]", paths, endTime); - sessionPool.deleteData(paths, endTime); - } catch (Exception e) { - log.error("IotDBSession deleteData失败: paths={}, times={},error={}", paths, endTime, e.getMessage()); - } - } - - /** - * 数据查询(时间序列原始数据范围查询,时间范围为左闭右开区间,包含开始时间但不包含结束时间) - * - * @param paths 多个字段(表名)) root.ln.wf01.wt01.temperature - * @param startTime 开始时间 - * @param endTime 结束时间 - * @param outTime 超时时间 - * @return SessionDataSet (Time,paths) - */ - @Override - public SessionDataSet executeRawDataQuery(List paths, long startTime, long endTime, long outTime) { - SessionPool sessionPool = ioTDBSessionConfig.getSessionPool(); - SessionDataSetWrapper sessionDataSetWrapper = null; - try { - log.info("iotdb数据查询:paths:[{}], startTime:[{}], endTime:[{}],outTime:[{}]", - paths, startTime, endTime, outTime); - sessionDataSetWrapper = sessionPool.executeRawDataQuery(paths, startTime, endTime, outTime); - return sessionDataSetWrapper.getSessionDataSet(); - } catch (Exception e) { - log.error("IotDBSession executeRawDataQuery失败: paths={}, startTime:[{}], endTime:[{}], " + - "outTime:[{}], error={}", paths, startTime, endTime, outTime, e.getMessage()); - } finally { - sessionPool.closeResultSet(sessionDataSetWrapper); - } - return null; - } - - /** - * 数据查询(时间序列原始数据范围查询,时间范围为左闭右开区间,包含开始时间但不包含结束时间) - * - * @param paths 多个字段(表名),例如:"root.ln.wf01.wt01.temperature" - * @param startTime 查询数据的起始时间(包含该时间点) - * @param endTime 查询数据的结束时间(不包含该时间点) - * @param outTime 超时时间,单位为毫秒,表示查询的最长等待时间 - * @param clazz 返回数据对应的对象类型,要求对象属性与数据库字段名一致 - * @param 返回数据的对象类型泛型 - * @return 查询结果的对象列表,如果查询失败则返回 null - */ - @Override - public List executeRawDataQuery(List paths, long startTime, long endTime, long outTime, - Class clazz) { - SessionDataSet sessionDataSet = executeRawDataQuery(paths, startTime, endTime, outTime); - List columnNames = sessionDataSet.getColumnNames(); - List resultEntities = null; - try { - resultEntities = packagingObjectData(sessionDataSet, columnNames, clazz); - } catch (Exception e) { - log.error("IotDBSession executeRawDataQuery失败: paths={}, startTime:[{}], endTime:[{}], " + - "outTime:[{}], error={}", paths, startTime, endTime, outTime, e.getMessage()); - } - return resultEntities; - } - - /** - * 最新点查询(查询最后一条时间戳大于等于某个时间点的数据) - * - * @param paths 多个字段(表名)) root.ln.wf01.wt01.temperature - * @param lastTime 结束时间 - * @return SessionDataSet - */ - @Override - public SessionDataSet executeLastDataQuery(List paths, long lastTime) { - SessionPool sessionPool = ioTDBSessionConfig.getSessionPool(); - SessionDataSetWrapper sessionDataSetWrapper = null; - try { - log.info("iotdb数据查询:paths:[{}], lastTime:[{}]", paths, lastTime); - sessionDataSetWrapper = sessionPool.executeLastDataQuery(paths, lastTime); - return sessionDataSetWrapper.getSessionDataSet(); - } catch (Exception e) { - log.error("IotDBSession executeLastDataQuery失败: paths={}, lastTime:[{}], error={}", - paths, lastTime, e.getMessage()); - } finally { - sessionPool.closeResultSet(sessionDataSetWrapper); - } - return null; - } - - /** - * 最新点查询(查询最后一条时间戳大于等于某个时间点的数据) - * - * @param paths 多个字段(表名)) root.ln.wf01.wt01.temperature - * @param lastTime 结束时间 - * @param clazz 返回数据对应的对象(对象属性必须与字段名对应) - * @return 查询结果的对象列表,如果查询失败则返回 null - * @param 返回数据的对象类型泛型 - */ - @Override - public List executeLastDataQuery(List paths, long lastTime, Class clazz) { - SessionDataSet sessionDataSet = executeLastDataQuery(paths, lastTime); - List columnNames = sessionDataSet.getColumnNames(); - List resultEntities = null; - try { - resultEntities = packagingObjectData(sessionDataSet, columnNames, clazz); - } catch (Exception e) { - log.error("IotDBSession executeLastDataQuery失败: paths={}, lastTime:[{}], error={}", - paths, lastTime, e.getMessage()); - } - return resultEntities; - } - - /** - * 最新点查询(快速查询单设备下指定序列最新点) - * - * @param db root.ln.wf01 - * @param device root.ln.wf01.wt01 - * @param sensors temperature,status(字段名) - * @param isLegalPathNodes true(避免路径校验) - * @return SessionDataSet - */ - @Override - public SessionDataSet executeLastDataQueryForOneDevice(String db, String device, List sensors, - boolean isLegalPathNodes) { - SessionPool sessionPool = ioTDBSessionConfig.getSessionPool(); - SessionDataSetWrapper sessionDataSetWrapper = null; - try { - log.info("iotdb数据查询:db:[{}], device:[{}],sensors:[{}], isLegalPathNodes:[{}]", - db, device, sensors, isLegalPathNodes); - sessionDataSetWrapper = sessionPool.executeLastDataQueryForOneDevice(db, device, sensors, isLegalPathNodes); - return sessionDataSetWrapper.getSessionDataSet(); - } catch (Exception e) { - log.error("IotDBSession executeLastDataQueryForOneDevice失败: db:[{}], device:[{}], sensors:[{}], " + - "isLegalPathNodes:[{}], error={}", db, device, sensors, isLegalPathNodes, e.getMessage()); - } finally { - sessionPool.closeResultSet(sessionDataSetWrapper); - } - return null; - } - - /** - * 查询单个设备的最新数据(获取指定设备的最新传感器数据) - * - * @param db root.ln.wf01 - * @param device root.ln.wf01.wt01 - * @param sensors temperature,status(字段名) - * @param isLegalPathNodes true(避免路径校验) - * @param clazz 返回数据对应的对象(对象属性必须与字段名对应) - * @return 查询结果的对象列表,如果查询失败则返回 null - * @param 返回数据的对象类型泛型 - */ - @Override - public List executeLastDataQueryForOneDevice(String db, String device, List sensors, - boolean isLegalPathNodes, - Class clazz) { - SessionDataSet sessionDataSet = executeLastDataQueryForOneDevice(db, device, sensors, isLegalPathNodes); - List columnNames = sessionDataSet.getColumnNames(); - List resultEntities = null; - try { - resultEntities = packagingObjectData(sessionDataSet, columnNames, clazz); - } catch (Exception e) { - log.error("IotDBSession executeLastDataQueryForOneDevice失败: db:[{}], device:[{}],sensors:[{}], " + - "isLegalPathNodes:[{}], error={}", db, device, sensors, isLegalPathNodes, e.getMessage()); - } - return resultEntities; - } - - /** - * 聚合查询 - * - * @param paths 多个字段(表名)) root.ln.wf01.wt01.temperature - * @param aggregations 聚合操作 TAggregationType.SUM,TAggregationType.COUNT - * @return SessionDataSet - */ - @Override - public SessionDataSet executeAggregationQuery(List paths, List aggregations) { - SessionPool sessionPool = ioTDBSessionConfig.getSessionPool(); - SessionDataSetWrapper sessionDataSetWrapper = null; - try { - log.info("iotdb聚合查询:paths:[{}], aggregations:[{}]", paths, aggregations); - sessionDataSetWrapper = sessionPool.executeAggregationQuery(paths, aggregations); - return sessionDataSetWrapper.getSessionDataSet(); - } catch (Exception e) { - log.error("IotDBSession executeAggregationQuery失败: paths:[{}], aggregations:[{}] ,error={}", - paths, aggregations, e.getMessage()); - } finally { - sessionPool.closeResultSet(sessionDataSetWrapper); - } - return null; - } - - /** - * 聚合查询(时间序列原始数据范围查询,时间范围为左闭右开区间,包含开始时间但不包含结束时间) - * - * @param paths 多个字段(表名)) root.ln.wf01.wt01.temperature - * @param aggregations 聚合操作 TAggregationType.SUM,TAggregationType.COUNT - * @param startTime 开始时间(包含) - * @param endTime 结束时间 - * @return SessionDataSet - */ - @Override - public SessionDataSet executeAggregationQuery(List paths, List aggregations, - long startTime, long endTime) { - SessionPool sessionPool = ioTDBSessionConfig.getSessionPool(); - SessionDataSetWrapper sessionDataSetWrapper = null; - try { - log.info("iotdb聚合查询:paths:[{}], aggregations:[{}],startTime:[{}], endTime:[{}]", - paths, aggregations, startTime, endTime); - sessionDataSetWrapper = sessionPool.executeAggregationQuery(paths, aggregations, startTime, endTime); - return sessionDataSetWrapper.getSessionDataSet(); - } catch (Exception e) { - log.error("IotDBSession executeAggregationQuery失败: paths:[{}], aggregations:[{}], " + - "startTime:[{}], endTime:[{}],error={}", paths, aggregations, startTime, endTime, e.getMessage()); - } finally { - sessionPool.closeResultSet(sessionDataSetWrapper); - } - return null; - } - - /** - * 聚合查询(支持按照时间区间分段查询) - * - * @param paths 多个字段(表名)) root.ln.wf01.wt01.temperature - * @param aggregations 聚合操作 TAggregationType.SUM,TAggregationType.COUNT - * @param startTime 开始时间(包含) - * @param endTime 结束时间 - * @param interval 查询的时间间隔(单位为毫秒) - * @return SessionDataSet - */ - @Override - public SessionDataSet executeAggregationQuery(List paths, List aggregations, - long startTime, long endTime, long interval) { - SessionPool sessionPool = ioTDBSessionConfig.getSessionPool(); - SessionDataSetWrapper sessionDataSetWrapper = null; - try { - log.info("iotdb聚合查询:paths:[{}], aggregations:[{}],startTime:[{}], endTime:[{}] ,interval:[{}]", - paths, aggregations, startTime, endTime, interval); - sessionDataSetWrapper = sessionPool.executeAggregationQuery( - paths, aggregations, startTime, endTime, interval - ); - return sessionDataSetWrapper.getSessionDataSet(); - } catch (Exception e) { - log.error("IotDBSession executeAggregationQuery失败: paths:[{}], aggregations:[{}] , " + - "startTime:[{}], endTime:[{}], interval:[{}], error={}", - paths, aggregations, startTime, endTime, interval, e.getMessage()); - } finally { - sessionPool.closeResultSet(sessionDataSetWrapper); - } - return null; - } - - /** - * 聚合查询(支持按照时间区间分段查询) - * - * @param paths 多个字段(表名)) root.ln.wf01.wt01.temperature - * @param aggregations 聚合操作 TAggregationType.SUM,TAggregationType.COUNT - * @param startTime 开始时间(包含) - * @param endTime 结束时间 - * @param interval 查询的时间间隔(单位为毫秒) - * @param slidingStep 滑动步长(单位为毫秒) - * @return SessionDataSet - */ - @Override - public SessionDataSet executeAggregationQuery(List paths, List aggregations, - long startTime, long endTime, long interval, long slidingStep) { - SessionPool sessionPool = ioTDBSessionConfig.getSessionPool(); - SessionDataSetWrapper sessionDataSetWrapper = null; - - try { - log.info("iotdb聚合查询:paths:[{}], aggregations:[{}],startTime:[{}], endTime:[{}] ,interval:[{}], " + - "slidingStep:[{}]", paths, aggregations, startTime, endTime, interval, slidingStep); - sessionDataSetWrapper = sessionPool.executeAggregationQuery(paths, aggregations, startTime, endTime, - interval, slidingStep); - return sessionDataSetWrapper.getSessionDataSet(); - } catch (Exception e) { - log.error("IotDBSession executeAggregationQuery失败: paths:[{}], aggregations:[{}] , " + - "startTime:[{}], endTime:[{}], interval:[{}], slidingStep:[{}] ,error={}", - paths, aggregations, startTime, endTime, interval, slidingStep, e.getMessage()); - } finally { - sessionPool.closeResultSet(sessionDataSetWrapper); - } - return null; - } - - /** - * SQL查询 - * - * @param sql SQL查询语句,支持IotDB的查询语法 - * @return 返回查询结果的 SessionDataSet,如果执行失败则返回 null - */ - @Override - public SessionDataSet executeQueryStatement(String sql) { - SessionPool sessionPool = ioTDBSessionConfig.getSessionPool(); - SessionDataSetWrapper sessionDataSetWrapper = null; - - try { - log.info("iotdb SQL查询:sql:[{}]", sql); - sessionDataSetWrapper = sessionPool.executeQueryStatement(sql); - return sessionDataSetWrapper.getSessionDataSet(); - } catch (Exception e) { - log.error("IotDBSession executeQueryStatement失败:sql:[{}],error={}", sql, e.getMessage()); - } finally { - sessionPool.closeResultSet(sessionDataSetWrapper); - } - return null; - } - - /** - * SQL非查询 - * - * @param sql SQL查询语句,支持IotDB的查询语法 - */ - @Override - public void executeNonQueryStatement(String sql) { - SessionPool sessionPool = ioTDBSessionConfig.getSessionPool(); - try { - log.info("iotdb SQL无查询:sql:[{}]", sql); - sessionPool.executeNonQueryStatement(sql); - } catch (Exception e) { - log.error("IotDBSession executeNonQueryStatement失败:sql:[{}],error={}", sql, e.getMessage()); - } - } - - /** - * 封装处理数据 - * - * @param sessionDataSet 包含查询结果的SessionDataSet对象 - * @param titleList 列标题列表,用于映射字段名称 - * @return 返回封装后的数据列表,每个 Map 代表一行数据,键为列名,值为对应的字段值 - */ - @SneakyThrows - @Override - public List> packagingMapData(SessionDataSet sessionDataSet, List titleList) { - int fetchSize = sessionDataSet.getFetchSize(); - List> resultList = new ArrayList<>(); - titleList.remove("Time"); - if (fetchSize > 0) { - while (sessionDataSet.hasNext()) { - Map resultMap = new HashMap<>(); - RowRecord next = sessionDataSet.next(); - List fields = next.getFields(); - String timeString = new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss") - .format(next.getTimestamp()); - resultMap.put("time", timeString); - for (int i = 0; i < fields.size(); i++) { - Field field = fields.get(i); - if (field.getDataType() == null || field.getObjectValue(field.getDataType()) == null) { - resultMap.put(splitString(titleList.get(i)), null); - } else { - resultMap.put(splitString(titleList.get(i)), - field.getObjectValue(field.getDataType()).toString()); - } - } - resultList.add(resultMap); - } - } - return resultList; - } - - /** - * 封装处理数据(不支持聚合查询) - * - * @param sessionDataSet 查询返回的结果集 - * @param titleList 查询返回的结果集内的字段名 - * @param clazz 返回数据对应的对象(对象属性必须与字段名对应) - * @return 返回封装后的对象列表,每个对象对应一行结果集数据 - * @param 返回对象的类型 - */ - @SneakyThrows - @Override - public List packagingObjectData(SessionDataSet sessionDataSet, List titleList, - Class clazz) { - int fetchSize = sessionDataSet.getFetchSize(); - List resultList = new ArrayList<>(); - titleList.remove("Time"); - if (fetchSize > 0) { - while (sessionDataSet.hasNext()) { - Map resultMap = new HashMap<>(); - RowRecord next = sessionDataSet.next(); - List fields = next.getFields(); - String timeString = new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss") - .format(next.getTimestamp()); - resultMap.put("time", timeString); - if (titleList.stream().anyMatch(str -> str.contains("."))) { - for (int i = 0; i < fields.size(); i++) { - Field field = fields.get(i); - String title = titleList.get(i); - if (field.getDataType() == null || field.getObjectValue(field.getDataType()) == null) { - resultMap.put(splitString(title), null); - } else { - resultMap.put(splitString(title), field.getObjectValue(field.getDataType()).toString()); - } - } - } else { - Field fieldName = fields.get(0); - Field fieldValue = fields.get(1); - Field fieldDataType = fields.get(2); - if (fieldName.getDataType() != null && fieldName.getObjectValue(fieldName.getDataType()) != null) { - String mapKey = fieldName.getObjectValue(fieldName.getDataType()).toString(); - Object mapValue = convertStringToType( - fieldValue.getObjectValue(fieldValue.getDataType()).toString(), - fieldDataType.getObjectValue(fieldDataType.getDataType()).toString() - ); - resultMap.put(splitString(mapKey), mapValue); - } - } - - String jsonString = JSON.toJSONString(resultMap); - resultList.add(JSON.parseObject(jsonString, (Type) clazz)); - } - } - return resultList; - } - - /** - * 分割获取字段名 - * - * @param str 输入的字符串 - * @return 字段名 - */ - public static String splitString(String str) { - String[] parts = str.split("\\."); - if (parts.length <= 0) { - return str; - } else { - return parts[parts.length - 1]; - } - } - - /** - * 根据数据值和数据类型返回对应数据类型数据 - * - * @param value 数据值 - * @param typeName 数据类型 - * @return 转换后的数据值 - */ - public static Object convertStringToType(String value, String typeName) { - String type = typeName.toLowerCase(); - if (type.isEmpty()) { - return value; - } - if ("boolean".equals(type)) { - return Boolean.parseBoolean(value); - } else if ("double".equals(type)) { - return Double.parseDouble(value); - } else if ("int32".equals(type)) { - return Integer.parseInt(value); - } else if ("int64".equals(type)) { - return Long.parseLong(value); - } else if ("float".equals(type)) { - return Float.parseFloat(value); - } else if ("text".equals(type)) { - return value; - } else { - return value; - } - } - - /** - * 根据对象属性的数据类型返回对应的TSDataType - * - * @param type 属性的数据类型 - * @return TSDataType - */ - public static TSDataType getTsDataTypeByString(String type) { - String typeName = splitString(type).toLowerCase(); - if ("boolean".equals(typeName)) { - return TSDataType.BOOLEAN; - } else if ("double".equals(typeName)) { - return TSDataType.DOUBLE; - } else if ("int".equals(typeName) || "integer".equals(typeName)) { - return TSDataType.INT32; - } else if ("long".equals(typeName)) { - return TSDataType.INT64; - } else if ("float".equals(typeName)) { - return TSDataType.FLOAT; - } else if ("text".equals(typeName)) { - return TSDataType.TEXT; - } else if ("string".equals(typeName)) { - return TSDataType.TEXT; - } else { - return TSDataType.UNKNOWN; - } - } - - /** - * 根据对象构建MeasurementSchemas - * - * @param obj 要从中提取字段信息的对象 - * @return 返回一个包含 MeasurementSchema 的列表 - */ - @Override - public List buildMeasurementSchemas(Object obj) { - java.lang.reflect.Field[] fields = obj.getClass().getDeclaredFields(); - List schemaList = Arrays.stream(fields).map(field -> - new MeasurementSchema(field.getName(), - getTsDataTypeByString( - field.getType().getName() - ))). - collect(Collectors.toList()); - return schemaList; - } - - /** - * 根据对象构建MeasurementSchemaValuesDTO - * - * @param obj 要从中提取字段信息和对应值的对象 - * @return MeasurementSchemaValuesDTO 对象 - */ - @SneakyThrows - @Override - public MeasurementSchemaValuesDTO buildMeasurementSchemasAndValues(Object obj) { - MeasurementSchemaValuesDTO measurementSchemaValuesDTO = new MeasurementSchemaValuesDTO(); - java.lang.reflect.Field[] fields = obj.getClass().getDeclaredFields(); - List schemaList = new ArrayList<>(); - List values = new ArrayList<>(); - List valuesIsNullIndex = new ArrayList<>(); - int valueIndex = 0; - for (java.lang.reflect.Field field : fields) { - MeasurementSchema measurementSchema = new MeasurementSchema(field.getName(), - getTsDataTypeByString(field.getType().getName())); - schemaList.add(measurementSchema); - Object value = field.get(obj); - if (value == null) { - valuesIsNullIndex.add(valueIndex); - } - values.add(value); - valueIndex++; - } - measurementSchemaValuesDTO.setSchemaList(schemaList); - measurementSchemaValuesDTO.setValues(values); - return measurementSchemaValuesDTO; - } -} diff --git a/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/iotdb/domain/DataJSON.java b/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/iotdb/domain/DataJSON.java deleted file mode 100644 index fcd4aac..0000000 --- a/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/iotdb/domain/DataJSON.java +++ /dev/null @@ -1,33 +0,0 @@ -package com.muyu.event.process.iotdb.domain; - -import io.swagger.v3.oas.annotations.media.Schema; -import io.swagger.v3.oas.annotations.tags.Tag; -import lombok.AllArgsConstructor; -import lombok.Data; -import lombok.NoArgsConstructor; -import lombok.experimental.SuperBuilder; - -/** - * @Author: zi run - * @Date 2024/9/29 0:11 - * @Description JSON数据对象 - */ -@Data -@SuperBuilder -@AllArgsConstructor -@NoArgsConstructor -@Tag(name = "ionDB数据源对象") -public class DataJSON { - - /** - * 时间戳 - */ - @Schema(name = "时间戳") - private Long timestamp; - - /** - * 车辆JSON数据 - */ - @Schema(name = "车辆JSON数据") - private String datasource; -} diff --git a/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/iotdb/domain/ResultEntity.java b/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/iotdb/domain/ResultEntity.java deleted file mode 100644 index ad159a6..0000000 --- a/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/iotdb/domain/ResultEntity.java +++ /dev/null @@ -1,36 +0,0 @@ -package com.muyu.event.process.iotdb.domain; - -import com.muyu.event.process.iotdb.domain.dto.IoTDbRecordAble; -import lombok.Data; -import lombok.EqualsAndHashCode; - -/** - * @Author: zi run - * @Date 2024/9/29 0:18 - * @Description 结果实体 - */ -@Data -@EqualsAndHashCode(callSuper = true) -public class ResultEntity extends IoTDbRecordAble { - - /** - * 温度值 - */ - private Float temperature; - - /** - * 硬件标识 - */ - private String hardware; - - /** - * 状态标识 - */ - private Boolean status; - - /** - * 时间 - */ - private String time; - -} diff --git a/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/iotdb/domain/TestDataType.java b/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/iotdb/domain/TestDataType.java deleted file mode 100644 index eef2853..0000000 --- a/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/iotdb/domain/TestDataType.java +++ /dev/null @@ -1,43 +0,0 @@ -package com.muyu.event.process.iotdb.domain; - -import lombok.AllArgsConstructor; -import lombok.Builder; -import lombok.Data; -import lombok.NoArgsConstructor; - -/** - * @Author: zi run - * @Date 2024/9/29 0:22 - * @Description 测试数据类型 - */ -@Data -@Builder -@AllArgsConstructor -@NoArgsConstructor -public class TestDataType { - - /** - * 温度值 - */ - private Float temperature; - - /** - * 硬件标识 - */ - private String hardware; - - /** - * 状态标识 - */ - private Boolean status; - - /** - * 测试Double类型 - */ - private Double testDouble; - - /** - * 测试Long类型 - */ - private Long testLong; -} diff --git a/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/iotdb/domain/dto/InsertDataDTO.java b/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/iotdb/domain/dto/InsertDataDTO.java deleted file mode 100644 index 15b6cc2..0000000 --- a/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/iotdb/domain/dto/InsertDataDTO.java +++ /dev/null @@ -1,67 +0,0 @@ -package com.muyu.event.process.iotdb.domain.dto; - -import lombok.AllArgsConstructor; -import lombok.Builder; -import lombok.Data; -import lombok.NoArgsConstructor; - -import java.util.ArrayList; -import java.util.List; - -/** - * @Author: zi run - * @Date 2024/9/29 0:12 - * @Description 插入数据 数据转换对象 - */ -@Data -@Builder -@AllArgsConstructor -@NoArgsConstructor -public class InsertDataDTO { - - /** - * 温度值 - */ - private Float temperature; - - /** - * 硬件标识 - */ - private String hardware; - - /** - * 状态标识 - */ - private Boolean status; - - /** - * 创建一个单一的InsertDataDTO实例,并设置默认值。 - * - * @return 一个配置好的InsertDataDTO对象 - */ - public InsertDataDTO buildOne() { - InsertDataDTO insertDataDTO = new InsertDataDTO(); - insertDataDTO.setHardware("ss"); - insertDataDTO.setStatus(true); - insertDataDTO.setTemperature(12.0F); - return insertDataDTO; - } - - /** - * 创建一个单一的InsertDataDTO实例,并设置默认值。 - * - * @return 一个配置好的InsertDataDTO对象 - */ - public List buildList() { - List insertDataDTOS = new ArrayList<>(); - int buildNum = 10; - for (int i = 0; i < buildNum; i++) { - InsertDataDTO insertDataDTO = new InsertDataDTO(); - insertDataDTO.setHardware(i % 2 == 0 ? "pp" + i : null); - insertDataDTO.setStatus(i % 2 == 0); - insertDataDTO.setTemperature(12.0F + i); - insertDataDTOS.add(insertDataDTO); - } - return insertDataDTOS; - } -} diff --git a/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/iotdb/domain/dto/IoTDbRecordAble.java b/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/iotdb/domain/dto/IoTDbRecordAble.java deleted file mode 100644 index 5b8ad62..0000000 --- a/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/iotdb/domain/dto/IoTDbRecordAble.java +++ /dev/null @@ -1,12 +0,0 @@ -package com.muyu.event.process.iotdb.domain.dto; - -import lombok.Data; - -/** - * @Author: zi run - * @Date 2024/9/29 0:23 - * @Description IoTDB数据库记录对象 - */ -@Data -public class IoTDbRecordAble { -} diff --git a/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/iotdb/domain/dto/MeasurementSchemaValuesDTO.java b/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/iotdb/domain/dto/MeasurementSchemaValuesDTO.java deleted file mode 100644 index f04a72b..0000000 --- a/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/iotdb/domain/dto/MeasurementSchemaValuesDTO.java +++ /dev/null @@ -1,36 +0,0 @@ -package com.muyu.event.process.iotdb.domain.dto; - -import lombok.AllArgsConstructor; -import lombok.Data; -import lombok.NoArgsConstructor; -import lombok.experimental.SuperBuilder; -import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; - -import java.util.List; - -/** - * @Author: zi run - * @Date 2024/9/29 0:26 - * @Description 测量模式及其对应的值 数据传输对象 - */ -@Data -@SuperBuilder -@AllArgsConstructor -@NoArgsConstructor -public class MeasurementSchemaValuesDTO { - - /** - * 测量模式列表,每个元素表示一个测量的定义,包括名称、数据类型等信息。 - */ - private List schemaList; - - /** - * 对应于测量模式的实际值列表,存储与 schemaList 中每个测量相对应的值。 - */ - private List values; - - /** - * 存储值为空的索引列表 - */ - private List valueIsNullIndex; -} diff --git a/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/iotdb/service/IoTDBService.java b/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/iotdb/service/IoTDBService.java deleted file mode 100644 index b473acf..0000000 --- a/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/iotdb/service/IoTDBService.java +++ /dev/null @@ -1,11 +0,0 @@ -package com.muyu.event.process.iotdb.service; - -import com.muyu.event.process.iotdb.basic.service.IService; - -/** - * @Author: zi run - * @Date 2024/9/29 22:38 - * @Description IoTDB业务层 - */ -public interface IoTDBService extends IService { -} diff --git a/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/iotdb/service/TestIoTDBService.java b/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/iotdb/service/TestIoTDBService.java deleted file mode 100644 index 84302bd..0000000 --- a/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/iotdb/service/TestIoTDBService.java +++ /dev/null @@ -1,20 +0,0 @@ -package com.muyu.event.process.iotdb.service; - -import com.muyu.event.process.iotdb.basic.service.IService; - -import java.util.List; -import java.util.Map; - -/** - * @Author: zi run - * @Date 2024/9/29 17:23 - * @Description 测试IoTDB业务层 - */ -public interface TestIoTDBService extends IService { - - /** - * 查询IoTDB数据列表 - * @return 返回结果 - */ - List> list(); -} diff --git a/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/iotdb/service/impl/IoTDBServiceImpl.java b/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/iotdb/service/impl/IoTDBServiceImpl.java deleted file mode 100644 index 3167618..0000000 --- a/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/iotdb/service/impl/IoTDBServiceImpl.java +++ /dev/null @@ -1,14 +0,0 @@ -package com.muyu.event.process.iotdb.service.impl; - -import com.muyu.event.process.iotdb.basic.service.impl.ServiceImpl; -import com.muyu.event.process.iotdb.service.IoTDBService; -import org.springframework.stereotype.Service; - -/** - * @Author: zi run - * @Date 2024/9/29 22:39 - * @Description IoTDB业务实现层 - */ -@Service -public class IoTDBServiceImpl extends ServiceImpl implements IoTDBService { -} diff --git a/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/iotdb/service/impl/TestIoTDBServiceImpl.java b/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/iotdb/service/impl/TestIoTDBServiceImpl.java deleted file mode 100644 index 21dde1d..0000000 --- a/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/iotdb/service/impl/TestIoTDBServiceImpl.java +++ /dev/null @@ -1,33 +0,0 @@ -package com.muyu.event.process.iotdb.service.impl; - -import com.muyu.event.process.iotdb.basic.service.impl.ServiceImpl; -import com.muyu.event.process.iotdb.service.TestIoTDBService; -import lombok.extern.slf4j.Slf4j; -import org.apache.iotdb.isession.SessionDataSet; -import org.springframework.stereotype.Service; - -import java.util.List; -import java.util.Map; - -/** - * @Author: zi run - * @Date 2024/9/29 17:24 - * @Description 测试IoTDB业务实现层 - */ -@Slf4j -@Service -public class TestIoTDBServiceImpl extends ServiceImpl implements TestIoTDBService { - - /** - * 查询IoTDB数据列表 - * @return 返回结果 - */ - @Override - public List> list() { - String sql = "select * from root.test"; - SessionDataSet sessionDataSet = this.executeQueryStatement(sql); - List> list = this.packagingMapData(sessionDataSet, sessionDataSet.getColumnTypes()); - log.info("查询IoTDB数据为:{}", list.toString()); - return list; - } -} diff --git a/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/listener/IoTDBInsertDataListener.java b/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/listener/IoTDBInsertDataListener.java deleted file mode 100644 index 0c0f96c..0000000 --- a/cloud-modules/cloud-modules-event-process/src/test/java/com/muyu/event/process/listener/IoTDBInsertDataListener.java +++ /dev/null @@ -1,68 +0,0 @@ -package com.muyu.event.process.listener; - -import com.alibaba.fastjson2.JSONObject; -import com.muyu.event.process.basic.BasicEvent; -import com.muyu.event.process.basic.BasicEventListener; -import com.muyu.event.process.event.IoTDBInsertDataEvent; -import com.muyu.event.process.iotdb.service.IoTDBService; -import lombok.RequiredArgsConstructor; -import org.springframework.stereotype.Component; - -import java.util.ArrayList; -import java.util.List; -import java.util.stream.Collectors; - -/** - * @Author: zi run - * @Date 2024/9/29 22:12 - * @Description 向IoTDB插入数据事件监听器 - */ -@Component -@RequiredArgsConstructor -public class IoTDBInsertDataListener implements BasicEventListener { - - /** - * IoTDB业务层 - */ - private final IoTDBService ioTDBService; - - /** - * 设备名(表名) - */ - private static final String DEVICE_ID = "root.vehicle"; - - /** - * 处理接收到的事件,将数据插入到 IoTDB - * - * @param event 接收到的事件,包含需要插入的数据 - */ - @Override - public void onEvent(BasicEvent event) { - JSONObject data = JSONObject.parseObject(event.getData()); - List keyList = extractKeys(data); - List valueList = extractValues(data); - ioTDBService.insertStringRecord(DEVICE_ID, System.currentTimeMillis(), keyList, valueList); - } - - /** - * 从给定的JSONObject中提取所有的键 - * - * @param data 要提取键的JSONObject - * @return 键的列表 - */ - private List extractKeys(JSONObject data) { - return data.keySet().stream().collect(Collectors.toList()); - } - - /** - * 从给定的 JSONObject 中提取所有的值,并将其转换为字符串 - * - * @param data 要提取值的JSONObject - * @return 值的列表,以字符串形式表示 - */ - private List extractValues(JSONObject data) { - return data.values().stream() - .map(Object::toString) - .collect(Collectors.toList()); - } -} diff --git a/cloud-modules/cloud-modules-file/src/main/resources/bootstrap.yml b/cloud-modules/cloud-modules-file/src/main/resources/bootstrap.yml index b471a18..b6ae6e2 100644 --- a/cloud-modules/cloud-modules-file/src/main/resources/bootstrap.yml +++ b/cloud-modules/cloud-modules-file/src/main/resources/bootstrap.yml @@ -7,7 +7,7 @@ nacos: addr: 106.15.136.7:8848 user-name: nacos password: nacos - namespace: dev + namespace: xzr # Spring spring: diff --git a/cloud-modules/cloud-modules-gen/src/main/resources/bootstrap.yml b/cloud-modules/cloud-modules-gen/src/main/resources/bootstrap.yml index 7e3624f..435f3f0 100644 --- a/cloud-modules/cloud-modules-gen/src/main/resources/bootstrap.yml +++ b/cloud-modules/cloud-modules-gen/src/main/resources/bootstrap.yml @@ -7,7 +7,7 @@ nacos: addr: 106.15.136.7:8848 user-name: nacos password: nacos - namespace: dev + namespace: xzr # Spring spring: diff --git a/cloud-modules/cloud-modules-protocol-analysis/pom.xml b/cloud-modules/cloud-modules-protocol-analysis/pom.xml index c6ad48f..fa27aa5 100644 --- a/cloud-modules/cloud-modules-protocol-analysis/pom.xml +++ b/cloud-modules/cloud-modules-protocol-analysis/pom.xml @@ -54,6 +54,12 @@ mysql-connector-j + + + org.eclipse.paho + org.eclipse.paho.client.mqttv3 + + com.muyu diff --git a/cloud-modules/cloud-modules-system/src/main/resources/bootstrap.yml b/cloud-modules/cloud-modules-system/src/main/resources/bootstrap.yml index 44cf5e2..9e4b39e 100644 --- a/cloud-modules/cloud-modules-system/src/main/resources/bootstrap.yml +++ b/cloud-modules/cloud-modules-system/src/main/resources/bootstrap.yml @@ -7,7 +7,7 @@ nacos: addr: 106.15.136.7:8848 user-name: nacos password: nacos - namespace: dev + namespace: xzr spring: application: diff --git a/cloud-modules/cloud-modules-vehicle-gateway/pom.xml b/cloud-modules/cloud-modules-vehicle-gateway/pom.xml index 78a42aa..12b2946 100644 --- a/cloud-modules/cloud-modules-vehicle-gateway/pom.xml +++ b/cloud-modules/cloud-modules-vehicle-gateway/pom.xml @@ -60,10 +60,10 @@ cloud-common-datascope + org.eclipse.paho org.eclipse.paho.client.mqttv3 - 1.2.5 @@ -84,27 +84,31 @@ ecs20140526 + com.aliyun tea-openapi + com.aliyun tea-console + com.aliyun tea-util + + com.aliyun cloudapi20160714 - - + aliyun-repo diff --git a/cloud-modules/cloud-modules-vehicle-gateway/src/main/resources/bootstrap.yml b/cloud-modules/cloud-modules-vehicle-gateway/src/main/resources/bootstrap.yml index 26e2e1b..1676a85 100644 --- a/cloud-modules/cloud-modules-vehicle-gateway/src/main/resources/bootstrap.yml +++ b/cloud-modules/cloud-modules-vehicle-gateway/src/main/resources/bootstrap.yml @@ -7,7 +7,7 @@ nacos: addr: 106.15.136.7:8848 user-name: nacos password: nacos - namespace: dev + namespace: xzr spring: application: diff --git a/cloud-visual/cloud-visual-monitor/src/main/resources/bootstrap.yml b/cloud-visual/cloud-visual-monitor/src/main/resources/bootstrap.yml index da1f316..1b52b6b 100644 --- a/cloud-visual/cloud-visual-monitor/src/main/resources/bootstrap.yml +++ b/cloud-visual/cloud-visual-monitor/src/main/resources/bootstrap.yml @@ -7,7 +7,7 @@ nacos: addr: 106.15.136.7:8848 user-name: nacos password: nacos - namespace: dev + namespace: xzr # Spring spring: diff --git a/pom.xml b/pom.xml index 52a5dde..4989cc4 100644 --- a/pom.xml +++ b/pom.xml @@ -51,6 +51,7 @@ 3.0.0 1.3.1 1.4.13 + 1.2.5 @@ -219,6 +220,48 @@ ${iotdb.session.verison} + + + com.aliyun + ecs20140526 + ${ecs20140526.version} + + + + + com.aliyun + tea-openapi + ${tea-openapi.version} + + + + + com.aliyun + tea-console + ${tea-console.version} + + + + + com.aliyun + tea-util + ${tea-util.version} + + + + + com.aliyun + cloudapi20160714 + ${cloudapi20160714.version} + + + + + org.eclipse.paho + org.eclipse.paho.client.mqttv3 + ${org.eclipse.paho.client.mqttv3.version} + + com.muyu