From cb68e9e60125392da72170a0f2470e0673b3bb33 Mon Sep 17 00:00:00 2001
From: chentaisen <14615430+chentaisen@user.noreply.gitee.com>
Date: Fri, 11 Oct 2024 21:38:26 +0800
Subject: [PATCH] =?UTF-8?q?feat=EF=BC=9A=E4=BF=AE=E5=A4=8D=E4=BB=A3?=
=?UTF-8?q?=E7=A0=81=20=20+=20=E5=90=88=E5=B9=B6=E7=BD=91=E5=85=B3?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
.../core/domain/properties/MqttPropertie.java | 4 +--
cloud-common/pom.xml | 1 +
.../cloud-modules-car-gateway/pom.xml | 17 +++++++++++-
.../Impl/CarOneClickOperationServiceImpl.java | 26 +++++++++----------
cloud-modules/cloud-modules-parsing/pom.xml | 13 +---------
.../consumer/RabbitListenerComponent.java | 7 ++---
.../mqtt/service/MqttClientService.java | 20 +++++++-------
.../src/main/resources/bootstrap.yml | 16 ++++++------
pom.xml | 8 ++++++
9 files changed, 63 insertions(+), 49 deletions(-)
rename cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/car/gateway/domain/properties/MqttProperties.java => cloud-common/cloud-common-core/src/main/java/com/muyu/common/core/domain/properties/MqttPropertie.java (89%)
diff --git a/cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/car/gateway/domain/properties/MqttProperties.java b/cloud-common/cloud-common-core/src/main/java/com/muyu/common/core/domain/properties/MqttPropertie.java
similarity index 89%
rename from cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/car/gateway/domain/properties/MqttProperties.java
rename to cloud-common/cloud-common-core/src/main/java/com/muyu/common/core/domain/properties/MqttPropertie.java
index 7af69a7..167a5bd 100644
--- a/cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/car/gateway/domain/properties/MqttProperties.java
+++ b/cloud-common/cloud-common-core/src/main/java/com/muyu/common/core/domain/properties/MqttPropertie.java
@@ -1,4 +1,4 @@
-package com.muyu.car.gateway.domain.properties;
+package com.muyu.common.core.domain.properties;
import lombok.AllArgsConstructor;
import lombok.Builder;
@@ -17,7 +17,7 @@ import lombok.NoArgsConstructor;
@Builder
@NoArgsConstructor
@AllArgsConstructor
-public class MqttProperties {
+public class MqttPropertie {
/**
* 节点
*/
diff --git a/cloud-common/pom.xml b/cloud-common/pom.xml
index b053ac7..d80c63b 100644
--- a/cloud-common/pom.xml
+++ b/cloud-common/pom.xml
@@ -35,3 +35,4 @@
+
diff --git a/cloud-modules/cloud-modules-car-gateway/pom.xml b/cloud-modules/cloud-modules-car-gateway/pom.xml
index 814c945..be00efc 100644
--- a/cloud-modules/cloud-modules-car-gateway/pom.xml
+++ b/cloud-modules/cloud-modules-car-gateway/pom.xml
@@ -110,5 +110,20 @@
0.2.21
-
+
+ ${project.artifactId}
+
+
+ org.springframework.boot
+ spring-boot-maven-plugin
+
+
+
+ repackage
+
+
+
+
+
+
diff --git a/cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/car/gateway/service/Impl/CarOneClickOperationServiceImpl.java b/cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/car/gateway/service/Impl/CarOneClickOperationServiceImpl.java
index 8c279e5..5ba6dcf 100644
--- a/cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/car/gateway/service/Impl/CarOneClickOperationServiceImpl.java
+++ b/cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/car/gateway/service/Impl/CarOneClickOperationServiceImpl.java
@@ -3,7 +3,7 @@ package com.muyu.car.gateway.service.Impl;
import com.muyu.car.gateway.domain.VehicleConnection;
import com.muyu.car.gateway.domain.VinIp;
import com.muyu.car.gateway.domain.model.MqttServerModel;
-import com.muyu.car.gateway.domain.properties.MqttProperties;
+import com.muyu.common.core.domain.properties.MqttPropertie;
import com.muyu.car.gateway.domain.req.VehicleConnectionReq;
import com.muyu.car.gateway.mapper.CarOneClickOperationMapper;
import com.muyu.car.gateway.service.CarOneClickOperationService;
@@ -74,15 +74,15 @@ public class CarOneClickOperationServiceImpl implements CarOneClickOperationServ
log.error("=============车辆:{}已经绑定过了", vehicleConnectionReq.getVehicleVin());
throw new RuntimeException("=============车辆已经绑定过了");
}
- MqttProperties mqttProperties = new MqttProperties();
+ MqttPropertie mqttPropertie = new MqttPropertie();
List vehicleVin = selectByVehicleVin(vehicleConnectionReq.getVehicleVin());
for (VehicleConnection connection : vehicleVin) {
- mqttProperties.setClientId(connection.getVehicleVin());
- mqttProperties.setUserName(connection.getUsername());
- mqttProperties.setPassword(connection.getPassword());
+ mqttPropertie.setClientId(connection.getVehicleVin());
+ mqttPropertie.setUserName(connection.getUsername());
+ mqttPropertie.setPassword(connection.getPassword());
}
- mqttProperties.setTopic("vehicle");
- mqttProperties.setQos(0);
+ mqttPropertie.setTopic("vehicle");
+ mqttPropertie.setQos(0);
//判断redis有没有count键
if (redisTemplate.hasKey("oneCount")) {
//取出count
@@ -101,10 +101,10 @@ public class CarOneClickOperationServiceImpl implements CarOneClickOperationServ
this.addIpAddress(new VinIp(vehicleConnectionReq.getVehicleVin(), ipList.toString()));
//响应信息
log.info("车辆:{}", vehicleConnectionReq.getVehicleVin() + "绑定成功:{}", ipList);
- mqttProperties.setBroker("tcp://" + ipList + ":1883");
+ mqttPropertie.setBroker("tcp://" + ipList + ":1883");
// 使用交换机发送消息
- rabbitTemplate.convertAndSend(EXCHANGE_TOPICS_INFORM, ROUTINGKEY_SMS, mqttProperties);
- log.info("============================发送消息成功:{}", mqttProperties);
+ rabbitTemplate.convertAndSend(EXCHANGE_TOPICS_INFORM, ROUTINGKEY_SMS, mqttPropertie);
+ log.info("============================发送消息成功:{}", mqttPropertie);
return Result.success(new MqttServerModel("tcp://" + ipList + ":1883", "vehicle"));
} else {
redisTemplate.opsForValue().set("oneCount", String.valueOf(0));
@@ -114,10 +114,10 @@ public class CarOneClickOperationServiceImpl implements CarOneClickOperationServ
this.addIpAddress(new VinIp(vehicleConnectionReq.getVehicleVin(), ipList.toString()));
//响应信息
log.info("车辆:{}", vehicleConnectionReq.getVehicleVin(), "与:{}绑定成功", ipList);
- mqttProperties.setBroker("tcp://" + ipList + ":1883");
+ mqttPropertie.setBroker("tcp://" + ipList + ":1883");
// 使用交换机发送消息
- rabbitTemplate.convertAndSend(EXCHANGE_TOPICS_INFORM, ROUTINGKEY_SMS, mqttProperties);
- log.info("============================发送消息成功:{}", mqttProperties);
+ rabbitTemplate.convertAndSend(EXCHANGE_TOPICS_INFORM, ROUTINGKEY_SMS, mqttPropertie);
+ log.info("============================发送消息成功:{}", mqttPropertie);
return Result.success(new MqttServerModel("tcp://" + ipList + ":1883", "vehicle"));
}
}
diff --git a/cloud-modules/cloud-modules-parsing/pom.xml b/cloud-modules/cloud-modules-parsing/pom.xml
index cd292b5..9ef9422 100644
--- a/cloud-modules/cloud-modules-parsing/pom.xml
+++ b/cloud-modules/cloud-modules-parsing/pom.xml
@@ -101,22 +101,11 @@
org.eclipse.paho.client.mqttv3
1.2.2
-
- com.muyu
- cloud-modules-car-gateway
- 3.6.3
- compile
-
+
io.netty
netty-codec-mqtt
-
- com.muyu
- cloud-modules-car-gateway
- 3.6.3
- compile
-
diff --git a/cloud-modules/cloud-modules-parsing/src/main/java/com/muyu/parsing/consumer/RabbitListenerComponent.java b/cloud-modules/cloud-modules-parsing/src/main/java/com/muyu/parsing/consumer/RabbitListenerComponent.java
index fd46e3e..1544152 100644
--- a/cloud-modules/cloud-modules-parsing/src/main/java/com/muyu/parsing/consumer/RabbitListenerComponent.java
+++ b/cloud-modules/cloud-modules-parsing/src/main/java/com/muyu/parsing/consumer/RabbitListenerComponent.java
@@ -1,6 +1,7 @@
package com.muyu.parsing.consumer;
-import com.muyu.car.gateway.domain.properties.MqttProperties;
+
+import com.muyu.common.core.domain.properties.MqttPropertie;
import com.muyu.common.rabbit.constants.RabbitConstants;
import com.muyu.parsing.mqtt.service.MqttClientService;
import com.rabbitmq.client.Channel;
@@ -18,9 +19,9 @@ public class RabbitListenerComponent {
private MqttClientService mqttClientService;
@RabbitListener(queuesToDeclare = @Queue(value = RabbitConstants.FORM_QUEUE, durable = "true"))
- public void downline(MqttProperties mqttProperties, Message message, Channel channel) {
+ public void downline(MqttPropertie mqttPropertie, Message message, Channel channel) {
try {
- mqttClientService.connectAndSubscribeAsync(mqttProperties);
+ mqttClientService.connectAndSubscribeAsync(mqttPropertie);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
e.printStackTrace();
diff --git a/cloud-modules/cloud-modules-parsing/src/main/java/com/muyu/parsing/mqtt/service/MqttClientService.java b/cloud-modules/cloud-modules-parsing/src/main/java/com/muyu/parsing/mqtt/service/MqttClientService.java
index 777b02c..fddeaa6 100644
--- a/cloud-modules/cloud-modules-parsing/src/main/java/com/muyu/parsing/mqtt/service/MqttClientService.java
+++ b/cloud-modules/cloud-modules-parsing/src/main/java/com/muyu/parsing/mqtt/service/MqttClientService.java
@@ -1,7 +1,7 @@
package com.muyu.parsing.mqtt.service;
import com.alibaba.fastjson2.JSONObject;
-import com.muyu.car.gateway.domain.properties.MqttProperties;
+import com.muyu.common.core.domain.properties.MqttPropertie;
import com.muyu.parsing.manager.MessageProcessor;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
@@ -24,17 +24,17 @@ public class MqttClientService {
private MqttClient sampleClient;
- public void connectAndSubscribeAsync(MqttProperties mqttProperties) {
+ public void connectAndSubscribeAsync(MqttPropertie mqttPropertie) {
executorService.submit(() -> {
int inct = 0;
while (true) {
try {
- connectAndSubscribe(mqttProperties);
- log.info("MQTT客户端连接成功:[{}]", JSONObject.toJSONString(mqttProperties));
+ connectAndSubscribe(mqttPropertie);
+ log.info("MQTT客户端连接成功:[{}]", JSONObject.toJSONString(mqttPropertie));
break;
} catch (MqttException | IOException e) {
if (inct > 5){
- log.error("MQTT连接或订阅失败-{},已经尝试{}次:{}",e.getMessage(),inct,JSONObject.toJSONString(mqttProperties));
+ log.error("MQTT连接或订阅失败-{},已经尝试{}次:{}",e.getMessage(),inct,JSONObject.toJSONString(mqttPropertie));
break;
}
@@ -49,20 +49,20 @@ public class MqttClientService {
});
}
- private void connectAndSubscribe(MqttProperties mqttProperties) throws MqttException, IOException {
+ private void connectAndSubscribe(MqttPropertie mqttPropertie) throws MqttException, IOException {
// if (sampleClient != null && sampleClient.isConnected()) {
// log.info("MQTT客户端已经连接,跳过重新连接。");
// return;
// }
- sampleClient = new MqttClient(mqttProperties.getBroker(), mqttProperties.getClientId());
+ sampleClient = new MqttClient(mqttPropertie.getBroker(), mqttPropertie.getClientId());
MqttConnectOptions connOpts = new MqttConnectOptions();
- connOpts.setUserName(mqttProperties.getUserName());
- connOpts.setPassword(mqttProperties.getPassword().toCharArray());
+ connOpts.setUserName(mqttPropertie.getUserName());
+ connOpts.setPassword(mqttPropertie.getPassword().toCharArray());
connOpts.setCleanSession(true);
sampleClient.connect(connOpts);
- sampleClient.subscribe(mqttProperties.getTopic(), 0);
+ sampleClient.subscribe(mqttPropertie.getTopic(), 0);
sampleClient.setCallback(new MqttCallback() {
@Override
diff --git a/cloud-modules/cloud-modules-parsing/src/main/resources/bootstrap.yml b/cloud-modules/cloud-modules-parsing/src/main/resources/bootstrap.yml
index 0a701b7..543bf10 100644
--- a/cloud-modules/cloud-modules-parsing/src/main/resources/bootstrap.yml
+++ b/cloud-modules/cloud-modules-parsing/src/main/resources/bootstrap.yml
@@ -28,19 +28,19 @@ spring:
discovery:
# 服务注册地址
server-addr: ${nacos.addr}
- # # nacos用户名
- # username: ${nacos.user-name}
- # # nacos密码
- # password: ${nacos.password}
+ # nacos用户名
+ username: ${nacos.user-name}
+ # nacos密码
+ password: ${nacos.password}
# 命名空间
namespace: ${nacos.namespace}
config:
# 服务注册地址
server-addr: ${nacos.addr}
- # # nacos用户名
- # username: ${nacos.user-name}
- # # nacos密码
- # password: ${nacos.password}
+ # nacos用户名
+ username: ${nacos.user-name}
+ # nacos密码
+ password: ${nacos.password}
# 命名空间
namespace: ${nacos.namespace}
# 配置文件格式
diff --git a/pom.xml b/pom.xml
index b556173..9e1bb6c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -310,6 +310,14 @@
cloud-modules-openbusiness-common
${muyu.version}
+
+
+
+
+ org.eclipse.paho
+ org.eclipse.paho.client.mqttv3
+ 1.2.2
+