master
fst1996 2023-12-03 20:15:16 +08:00
parent a457390ac9
commit 5e64b723f5
5 changed files with 170 additions and 50 deletions

24
pom.xml
View File

@ -38,26 +38,35 @@
<!-- kafka-->
<dependencies>
<!-- rabbitmq-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- kafka-->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.3.1</version>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.8.11</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.83</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
@ -69,6 +78,11 @@
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.5</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>
<build>

View File

@ -2,10 +2,11 @@ package com.god;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
@SpringBootApplication
public class MqttKafkaApplication {
public static void main(String[] args) {
SpringApplication.run(MqttKafkaApplication.class);
SpringApplication.run(MqttKafkaApplication.class, args);
}
}

View File

@ -1,44 +1,44 @@
package com.god.mqtt.config;
import com.god.mqtt.service.MqttService;
import lombok.extern.log4j.Log4j2;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.UUID;
/**
* @author DongZl
* @description: Mqtt
* @Date 2023-11-24 02:06
*/
@Log4j2
@Configuration
public class MqttConfig {
@Bean
public MqttClient initClient(MqttProper mqttProper, MqttService mqttService){
try {
log.info("mqtt服务器初始化开始");
long startTime = System.currentTimeMillis();
MqttClient client = new MqttClient(mqttProper.getBroker(),
UUID.randomUUID().toString(),
new MemoryPersistence());
MemoryPersistence memoryPersistence = new MemoryPersistence();
// 连接参数
MqttConnectOptions options = new MqttConnectOptions();
options.setConnectionTimeout(60);
options.setKeepAliveInterval(60);
log.info("mqtt服务器初始化结束 耗时:[{}MS]", System.currentTimeMillis() - startTime);
client.connect(options);
client.setCallback(mqttService);
client.subscribe(mqttProper.getTopic(), 0);
return client;
}catch (Exception e) {
throw new RuntimeException(e);
}
}
}
//package com.god.mqtt.config;
//
//import com.god.mqtt.service.MqttService;
//import lombok.extern.log4j.Log4j2;
//import org.eclipse.paho.client.mqttv3.MqttClient;
//import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
//import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
//import org.springframework.context.annotation.Bean;
//import org.springframework.context.annotation.Configuration;
//
//import java.util.UUID;
//
///**
// * @author DongZl
// * @description: Mqtt配置
// * @Date 2023-11-24 下午 02:06
// */
//@Log4j2
//@Configuration
//public class MqttConfig {
//
// @Bean
// public MqttClient initClient(MqttProper mqttProper, MqttService mqttService){
// try {
// log.info("mqtt服务器初始化开始");
// long startTime = System.currentTimeMillis();
// MqttClient client = new MqttClient(mqttProper.getBroker(),
// UUID.randomUUID().toString(),
// new MemoryPersistence());
// MemoryPersistence memoryPersistence = new MemoryPersistence();
// // 连接参数
// MqttConnectOptions options = new MqttConnectOptions();
// options.setConnectionTimeout(60);
// options.setKeepAliveInterval(60);
// log.info("mqtt服务器初始化结束 耗时:[{}MS]", System.currentTimeMillis() - startTime);
// client.connect(options);
// client.setCallback(mqttService);
// client.subscribe(mqttProper.getTopic(), 0);
// return client;
// }catch (Exception e) {
// throw new RuntimeException(e);
// }
// }
//}

View File

@ -0,0 +1,89 @@
package com.god.mqtt.listen;
import com.god.mqtt.config.MqttProper;
import com.god.mqtt.service.MqttService;
import com.rabbitmq.client.Channel;
import lombok.extern.log4j.Log4j2;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.UUID;
@Component
@Log4j2
public class MqttMessageListener {
private MqttClient mqttClient;
@Autowired
private MqttProper mqttProper;
@Autowired
MqttService mqttService;
/**
* RabbitMQ
* @param message
*/
@RabbitListener(queuesToDeclare = @Queue("TOPIC_INFORM"))
public void onMessage(String msg, Message message, Channel channel) {
try {
log.info("接收到的主题是:" + msg);
// 关闭当前连接
// 在收到RabbitMQ消息后初始化或重新连接到MQTT broker
log.info("mqtt主题初始化开始");
initMqttConnection(msg);
//手动确认
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
private void initMqttConnection(String mqttTopic) {
try {
if (mqttClient != null && mqttClient.isConnected()){
log.info("关闭之前的连接");
// 关闭之前的连接
mqttClient.disconnect();
mqttClient.close();
mqttClient = null; // 释放资源
}
if(mqttClient == null){
log.info("mqtt服务器初始化开始");
long startTime = System.currentTimeMillis();
mqttClient = new MqttClient(mqttProper.getBroker(),
UUID.randomUUID().toString(), new MemoryPersistence());
// 设置连接选项
MqttConnectOptions options = new MqttConnectOptions();
// 设置连接超时时间
options.setConnectionTimeout(60);
options.setKeepAliveInterval(60);
mqttClient.connect(options);
log.info("连接到到mqtt节点{},主题是:{}: " , mqttProper.getBroker(),mqttTopic);
//回调
mqttClient.setCallback(mqttService);
mqttClient.subscribe(mqttTopic,0);
log.info("mqtt服务器初始化结束 耗时:[{}MS]", System.currentTimeMillis() - startTime);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
// @Configuration
// public class MqttConfigTest {
// @Bean
// public MqttMessageListener mqttMessageListener() {
// return new MqttMessageListener();
// }
// }
}

View File

@ -52,5 +52,21 @@ spring:
ack-mode: manual_immediate
# 消费监听接口监听的主题不存在时默认会报错所以设置为false忽略报错
missing-topics-fatal: false
#rabbitmq配置
#rabbitmq配置
rabbitmq:
host: 10.100.1.5
port: 5672
# 开启发送端消息抵达Broker确认
publisher-confirms: true
# 开启发送端消息抵达Queue确认
publisher-returns: true
# 只要消息抵达Queue就会异步发送优先回调 returnfirm
template:
mandatory: true
listener:
simple:
# 手动 ack消息不使用默认的消费端确认
acknowledge-mode: manual