diff --git a/pom.xml b/pom.xml
index 4230ae8..4cbe4f3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -38,26 +38,35 @@
+
+
+ org.springframework.boot
+ spring-boot-starter-amqp
+
+
org.apache.kafka
kafka-clients
3.3.1
+
+
+ org.springframework.kafka
+ spring-kafka
+
+
org.springframework.boot
spring-boot-starter-web
-
- org.springframework.kafka
- spring-kafka
- 2.8.11
-
+
com.alibaba
fastjson
1.2.83
+
org.projectlombok
lombok
@@ -69,6 +78,11 @@
org.eclipse.paho.client.mqttv3
1.2.5
+
+
+ org.springframework.boot
+ spring-boot-starter-amqp
+
diff --git a/src/main/java/com/god/MqttKafkaApplication.java b/src/main/java/com/god/MqttKafkaApplication.java
index 6c90b3f..1d6757d 100644
--- a/src/main/java/com/god/MqttKafkaApplication.java
+++ b/src/main/java/com/god/MqttKafkaApplication.java
@@ -2,10 +2,11 @@ package com.god;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.context.ConfigurableApplicationContext;
@SpringBootApplication
public class MqttKafkaApplication {
public static void main(String[] args) {
- SpringApplication.run(MqttKafkaApplication.class);
+ SpringApplication.run(MqttKafkaApplication.class, args);
}
}
diff --git a/src/main/java/com/god/mqtt/config/MqttConfig.java b/src/main/java/com/god/mqtt/config/MqttConfig.java
index d959727..76763e0 100644
--- a/src/main/java/com/god/mqtt/config/MqttConfig.java
+++ b/src/main/java/com/god/mqtt/config/MqttConfig.java
@@ -1,44 +1,44 @@
-package com.god.mqtt.config;
-
-import com.god.mqtt.service.MqttService;
-import lombok.extern.log4j.Log4j2;
-import org.eclipse.paho.client.mqttv3.MqttClient;
-import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
-import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-
-import java.util.UUID;
-
-/**
- * @author DongZl
- * @description: Mqtt配置
- * @Date 2023-11-24 下午 02:06
- */
-@Log4j2
-@Configuration
-public class MqttConfig {
-
- @Bean
- public MqttClient initClient(MqttProper mqttProper, MqttService mqttService){
- try {
- log.info("mqtt服务器初始化开始");
- long startTime = System.currentTimeMillis();
- MqttClient client = new MqttClient(mqttProper.getBroker(),
- UUID.randomUUID().toString(),
- new MemoryPersistence());
- MemoryPersistence memoryPersistence = new MemoryPersistence();
- // 连接参数
- MqttConnectOptions options = new MqttConnectOptions();
- options.setConnectionTimeout(60);
- options.setKeepAliveInterval(60);
- log.info("mqtt服务器初始化结束, 耗时:[{}MS]", System.currentTimeMillis() - startTime);
- client.connect(options);
- client.setCallback(mqttService);
- client.subscribe(mqttProper.getTopic(), 0);
- return client;
- }catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-}
+//package com.god.mqtt.config;
+//
+//import com.god.mqtt.service.MqttService;
+//import lombok.extern.log4j.Log4j2;
+//import org.eclipse.paho.client.mqttv3.MqttClient;
+//import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+//import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
+//import org.springframework.context.annotation.Bean;
+//import org.springframework.context.annotation.Configuration;
+//
+//import java.util.UUID;
+//
+///**
+// * @author DongZl
+// * @description: Mqtt配置
+// * @Date 2023-11-24 下午 02:06
+// */
+//@Log4j2
+//@Configuration
+//public class MqttConfig {
+//
+// @Bean
+// public MqttClient initClient(MqttProper mqttProper, MqttService mqttService){
+// try {
+// log.info("mqtt服务器初始化开始");
+// long startTime = System.currentTimeMillis();
+// MqttClient client = new MqttClient(mqttProper.getBroker(),
+// UUID.randomUUID().toString(),
+// new MemoryPersistence());
+// MemoryPersistence memoryPersistence = new MemoryPersistence();
+// // 连接参数
+// MqttConnectOptions options = new MqttConnectOptions();
+// options.setConnectionTimeout(60);
+// options.setKeepAliveInterval(60);
+// log.info("mqtt服务器初始化结束, 耗时:[{}MS]", System.currentTimeMillis() - startTime);
+// client.connect(options);
+// client.setCallback(mqttService);
+// client.subscribe(mqttProper.getTopic(), 0);
+// return client;
+// }catch (Exception e) {
+// throw new RuntimeException(e);
+// }
+// }
+//}
diff --git a/src/main/java/com/god/mqtt/listen/MqttMessageListener.java b/src/main/java/com/god/mqtt/listen/MqttMessageListener.java
new file mode 100644
index 0000000..5323161
--- /dev/null
+++ b/src/main/java/com/god/mqtt/listen/MqttMessageListener.java
@@ -0,0 +1,89 @@
+package com.god.mqtt.listen;
+
+import com.god.mqtt.config.MqttProper;
+import com.god.mqtt.service.MqttService;
+import com.rabbitmq.client.Channel;
+import lombok.extern.log4j.Log4j2;
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
+import org.springframework.amqp.core.Message;
+import org.springframework.amqp.rabbit.annotation.Queue;
+import org.springframework.amqp.rabbit.annotation.RabbitListener;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import java.util.UUID;
+
+@Component
+@Log4j2
+public class MqttMessageListener {
+
+ private MqttClient mqttClient;
+
+ @Autowired
+ private MqttProper mqttProper;
+
+ @Autowired
+ MqttService mqttService;
+
+ /**
+ * 处理RabbitMQ消息
+ * @param message
+ */
+ @RabbitListener(queuesToDeclare = @Queue("TOPIC_INFORM"))
+ public void onMessage(String msg, Message message, Channel channel) {
+ try {
+ log.info("接收到的主题是:" + msg);
+ // 关闭当前连接
+
+ // 在收到RabbitMQ消息后,初始化或重新连接到MQTT broker
+ log.info("mqtt主题初始化开始");
+ initMqttConnection(msg);
+
+ //手动确认
+ channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private void initMqttConnection(String mqttTopic) {
+ try {
+ if (mqttClient != null && mqttClient.isConnected()){
+ log.info("关闭之前的连接");
+ // 关闭之前的连接
+ mqttClient.disconnect();
+ mqttClient.close();
+ mqttClient = null; // 释放资源
+ }
+ if(mqttClient == null){
+ log.info("mqtt服务器初始化开始");
+ long startTime = System.currentTimeMillis();
+ mqttClient = new MqttClient(mqttProper.getBroker(),
+ UUID.randomUUID().toString(), new MemoryPersistence());
+ // 设置连接选项
+ MqttConnectOptions options = new MqttConnectOptions();
+ // 设置连接超时时间
+ options.setConnectionTimeout(60);
+ options.setKeepAliveInterval(60);
+ mqttClient.connect(options);
+ log.info("连接到到mqtt节点:{},主题是:{}: " , mqttProper.getBroker(),mqttTopic);
+ //回调
+ mqttClient.setCallback(mqttService);
+ mqttClient.subscribe(mqttTopic,0);
+ log.info("mqtt服务器初始化结束, 耗时:[{}MS]", System.currentTimeMillis() - startTime);
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+// @Configuration
+// public class MqttConfigTest {
+// @Bean
+// public MqttMessageListener mqttMessageListener() {
+// return new MqttMessageListener();
+// }
+// }
+}
diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml
index 248a6dd..505329a 100644
--- a/src/main/resources/application.yml
+++ b/src/main/resources/application.yml
@@ -52,5 +52,21 @@ spring:
ack-mode: manual_immediate
# 消费监听接口监听的主题不存在时,默认会报错,所以设置为false忽略报错
missing-topics-fatal: false
+ #rabbitmq配置
+ #rabbitmq配置
+ rabbitmq:
+ host: 10.100.1.5
+ port: 5672
+ # 开启发送端消息抵达Broker确认
+ publisher-confirms: true
+ # 开启发送端消息抵达Queue确认
+ publisher-returns: true
+ # 只要消息抵达Queue,就会异步发送优先回调 returnfirm
+ template:
+ mandatory: true
+ listener:
+ simple:
+ # 手动 ack消息,不使用默认的消费端确认
+ acknowledge-mode: manual