feat:修复代码 + 合并网关

dev
chentaisen 2024-10-11 21:38:26 +08:00
parent 800f804921
commit cb68e9e601
9 changed files with 63 additions and 49 deletions

View File

@ -1,4 +1,4 @@
package com.muyu.car.gateway.domain.properties; package com.muyu.common.core.domain.properties;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.Builder; import lombok.Builder;
@ -17,7 +17,7 @@ import lombok.NoArgsConstructor;
@Builder @Builder
@NoArgsConstructor @NoArgsConstructor
@AllArgsConstructor @AllArgsConstructor
public class MqttProperties { public class MqttPropertie {
/** /**
* *
*/ */

View File

@ -35,3 +35,4 @@
</description> </description>
</project> </project>

View File

@ -110,5 +110,20 @@
<version>0.2.21</version> <version>0.2.21</version>
</dependency> </dependency>
</dependencies> </dependencies>
<build>
<finalName>${project.artifactId}</finalName>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project> </project>

View File

@ -3,7 +3,7 @@ package com.muyu.car.gateway.service.Impl;
import com.muyu.car.gateway.domain.VehicleConnection; import com.muyu.car.gateway.domain.VehicleConnection;
import com.muyu.car.gateway.domain.VinIp; import com.muyu.car.gateway.domain.VinIp;
import com.muyu.car.gateway.domain.model.MqttServerModel; import com.muyu.car.gateway.domain.model.MqttServerModel;
import com.muyu.car.gateway.domain.properties.MqttProperties; import com.muyu.common.core.domain.properties.MqttPropertie;
import com.muyu.car.gateway.domain.req.VehicleConnectionReq; import com.muyu.car.gateway.domain.req.VehicleConnectionReq;
import com.muyu.car.gateway.mapper.CarOneClickOperationMapper; import com.muyu.car.gateway.mapper.CarOneClickOperationMapper;
import com.muyu.car.gateway.service.CarOneClickOperationService; import com.muyu.car.gateway.service.CarOneClickOperationService;
@ -74,15 +74,15 @@ public class CarOneClickOperationServiceImpl implements CarOneClickOperationServ
log.error("=============车辆:{}已经绑定过了", vehicleConnectionReq.getVehicleVin()); log.error("=============车辆:{}已经绑定过了", vehicleConnectionReq.getVehicleVin());
throw new RuntimeException("=============车辆已经绑定过了"); throw new RuntimeException("=============车辆已经绑定过了");
} }
MqttProperties mqttProperties = new MqttProperties(); MqttPropertie mqttPropertie = new MqttPropertie();
List<VehicleConnection> vehicleVin = selectByVehicleVin(vehicleConnectionReq.getVehicleVin()); List<VehicleConnection> vehicleVin = selectByVehicleVin(vehicleConnectionReq.getVehicleVin());
for (VehicleConnection connection : vehicleVin) { for (VehicleConnection connection : vehicleVin) {
mqttProperties.setClientId(connection.getVehicleVin()); mqttPropertie.setClientId(connection.getVehicleVin());
mqttProperties.setUserName(connection.getUsername()); mqttPropertie.setUserName(connection.getUsername());
mqttProperties.setPassword(connection.getPassword()); mqttPropertie.setPassword(connection.getPassword());
} }
mqttProperties.setTopic("vehicle"); mqttPropertie.setTopic("vehicle");
mqttProperties.setQos(0); mqttPropertie.setQos(0);
//判断redis有没有count键 //判断redis有没有count键
if (redisTemplate.hasKey("oneCount")) { if (redisTemplate.hasKey("oneCount")) {
//取出count //取出count
@ -101,10 +101,10 @@ public class CarOneClickOperationServiceImpl implements CarOneClickOperationServ
this.addIpAddress(new VinIp(vehicleConnectionReq.getVehicleVin(), ipList.toString())); this.addIpAddress(new VinIp(vehicleConnectionReq.getVehicleVin(), ipList.toString()));
//响应信息 //响应信息
log.info("车辆:{}", vehicleConnectionReq.getVehicleVin() + "绑定成功:{}", ipList); log.info("车辆:{}", vehicleConnectionReq.getVehicleVin() + "绑定成功:{}", ipList);
mqttProperties.setBroker("tcp://" + ipList + ":1883"); mqttPropertie.setBroker("tcp://" + ipList + ":1883");
// 使用交换机发送消息 // 使用交换机发送消息
rabbitTemplate.convertAndSend(EXCHANGE_TOPICS_INFORM, ROUTINGKEY_SMS, mqttProperties); rabbitTemplate.convertAndSend(EXCHANGE_TOPICS_INFORM, ROUTINGKEY_SMS, mqttPropertie);
log.info("============================发送消息成功:{}", mqttProperties); log.info("============================发送消息成功:{}", mqttPropertie);
return Result.success(new MqttServerModel("tcp://" + ipList + ":1883", "vehicle")); return Result.success(new MqttServerModel("tcp://" + ipList + ":1883", "vehicle"));
} else { } else {
redisTemplate.opsForValue().set("oneCount", String.valueOf(0)); redisTemplate.opsForValue().set("oneCount", String.valueOf(0));
@ -114,10 +114,10 @@ public class CarOneClickOperationServiceImpl implements CarOneClickOperationServ
this.addIpAddress(new VinIp(vehicleConnectionReq.getVehicleVin(), ipList.toString())); this.addIpAddress(new VinIp(vehicleConnectionReq.getVehicleVin(), ipList.toString()));
//响应信息 //响应信息
log.info("车辆:{}", vehicleConnectionReq.getVehicleVin(), "与:{}绑定成功", ipList); log.info("车辆:{}", vehicleConnectionReq.getVehicleVin(), "与:{}绑定成功", ipList);
mqttProperties.setBroker("tcp://" + ipList + ":1883"); mqttPropertie.setBroker("tcp://" + ipList + ":1883");
// 使用交换机发送消息 // 使用交换机发送消息
rabbitTemplate.convertAndSend(EXCHANGE_TOPICS_INFORM, ROUTINGKEY_SMS, mqttProperties); rabbitTemplate.convertAndSend(EXCHANGE_TOPICS_INFORM, ROUTINGKEY_SMS, mqttPropertie);
log.info("============================发送消息成功:{}", mqttProperties); log.info("============================发送消息成功:{}", mqttPropertie);
return Result.success(new MqttServerModel("tcp://" + ipList + ":1883", "vehicle")); return Result.success(new MqttServerModel("tcp://" + ipList + ":1883", "vehicle"));
} }
} }

View File

@ -101,22 +101,11 @@
<artifactId>org.eclipse.paho.client.mqttv3</artifactId> <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.2</version> <version>1.2.2</version>
</dependency> </dependency>
<dependency>
<groupId>com.muyu</groupId>
<artifactId>cloud-modules-car-gateway</artifactId>
<version>3.6.3</version>
<scope>compile</scope>
</dependency>
<dependency> <dependency>
<groupId>io.netty</groupId> <groupId>io.netty</groupId>
<artifactId>netty-codec-mqtt</artifactId> <artifactId>netty-codec-mqtt</artifactId>
</dependency> </dependency>
<dependency>
<groupId>com.muyu</groupId>
<artifactId>cloud-modules-car-gateway</artifactId>
<version>3.6.3</version>
<scope>compile</scope>
</dependency>
</dependencies> </dependencies>
<build> <build>

View File

@ -1,6 +1,7 @@
package com.muyu.parsing.consumer; package com.muyu.parsing.consumer;
import com.muyu.car.gateway.domain.properties.MqttProperties;
import com.muyu.common.core.domain.properties.MqttPropertie;
import com.muyu.common.rabbit.constants.RabbitConstants; import com.muyu.common.rabbit.constants.RabbitConstants;
import com.muyu.parsing.mqtt.service.MqttClientService; import com.muyu.parsing.mqtt.service.MqttClientService;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Channel;
@ -18,9 +19,9 @@ public class RabbitListenerComponent {
private MqttClientService mqttClientService; private MqttClientService mqttClientService;
@RabbitListener(queuesToDeclare = @Queue(value = RabbitConstants.FORM_QUEUE, durable = "true")) @RabbitListener(queuesToDeclare = @Queue(value = RabbitConstants.FORM_QUEUE, durable = "true"))
public void downline(MqttProperties mqttProperties, Message message, Channel channel) { public void downline(MqttPropertie mqttPropertie, Message message, Channel channel) {
try { try {
mqttClientService.connectAndSubscribeAsync(mqttProperties); mqttClientService.connectAndSubscribeAsync(mqttPropertie);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();

View File

@ -1,7 +1,7 @@
package com.muyu.parsing.mqtt.service; package com.muyu.parsing.mqtt.service;
import com.alibaba.fastjson2.JSONObject; import com.alibaba.fastjson2.JSONObject;
import com.muyu.car.gateway.domain.properties.MqttProperties; import com.muyu.common.core.domain.properties.MqttPropertie;
import com.muyu.parsing.manager.MessageProcessor; import com.muyu.parsing.manager.MessageProcessor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.*;
@ -24,17 +24,17 @@ public class MqttClientService {
private MqttClient sampleClient; private MqttClient sampleClient;
public void connectAndSubscribeAsync(MqttProperties mqttProperties) { public void connectAndSubscribeAsync(MqttPropertie mqttPropertie) {
executorService.submit(() -> { executorService.submit(() -> {
int inct = 0; int inct = 0;
while (true) { while (true) {
try { try {
connectAndSubscribe(mqttProperties); connectAndSubscribe(mqttPropertie);
log.info("MQTT客户端连接成功[{}]", JSONObject.toJSONString(mqttProperties)); log.info("MQTT客户端连接成功[{}]", JSONObject.toJSONString(mqttPropertie));
break; break;
} catch (MqttException | IOException e) { } catch (MqttException | IOException e) {
if (inct > 5){ if (inct > 5){
log.error("MQTT连接或订阅失败-{},已经尝试{}次:{}",e.getMessage(),inct,JSONObject.toJSONString(mqttProperties)); log.error("MQTT连接或订阅失败-{},已经尝试{}次:{}",e.getMessage(),inct,JSONObject.toJSONString(mqttPropertie));
break; break;
} }
@ -49,20 +49,20 @@ public class MqttClientService {
}); });
} }
private void connectAndSubscribe(MqttProperties mqttProperties) throws MqttException, IOException { private void connectAndSubscribe(MqttPropertie mqttPropertie) throws MqttException, IOException {
// if (sampleClient != null && sampleClient.isConnected()) { // if (sampleClient != null && sampleClient.isConnected()) {
// log.info("MQTT客户端已经连接跳过重新连接。"); // log.info("MQTT客户端已经连接跳过重新连接。");
// return; // return;
// } // }
sampleClient = new MqttClient(mqttProperties.getBroker(), mqttProperties.getClientId()); sampleClient = new MqttClient(mqttPropertie.getBroker(), mqttPropertie.getClientId());
MqttConnectOptions connOpts = new MqttConnectOptions(); MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setUserName(mqttProperties.getUserName()); connOpts.setUserName(mqttPropertie.getUserName());
connOpts.setPassword(mqttProperties.getPassword().toCharArray()); connOpts.setPassword(mqttPropertie.getPassword().toCharArray());
connOpts.setCleanSession(true); connOpts.setCleanSession(true);
sampleClient.connect(connOpts); sampleClient.connect(connOpts);
sampleClient.subscribe(mqttProperties.getTopic(), 0); sampleClient.subscribe(mqttPropertie.getTopic(), 0);
sampleClient.setCallback(new MqttCallback() { sampleClient.setCallback(new MqttCallback() {
@Override @Override

View File

@ -28,19 +28,19 @@ spring:
discovery: discovery:
# 服务注册地址 # 服务注册地址
server-addr: ${nacos.addr} server-addr: ${nacos.addr}
# # nacos用户名 # nacos用户名
# username: ${nacos.user-name} username: ${nacos.user-name}
# # nacos密码 # nacos密码
# password: ${nacos.password} password: ${nacos.password}
# 命名空间 # 命名空间
namespace: ${nacos.namespace} namespace: ${nacos.namespace}
config: config:
# 服务注册地址 # 服务注册地址
server-addr: ${nacos.addr} server-addr: ${nacos.addr}
# # nacos用户名 # nacos用户名
# username: ${nacos.user-name} username: ${nacos.user-name}
# # nacos密码 # nacos密码
# password: ${nacos.password} password: ${nacos.password}
# 命名空间 # 命名空间
namespace: ${nacos.namespace} namespace: ${nacos.namespace}
# 配置文件格式 # 配置文件格式

View File

@ -310,6 +310,14 @@
<artifactId>cloud-modules-openbusiness-common</artifactId> <artifactId>cloud-modules-openbusiness-common</artifactId>
<version>${muyu.version}</version> <version>${muyu.version}</version>
</dependency> </dependency>
<!-- mqtt-->
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.2</version>
</dependency>
</dependencies> </dependencies>
</dependencyManagement> </dependencyManagement>