From 5e64b723f5f921d72bea55e78d010433da64e805 Mon Sep 17 00:00:00 2001 From: fst1996 <2411194573@qq.com> Date: Sun, 3 Dec 2023 20:15:16 +0800 Subject: [PATCH] =?UTF-8?q?=E6=94=B9=E5=8A=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pom.xml | 24 +++-- .../java/com/god/MqttKafkaApplication.java | 3 +- .../java/com/god/mqtt/config/MqttConfig.java | 88 +++++++++--------- .../god/mqtt/listen/MqttMessageListener.java | 89 +++++++++++++++++++ src/main/resources/application.yml | 16 ++++ 5 files changed, 170 insertions(+), 50 deletions(-) create mode 100644 src/main/java/com/god/mqtt/listen/MqttMessageListener.java diff --git a/pom.xml b/pom.xml index 4230ae8..4cbe4f3 100644 --- a/pom.xml +++ b/pom.xml @@ -38,26 +38,35 @@ + + + org.springframework.boot + spring-boot-starter-amqp + + org.apache.kafka kafka-clients 3.3.1 + + + org.springframework.kafka + spring-kafka + + org.springframework.boot spring-boot-starter-web - - org.springframework.kafka - spring-kafka - 2.8.11 - + com.alibaba fastjson 1.2.83 + org.projectlombok lombok @@ -69,6 +78,11 @@ org.eclipse.paho.client.mqttv3 1.2.5 + + + org.springframework.boot + spring-boot-starter-amqp + diff --git a/src/main/java/com/god/MqttKafkaApplication.java b/src/main/java/com/god/MqttKafkaApplication.java index 6c90b3f..1d6757d 100644 --- a/src/main/java/com/god/MqttKafkaApplication.java +++ b/src/main/java/com/god/MqttKafkaApplication.java @@ -2,10 +2,11 @@ package com.god; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.context.ConfigurableApplicationContext; @SpringBootApplication public class MqttKafkaApplication { public static void main(String[] args) { - SpringApplication.run(MqttKafkaApplication.class); + SpringApplication.run(MqttKafkaApplication.class, args); } } diff --git a/src/main/java/com/god/mqtt/config/MqttConfig.java b/src/main/java/com/god/mqtt/config/MqttConfig.java index d959727..76763e0 100644 --- a/src/main/java/com/god/mqtt/config/MqttConfig.java +++ b/src/main/java/com/god/mqtt/config/MqttConfig.java @@ -1,44 +1,44 @@ -package com.god.mqtt.config; - -import com.god.mqtt.service.MqttService; -import lombok.extern.log4j.Log4j2; -import org.eclipse.paho.client.mqttv3.MqttClient; -import org.eclipse.paho.client.mqttv3.MqttConnectOptions; -import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; - -import java.util.UUID; - -/** - * @author DongZl - * @description: Mqtt配置 - * @Date 2023-11-24 下午 02:06 - */ -@Log4j2 -@Configuration -public class MqttConfig { - - @Bean - public MqttClient initClient(MqttProper mqttProper, MqttService mqttService){ - try { - log.info("mqtt服务器初始化开始"); - long startTime = System.currentTimeMillis(); - MqttClient client = new MqttClient(mqttProper.getBroker(), - UUID.randomUUID().toString(), - new MemoryPersistence()); - MemoryPersistence memoryPersistence = new MemoryPersistence(); - // 连接参数 - MqttConnectOptions options = new MqttConnectOptions(); - options.setConnectionTimeout(60); - options.setKeepAliveInterval(60); - log.info("mqtt服务器初始化结束, 耗时:[{}MS]", System.currentTimeMillis() - startTime); - client.connect(options); - client.setCallback(mqttService); - client.subscribe(mqttProper.getTopic(), 0); - return client; - }catch (Exception e) { - throw new RuntimeException(e); - } - } -} +//package com.god.mqtt.config; +// +//import com.god.mqtt.service.MqttService; +//import lombok.extern.log4j.Log4j2; +//import org.eclipse.paho.client.mqttv3.MqttClient; +//import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +//import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; +//import org.springframework.context.annotation.Bean; +//import org.springframework.context.annotation.Configuration; +// +//import java.util.UUID; +// +///** +// * @author DongZl +// * @description: Mqtt配置 +// * @Date 2023-11-24 下午 02:06 +// */ +//@Log4j2 +//@Configuration +//public class MqttConfig { +// +// @Bean +// public MqttClient initClient(MqttProper mqttProper, MqttService mqttService){ +// try { +// log.info("mqtt服务器初始化开始"); +// long startTime = System.currentTimeMillis(); +// MqttClient client = new MqttClient(mqttProper.getBroker(), +// UUID.randomUUID().toString(), +// new MemoryPersistence()); +// MemoryPersistence memoryPersistence = new MemoryPersistence(); +// // 连接参数 +// MqttConnectOptions options = new MqttConnectOptions(); +// options.setConnectionTimeout(60); +// options.setKeepAliveInterval(60); +// log.info("mqtt服务器初始化结束, 耗时:[{}MS]", System.currentTimeMillis() - startTime); +// client.connect(options); +// client.setCallback(mqttService); +// client.subscribe(mqttProper.getTopic(), 0); +// return client; +// }catch (Exception e) { +// throw new RuntimeException(e); +// } +// } +//} diff --git a/src/main/java/com/god/mqtt/listen/MqttMessageListener.java b/src/main/java/com/god/mqtt/listen/MqttMessageListener.java new file mode 100644 index 0000000..5323161 --- /dev/null +++ b/src/main/java/com/god/mqtt/listen/MqttMessageListener.java @@ -0,0 +1,89 @@ +package com.god.mqtt.listen; + +import com.god.mqtt.config.MqttProper; +import com.god.mqtt.service.MqttService; +import com.rabbitmq.client.Channel; +import lombok.extern.log4j.Log4j2; +import org.eclipse.paho.client.mqttv3.MqttClient; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; +import org.springframework.amqp.core.Message; +import org.springframework.amqp.rabbit.annotation.Queue; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.util.UUID; + +@Component +@Log4j2 +public class MqttMessageListener { + + private MqttClient mqttClient; + + @Autowired + private MqttProper mqttProper; + + @Autowired + MqttService mqttService; + + /** + * 处理RabbitMQ消息 + * @param message + */ + @RabbitListener(queuesToDeclare = @Queue("TOPIC_INFORM")) + public void onMessage(String msg, Message message, Channel channel) { + try { + log.info("接收到的主题是:" + msg); + // 关闭当前连接 + + // 在收到RabbitMQ消息后,初始化或重新连接到MQTT broker + log.info("mqtt主题初始化开始"); + initMqttConnection(msg); + + //手动确认 + channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + private void initMqttConnection(String mqttTopic) { + try { + if (mqttClient != null && mqttClient.isConnected()){ + log.info("关闭之前的连接"); + // 关闭之前的连接 + mqttClient.disconnect(); + mqttClient.close(); + mqttClient = null; // 释放资源 + } + if(mqttClient == null){ + log.info("mqtt服务器初始化开始"); + long startTime = System.currentTimeMillis(); + mqttClient = new MqttClient(mqttProper.getBroker(), + UUID.randomUUID().toString(), new MemoryPersistence()); + // 设置连接选项 + MqttConnectOptions options = new MqttConnectOptions(); + // 设置连接超时时间 + options.setConnectionTimeout(60); + options.setKeepAliveInterval(60); + mqttClient.connect(options); + log.info("连接到到mqtt节点:{},主题是:{}: " , mqttProper.getBroker(),mqttTopic); + //回调 + mqttClient.setCallback(mqttService); + mqttClient.subscribe(mqttTopic,0); + log.info("mqtt服务器初始化结束, 耗时:[{}MS]", System.currentTimeMillis() - startTime); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } + +// @Configuration +// public class MqttConfigTest { +// @Bean +// public MqttMessageListener mqttMessageListener() { +// return new MqttMessageListener(); +// } +// } +} diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 248a6dd..505329a 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -52,5 +52,21 @@ spring: ack-mode: manual_immediate # 消费监听接口监听的主题不存在时,默认会报错,所以设置为false忽略报错 missing-topics-fatal: false + #rabbitmq配置 + #rabbitmq配置 + rabbitmq: + host: 10.100.1.5 + port: 5672 + # 开启发送端消息抵达Broker确认 + publisher-confirms: true + # 开启发送端消息抵达Queue确认 + publisher-returns: true + # 只要消息抵达Queue,就会异步发送优先回调 returnfirm + template: + mandatory: true + listener: + simple: + # 手动 ack消息,不使用默认的消费端确认 + acknowledge-mode: manual