From 15bcd34b07e5ebcb9d13be45823a0903f39450c7 Mon Sep 17 00:00:00 2001
From: LQS <2506203757@qq.com>
Date: Fri, 11 Oct 2024 21:56:22 +0800
Subject: [PATCH 1/2] =?UTF-8?q?feat():=E4=BF=AE=E5=A4=8DMQTT=E8=BF=9E?=
=?UTF-8?q?=E6=8E=A5=EF=BC=8C=E6=8E=A5=E6=94=B6rabbitMq=E6=95=B0=E6=8D=AE?=
=?UTF-8?q?=E6=B6=88=E8=B4=B9=EF=BC=8C=E6=8B=BF=E5=88=B0=E6=95=B0=E6=8D=AE?=
=?UTF-8?q?=E8=A7=A3=E6=9E=90=E6=8A=95=E9=80=92kafka?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
JavaSample-tcp1061513671883/.lck | 0
.../enterprise/domain/MqttProperties.java | 47 +++++
.../cloud-modules-protocol-analysis/pom.xml | 11 +
.../analysis/parsing/MQTT/ParsingMQTT.java | 189 ++++++++++--------
.../MQTT/service/MqttClientService.java | 110 ++++++++++
.../consumer/RabbitListenerComponent.java | 43 ++++
.../parsing/manager/MessageProcessor.java | 83 ++++++++
pom.xml | 7 +
8 files changed, 403 insertions(+), 87 deletions(-)
create mode 100644 JavaSample-tcp1061513671883/.lck
create mode 100644 cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-common/src/main/java/com/muyu/enterprise/domain/MqttProperties.java
create mode 100644 cloud-modules/cloud-modules-protocol-analysis/src/main/java/com/muyu/analysis/parsing/MQTT/service/MqttClientService.java
create mode 100644 cloud-modules/cloud-modules-protocol-analysis/src/main/java/com/muyu/analysis/parsing/consumer/RabbitListenerComponent.java
create mode 100644 cloud-modules/cloud-modules-protocol-analysis/src/main/java/com/muyu/analysis/parsing/manager/MessageProcessor.java
diff --git a/JavaSample-tcp1061513671883/.lck b/JavaSample-tcp1061513671883/.lck
new file mode 100644
index 0000000..e69de29
diff --git a/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-common/src/main/java/com/muyu/enterprise/domain/MqttProperties.java b/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-common/src/main/java/com/muyu/enterprise/domain/MqttProperties.java
new file mode 100644
index 0000000..3d8ee27
--- /dev/null
+++ b/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-common/src/main/java/com/muyu/enterprise/domain/MqttProperties.java
@@ -0,0 +1,47 @@
+package com.muyu.enterprise.domain;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ *
+ * @Author:李庆帅
+ * @Package:com.muyu.enterprise.domain
+ * @Project:cloud-server
+ * @name:MqttProperties
+ * @Date:2024/10/10 20:01
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class MqttProperties
+{
+ /**
+ * 节点
+ */
+ private String broker;
+ /**
+ * 主题
+ */
+ private String topic;
+ /**
+ * 用户名
+ */
+ private String userName;
+ /**
+ * 密码
+ */
+ private String password;
+ /**
+ * 客户端ID
+ */
+ private String clientId;
+ /**
+ * 上报级别
+ */
+ private int qos=0;
+
+}
diff --git a/cloud-modules/cloud-modules-protocol-analysis/pom.xml b/cloud-modules/cloud-modules-protocol-analysis/pom.xml
index b46dbe9..d9a3bde 100644
--- a/cloud-modules/cloud-modules-protocol-analysis/pom.xml
+++ b/cloud-modules/cloud-modules-protocol-analysis/pom.xml
@@ -136,6 +136,17 @@
cloud-modules-enterprise-cache
+
+
+ com.muyu
+ cloud-common-rabbit
+
+
+
+
+
+
+
diff --git a/cloud-modules/cloud-modules-protocol-analysis/src/main/java/com/muyu/analysis/parsing/MQTT/ParsingMQTT.java b/cloud-modules/cloud-modules-protocol-analysis/src/main/java/com/muyu/analysis/parsing/MQTT/ParsingMQTT.java
index 770eba1..b5d0638 100644
--- a/cloud-modules/cloud-modules-protocol-analysis/src/main/java/com/muyu/analysis/parsing/MQTT/ParsingMQTT.java
+++ b/cloud-modules/cloud-modules-protocol-analysis/src/main/java/com/muyu/analysis/parsing/MQTT/ParsingMQTT.java
@@ -1,5 +1,6 @@
package com.muyu.analysis.parsing.mqtt;
+import com.muyu.analysis.parsing.mqtt.service.MqttClientService;
import com.muyu.common.core.constant.KafkaConstants;
import com.muyu.common.core.constant.RedisConstants;
import com.muyu.common.core.domain.Result;
@@ -7,6 +8,7 @@ import com.muyu.common.mqtt.MQTTConnect;
import com.muyu.enterprise.cache.CarMessageValueCacheService;
import com.muyu.enterprise.cache.CarVehicleCacheService;
import com.muyu.enterprise.cache.CarVehicleTypeCacheService;
+import com.muyu.enterprise.domain.MqttProperties;
import com.muyu.enterprise.domain.car.Vehicle;
import com.muyu.enterprise.domain.car.VehicleType;
import com.muyu.enterprise.domain.resp.car.MessageValueListResp;
@@ -20,6 +22,7 @@ import lombok.extern.log4j.Log4j2;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.eclipse.paho.client.mqttv3.*;
+import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
@@ -64,109 +67,121 @@ public class ParsingMQTT {
@Resource
private CarMessageValueCacheService allMessageValueCacheService;
+ @Autowired
+ private MqttClientService mqttClientService;
/**
* 协议解析
*/
@PostConstruct
- public void mqttClient() {
+ public JSONObject mqttClient(MqttMessage mqttMessage,String messageStr) {
// String topic = "vehicle";
//// String MQTTConnect.BROKER = "tcp://106.15.136.7:1883";
//// String clientId = "JavaSample";
- try {
- // 第三个参数为空,默认持久化策略
- MqttClient sampleClient = new MqttClient(MQTTConnect.BROKER, MQTTConnect.CLIENT_ID);
- MqttConnectOptions connOpts = new MqttConnectOptions();
- connOpts.setCleanSession(true);
- log.info("连接中MQTTConnect.BROKER: {}", MQTTConnect.BROKER);
- sampleClient.connect(connOpts);
- sampleClient.subscribe(MQTTConnect.TOPIC, 0);
- sampleClient.setCallback(new MqttCallback() {
- // 连接丢失
- @Override
- public void connectionLost(Throwable throwable) {
+ String payload = new String(mqttMessage.getPayload());
+ log.info("====:{}", payload);
- }
+// try {
+// // 第三个参数为空,默认持久化策略
+// MqttClient sampleClient = new MqttClient(MQTTConnect.BROKER, MQTTConnect.CLIENT_ID);
+// MqttConnectOptions connOpts = new MqttConnectOptions();
+// connOpts.setCleanSession(true);
+// log.info("连接中MQTTConnect.BROKER: {}", MQTTConnect.BROKER);
+// sampleClient.connect(connOpts);
+// //MQTTConnect.TOPIC
+// sampleClient.subscribe(MQTTConnect.TOPIC, 0);
+// sampleClient.setCallback(new MqttCallback() {
+// // 连接丢失
+// @Override
+// public void connectionLost(Throwable throwable) {
+//
+// }
+//
+// // 连接成功
+// @Override
+// public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
+// System.out.println(new String(mqttMessage.getPayload()));
+//// JSONObject entries = ParsingMQTT.protocolParsing(new String(mqttMessage.getPayload()));
+////
+//// ProducerRecord producerRecord = new ProducerRecord<>(KafkaConstants.MESSAGE_PARSING,
+//// entries.toString() );
+//// kafkaProducer.send(producerRecord);
+//// log.info("解析之后的数据:"+entries);
+//
+// }
+//
+//
+//
+// // 接收信息
+// @Override
+// public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
+//
+// }
+// });
+// } catch (MqttException me) {
+// log.info("reason " + me.getReasonCode());
+// log.info("msg " + me.getMessage());
+// log.info("loc " + me.getLocalizedMessage());
+// log.info("cause " + me.getCause());
+// log.info("excep " + me);
+// System.out.println("reason " + me.getReasonCode());
+// System.out.println("msg " + me.getMessage());
+// System.out.println("loc " + me.getLocalizedMessage());
+// System.out.println("cause " + me.getCause());
+// System.out.println("excep " + me);
+// me.printStackTrace();
+// }
- // 连接成功
- @Override
- public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
- System.out.println(new String(mqttMessage.getPayload()));
- JSONObject entries = this.protocolParsing(new String(mqttMessage.getPayload()));
- ProducerRecord producerRecord = new ProducerRecord<>(KafkaConstants.MESSAGE_PARSING,
- entries.toString() );
- kafkaProducer.send(producerRecord);
- log.info("解析之后的数据:"+entries);
- }
-
- /**
- * 协议解析
- * @param messageStr
- * @return
- */
- public JSONObject protocolParsing(String messageStr) {
- //根据空格切割数据
- String[] hexArray = messageStr.split(" ");
- StringBuilder result = new StringBuilder();
- //遍历十六进制数据转换为字符
- for (String hex : hexArray) {
- int decimal = Integer.parseInt(hex, 16);
- result.append((char) decimal);
- }
- //取出车辆VIN码
- String vehicleVin = result.substring(1, 18);
- log.info("车辆VIN码: " + vehicleVin);
- //根据车辆VIN码查询报文模板ID
- // 根据车辆VIN码查询车辆信息
- Vehicle vehicle = vehicleCacheService.get(vehicleVin);
- VehicleType vehicleType = vehicleTypeCacheService.get(String.valueOf(vehicle.getVehicleTypeId()));
- Long templateId = vehicleType.getMessageTemplateId();
- List templateList = allMessageValueCacheService.get(String.valueOf(templateId));
- //判断报文模板列表不为空
- if (templateList.isEmpty()) {
- log.info("报文模版为空");
- throw new RuntimeException("报文模版为空");
- }
- //存储报文模版解析后的数据
- JSONObject jsonObject = new JSONObject();
- for (MessageValueListResp messageValue : templateList) {
- //起始位下标
- Integer startIndex = messageValue.getMessageStartIndex() - 1;
- //结束位下标
- Integer endIndex = messageValue.getMessageEndIndex();
- //根据报文模版截取数据
- String value = result.substring(startIndex, endIndex);
- //存入数据
- jsonObject.put(messageValue.getMessageLabel(), value);
- }
- return jsonObject;
- }
-
- // 接收信息
- @Override
- public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
-
- }
- });
- } catch (MqttException me) {
- log.info("reason " + me.getReasonCode());
- log.info("msg " + me.getMessage());
- log.info("loc " + me.getLocalizedMessage());
- log.info("cause " + me.getCause());
- log.info("excep " + me);
- System.out.println("reason " + me.getReasonCode());
- System.out.println("msg " + me.getMessage());
- System.out.println("loc " + me.getLocalizedMessage());
- System.out.println("cause " + me.getCause());
- System.out.println("excep " + me);
- me.printStackTrace();
- }
+ /**
+ * 协议解析
+ * @param messageStr
+ * @return
+ */
+// public JSONObject protocolParsing() {
+ //根据空格切割数据
+ String[] hexArray = messageStr.split(" ");
+ StringBuilder result = new StringBuilder();
+ //遍历十六进制数据转换为字符
+ for (String hex : hexArray) {
+ int decimal = Integer.parseInt(hex, 16);
+ result.append((char) decimal);
+ }
+ //取出车辆VIN码
+ String vehicleVin = result.substring(1, 18);
+ log.info("车辆VIN码: " + vehicleVin);
+ //根据车辆VIN码查询报文模板ID
+ // 根据车辆VIN码查询车辆信息
+ Vehicle vehicle = vehicleCacheService.get(vehicleVin);
+ VehicleType vehicleType = vehicleTypeCacheService.get(String.valueOf(vehicle.getVehicleTypeId()));
+ Long templateId = vehicleType.getMessageTemplateId();
+ List templateList = allMessageValueCacheService.get(String.valueOf(templateId));
+ //判断报文模板列表不为空
+ if (templateList.isEmpty()) {
+ log.info("报文模版为空");
+ throw new RuntimeException("报文模版为空");
+ }
+ //存储报文模版解析后的数据
+ JSONObject jsonObject = new JSONObject();
+ for (MessageValueListResp messageValue : templateList) {
+ //起始位下标
+ Integer startIndex = messageValue.getMessageStartIndex() - 1;
+ //结束位下标
+ Integer endIndex = messageValue.getMessageEndIndex();
+ //根据报文模版截取数据
+ String value = result.substring(startIndex, endIndex);
+ //存入数据
+ jsonObject.put(messageValue.getMessageLabel(), value);
+ }
+ return jsonObject;
+// }
}
+
+
}
diff --git a/cloud-modules/cloud-modules-protocol-analysis/src/main/java/com/muyu/analysis/parsing/MQTT/service/MqttClientService.java b/cloud-modules/cloud-modules-protocol-analysis/src/main/java/com/muyu/analysis/parsing/MQTT/service/MqttClientService.java
new file mode 100644
index 0000000..02e1396
--- /dev/null
+++ b/cloud-modules/cloud-modules-protocol-analysis/src/main/java/com/muyu/analysis/parsing/MQTT/service/MqttClientService.java
@@ -0,0 +1,110 @@
+package com.muyu.analysis.parsing.mqtt.service;
+
+
+
+import cn.hutool.json.JSONObject;
+import com.muyu.analysis.parsing.mqtt.ParsingMQTT;
+import com.muyu.common.core.constant.KafkaConstants;
+import com.muyu.enterprise.domain.MqttProperties;
+import jakarta.annotation.Resource;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.eclipse.paho.client.mqttv3.*;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import javax.annotation.PreDestroy;
+import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+
+/**
+ * @Author:李庆帅
+ * @Package:com.muyu.analysis.parsing.mqtt.service
+ * @Project:cloud-server
+ * @name:MqttClientService
+ * @Date:2024/10/10 20:24
+ */
+
+@Service
+@Slf4j
+public class MqttClientService {
+
+ @Autowired
+ private ParsingMQTT messageProcessor;
+
+ @Resource
+ private KafkaProducer kafkaProducer;
+
+ private final ExecutorService executorService = Executors.newCachedThreadPool();
+ private MqttClient sampleClient;
+
+
+ public void connectAndSubscribeAsync(MqttProperties mqttProperties) {
+ executorService.submit(() -> {
+ try {
+ connectAndSubscribe(mqttProperties);
+ } catch (MqttException | IOException e) {
+ log.error("MQTT连接或订阅失败", e);
+ }
+ });
+ }
+
+ private void connectAndSubscribe(MqttProperties mqttProperties) throws MqttException, IOException {
+// if (sampleClient != null && sampleClient.isConnected()) {
+// log.info("MQTT客户端已经连接,跳过重新连接。");
+// return;
+// }
+
+ sampleClient = new MqttClient(mqttProperties.getBroker(), mqttProperties.getClientId());
+ MqttConnectOptions connOpts = new MqttConnectOptions();
+ connOpts.setUserName(mqttProperties.getUserName());
+ connOpts.setPassword(mqttProperties.getPassword().toCharArray());
+ connOpts.setCleanSession(true);
+
+ sampleClient.connect(connOpts);
+ sampleClient.subscribe(mqttProperties.getTopic(), 0);
+
+ sampleClient.setCallback(new MqttCallback() {
+ @Override
+ public void connectionLost(Throwable throwable) {
+ log.error("连接丢失:{}", throwable.getMessage());
+ }
+
+ @Override
+ public void messageArrived(String s, MqttMessage mqttMessage) {
+ executorService.submit(() -> messageProcessor.mqttClient(mqttMessage,new String(mqttMessage.getPayload())));
+ System.out.println(new String(mqttMessage.getPayload()));
+ JSONObject entries =messageProcessor.mqttClient(mqttMessage,new String(mqttMessage.getPayload()));
+// JSONObject entries = o.protocolParsing(new String(mqttMessage.getPayload()));
+
+ ProducerRecord producerRecord = new ProducerRecord<>(KafkaConstants.MESSAGE_PARSING,
+ entries.toString() );
+ kafkaProducer.send(producerRecord);
+ log.info("解析之后的数据:"+entries);
+
+ }
+
+ @Override
+ public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
+ log.info("消息发送完成:{}", iMqttDeliveryToken);
+ }
+ });
+ }
+
+ @PreDestroy
+ public void shutdown() {
+ executorService.shutdown();
+ if (sampleClient != null && sampleClient.isConnected()) {
+ try {
+ sampleClient.disconnect();
+ } catch (MqttException e) {
+ log.error("MQTT客户端断开连接失败", e);
+ }
+ }
+ }
+
+
+}
diff --git a/cloud-modules/cloud-modules-protocol-analysis/src/main/java/com/muyu/analysis/parsing/consumer/RabbitListenerComponent.java b/cloud-modules/cloud-modules-protocol-analysis/src/main/java/com/muyu/analysis/parsing/consumer/RabbitListenerComponent.java
new file mode 100644
index 0000000..9c83f18
--- /dev/null
+++ b/cloud-modules/cloud-modules-protocol-analysis/src/main/java/com/muyu/analysis/parsing/consumer/RabbitListenerComponent.java
@@ -0,0 +1,43 @@
+package com.muyu.analysis.parsing.consumer;
+
+
+import com.muyu.analysis.parsing.mqtt.service.MqttClientService;
+import com.muyu.enterprise.domain.MqttProperties;
+import com.rabbitmq.client.Channel;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.amqp.core.Message;
+import org.springframework.amqp.rabbit.annotation.Queue;
+import org.springframework.amqp.rabbit.annotation.RabbitListener;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+
+
+/**
+ * @Author:李庆帅
+ * @Package:com.muyu.analysis.parsing.consumer
+ * @Project:cloud-server
+ * @name:RabbitListenerComponent
+ * @Date:2024/10/10 20:24
+ */
+
+@Component
+@Slf4j
+public class RabbitListenerComponent {
+ @Autowired
+ private MqttClientService mqttClientService;
+
+ private static final String FORM_QUEUE = "GO_LINE";
+
+ @RabbitListener(queuesToDeclare = @Queue(value = FORM_QUEUE, durable = "true"))
+ public void downline(MqttProperties mqttProperties, Message message, Channel channel) {
+ try {
+ mqttClientService.connectAndSubscribeAsync(mqttProperties);
+ channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
+ } catch (Exception e) {
+ e.printStackTrace();
+ log.error("处理RabbitMQ消息时发生错误", e);
+ }
+ }
+
+}
diff --git a/cloud-modules/cloud-modules-protocol-analysis/src/main/java/com/muyu/analysis/parsing/manager/MessageProcessor.java b/cloud-modules/cloud-modules-protocol-analysis/src/main/java/com/muyu/analysis/parsing/manager/MessageProcessor.java
new file mode 100644
index 0000000..e54698d
--- /dev/null
+++ b/cloud-modules/cloud-modules-protocol-analysis/src/main/java/com/muyu/analysis/parsing/manager/MessageProcessor.java
@@ -0,0 +1,83 @@
+//package com.muyu.analysis.parsing.manager;
+//
+//
+//import com.alibaba.fastjson2.JSONObject;
+//import lombok.extern.slf4j.Slf4j;
+//import org.apache.kafka.clients.producer.KafkaProducer;
+//import org.apache.kafka.clients.producer.ProducerRecord;
+//import org.eclipse.paho.client.mqttv3.MqttMessage;
+//import org.springframework.stereotype.Service;
+//
+//import javax.annotation.Resource;
+//import java.util.ArrayList;
+//import java.util.List;
+//
+//
+//
+///**
+// *
+// *
+// * @Author:李庆帅
+// * @Package:com.muyu.analysis.parsing.manager
+// * @Project:cloud-server
+// * @name:MessageProcessor
+// * @Date:2024/10/10 20:24
+// */
+//
+//@Slf4j
+//@Service
+//public class MessageProcessor {
+// private static final int ID = 1;
+// @Resource
+// private SysCarMessageServiceImpl sysCarMessageService;
+// @Resource
+// private KafkaProducer kafkaProducer;
+//
+//
+// public void processMessage(MqttMessage mqttMessage) {
+// String payload = new String(mqttMessage.getPayload());
+// log.info("====:{}", payload);
+//
+// List carMessages = sysCarMessageService.selectSysCarMessageLists(ID);
+// List kafKaDataList = new ArrayList<>();
+//
+// String[] test = payload.split(" ");
+// for (SysCarMessage carMessage : carMessages) {
+// int start = Integer.parseInt(carMessage.getMessageStartIndex()) - 1;
+// int end = Integer.parseInt(carMessage.getMessageEndIndex());
+//
+// StringBuilder hexBuilder = new StringBuilder();
+// for (int i = start; i < end; i++) {
+// hexBuilder.append(test[i]);
+// }
+//
+// String hex = hexBuilder.toString();
+// char[] result = new char[hex.length() / 2];
+// for (int x = 0; x < hex.length(); x += 2) {
+// int high = Character.digit(hex.charAt(x), 16);
+// int low = Character.digit(hex.charAt(x + 1), 16);
+// result[x / 2] = (char) ((high << 4) + low);
+// }
+//
+// String value = new String(result);
+// kafKaDataList.add(KafKaData.builder()
+// .key(carMessage.getMessageTypeCode())
+// .label(carMessage.getMessageTypeCode())
+// .value(value)
+// .type(carMessage.getMessageType())
+// .build());
+// }
+//
+// kafKaDataList.add(KafKaData.builder()
+// .key("firmCode")
+// .label("企业编码")
+// .value("firm01")
+// .type("String")
+// .build());
+// String jsonString = JSONObject.toJSONString(kafKaDataList);
+// ProducerRecord producerRecord = new ProducerRecord<>(KafkaConstants.KafkaTopic, jsonString);
+// kafkaProducer.send(producerRecord);
+// log.info("kafka投产:{}", jsonString);
+// }
+//
+//}
diff --git a/pom.xml b/pom.xml
index 3e6c60d..6dd7f42 100644
--- a/pom.xml
+++ b/pom.xml
@@ -410,6 +410,13 @@
${muyu.version}
+
+
+ com.muyu
+ cloud-modules-vehicle-gateway
+ ${muyu.version}
+
+
From 1631a7cae8eeadb22097f00dd213a693cd5d021b Mon Sep 17 00:00:00 2001
From: LQS <2506203757@qq.com>
Date: Sat, 12 Oct 2024 11:09:14 +0800
Subject: [PATCH 2/2] =?UTF-8?q?feat():=E4=BF=AE=E5=A4=8DMQTT=E8=BF=9E?=
=?UTF-8?q?=E6=8E=A5=EF=BC=8C=E6=8E=A5=E6=94=B6rabbitMq=E6=95=B0=E6=8D=AE?=
=?UTF-8?q?=E6=B6=88=E8=B4=B9=EF=BC=8C=E6=8B=BF=E5=88=B0=E6=95=B0=E6=8D=AE?=
=?UTF-8?q?=E8=A7=A3=E6=9E=90=E6=8A=95=E9=80=92kafka?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
.../analysis/parsing/MQTT/service/MqttClientService.java | 8 ++++----
1 file changed, 4 insertions(+), 4 deletions(-)
diff --git a/cloud-modules/cloud-modules-protocol-analysis/src/main/java/com/muyu/analysis/parsing/MQTT/service/MqttClientService.java b/cloud-modules/cloud-modules-protocol-analysis/src/main/java/com/muyu/analysis/parsing/MQTT/service/MqttClientService.java
index 02e1396..2113df5 100644
--- a/cloud-modules/cloud-modules-protocol-analysis/src/main/java/com/muyu/analysis/parsing/MQTT/service/MqttClientService.java
+++ b/cloud-modules/cloud-modules-protocol-analysis/src/main/java/com/muyu/analysis/parsing/MQTT/service/MqttClientService.java
@@ -53,10 +53,10 @@ public class MqttClientService {
}
private void connectAndSubscribe(MqttProperties mqttProperties) throws MqttException, IOException {
-// if (sampleClient != null && sampleClient.isConnected()) {
-// log.info("MQTT客户端已经连接,跳过重新连接。");
-// return;
-// }
+ if (sampleClient != null && sampleClient.isConnected()) {
+ log.info("MQTT客户端已经连接,跳过重新连接。");
+ return;
+ }
sampleClient = new MqttClient(mqttProperties.getBroker(), mqttProperties.getClientId());
MqttConnectOptions connOpts = new MqttConnectOptions();