feat(): 车辆启动成功发送消息到rabbitmq当中,协议解析完成接收

dev
ywt 2024-10-11 21:52:30 +08:00
parent 90d688f8d2
commit 9a77d9dfbf
8 changed files with 119 additions and 13 deletions

View File

@ -0,0 +1,42 @@
package com.muyu.common.rabbit;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
/**
* @ DescriptionMqtt
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class MqttProperties implements Serializable{
/**
*
*/
private String broker;
/**
*
*/
private String topic;
/**
*
*/
private String userName;
/**
*
*/
private String password;
/**
* id
*/
private String clientId;
/**
*
*/
private int qos = 0;
}

View File

@ -1,10 +1,15 @@
package com.muyu.cloud.protocol.parsing; package com.muyu.cloud.protocol.parsing;
import com.alibaba.fastjson.JSONObject; import com.muyu.common.rabbit.MqttProperties;
import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.amqp.core.Message;
import com.rabbitmq.client.Channel;
import java.io.IOException;
/** /**
* MQTT * MQTT
@ -15,6 +20,7 @@ import org.springframework.stereotype.Component;
* @Date: 2024/10/10 12:29 * @Date: 2024/10/10 12:29
* @Description: MQTT * @Description: MQTT
*/ */
@Log4j2
@Component @Component
public class MqttConfig { public class MqttConfig {
@ -22,9 +28,14 @@ public class MqttConfig {
private MqttPublishSample mqttPublishSample; private MqttPublishSample mqttPublishSample;
// 监听MQ // 监听MQ
@RabbitListener(queuesToDeclare = @Queue("aaa")) @RabbitListener(queuesToDeclare = @Queue(value = "queue_inform_sms", durable = "true"))
public void msg(String fluxMqttConfig){ public void msg(MqttProperties mqttProperties, Message message, Channel channel) throws Exception {
FluxMqProperties fluxMqProperties = JSONObject.parseObject(fluxMqttConfig, FluxMqProperties.class); try{
mqttPublishSample.connectToMqttBroker(fluxMqProperties); mqttPublishSample.connectToMqttBroker(mqttProperties);
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
} catch (IOException e) {
throw new RuntimeException(e);
}
} }
} }

View File

@ -1,5 +1,6 @@
package com.muyu.cloud.protocol.parsing; package com.muyu.cloud.protocol.parsing;
import com.muyu.common.rabbit.MqttProperties;
import lombok.extern.log4j.Log4j2; import lombok.extern.log4j.Log4j2;
import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
@ -27,16 +28,18 @@ public class MqttPublishSample {
/** /**
* MQTT Broker * MQTT Broker
*/ */
public void connectToMqttBroker(FluxMqProperties fluxMqProperties) { public void connectToMqttBroker(MqttProperties mqttProperties) {
try { try {
// 创建MqttClient实例指定Broker地址、客户端ID以及持久化方式 // 创建MqttClient实例指定Broker地址、客户端ID以及持久化方式
MqttClient mqttClient = new MqttClient(fluxMqProperties.BROKER, fluxMqProperties.CLIENT_ID, new MemoryPersistence()); MqttClient mqttClient = new MqttClient(mqttProperties.getBroker(), mqttProperties.getClientId(), new MemoryPersistence());
// 连接MQTT Broker // 连接MQTT Broker
MqttConnectOptions connOpts = new MqttConnectOptions(); MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setCleanSession(true); connOpts.setCleanSession(true);
log.info("连接到协议: " + fluxMqProperties.BROKER); connOpts.setUserName(mqttProperties.getUserName());
connOpts.setPassword(mqttProperties.getPassword().toCharArray());
log.info("连接到协议: " + mqttProperties.getBroker());
mqttClient.connect(connOpts); mqttClient.connect(connOpts);
mqttClient.subscribe(fluxMqProperties.TOPIC, 0); mqttClient.subscribe(mqttProperties.getTopic(), 0);
// 设置MQTT回调处理器 // 设置MQTT回调处理器
mqttClient.setCallback(mqttCallbackMsg); mqttClient.setCallback(mqttCallbackMsg);
} catch (MqttException me) { } catch (MqttException me) {

View File

@ -0,0 +1,42 @@
package com.muyu.cloud.protocol.parsing.properties;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
/**
* @ DescriptionMqtt
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class MqttProperties implements Serializable {
/**
*
*/
private String broker;
/**
*
*/
private String topic;
/**
*
*/
private String userName;
/**
*
*/
private String password;
/**
* id
*/
private String clientId;
/**
*
*/
private int qos = 0;
}

View File

@ -14,7 +14,7 @@ import java.io.Serializable;
@Builder @Builder
@NoArgsConstructor @NoArgsConstructor
@AllArgsConstructor @AllArgsConstructor
public class MqttProperties implements Serializable { public class MqttProperties implements Serializable{
/** /**
* *
*/ */

View File

@ -3,7 +3,7 @@ package com.muyu.cloud.vehicle.gateway.service.impl;
import com.muyu.cloud.vehicle.gateway.domain.VehicleConnection; import com.muyu.cloud.vehicle.gateway.domain.VehicleConnection;
import com.muyu.cloud.vehicle.gateway.domain.VinIp; import com.muyu.cloud.vehicle.gateway.domain.VinIp;
import com.muyu.cloud.vehicle.gateway.domain.model.MqttServerModel; import com.muyu.cloud.vehicle.gateway.domain.model.MqttServerModel;
import com.muyu.cloud.vehicle.gateway.domain.properties.MqttProperties; import com.muyu.common.rabbit.MqttProperties;
import com.muyu.cloud.vehicle.gateway.domain.req.VehicleConnectionReq; import com.muyu.cloud.vehicle.gateway.domain.req.VehicleConnectionReq;
import com.muyu.cloud.vehicle.gateway.mapper.VehicleConnectionMapper; import com.muyu.cloud.vehicle.gateway.mapper.VehicleConnectionMapper;
import com.muyu.cloud.vehicle.gateway.service.VehicleConnectionService; import com.muyu.cloud.vehicle.gateway.service.VehicleConnectionService;

View File

@ -1,6 +1,6 @@
# Tomcat # Tomcat
server: server:
port: 9710 port: 9810
# nacos线上地址 # nacos线上地址
# nacos线上地址 # nacos线上地址
@ -8,7 +8,7 @@ nacos:
addr: 47.101.49.53:8848 addr: 47.101.49.53:8848
user-name: nacos user-name: nacos
password: nacos password: nacos
namespace: warn namespace: seven
# SPRING_AMQP_DESERIALIZATION_TRUST_ALL=true spring.amqp.deserialization.trust.all # SPRING_AMQP_DESERIALIZATION_TRUST_ALL=true spring.amqp.deserialization.trust.all
# Spring # Spring
spring: spring:

View File

@ -294,7 +294,15 @@
<artifactId>cloud-modules-enterprise-remote</artifactId> <artifactId>cloud-modules-enterprise-remote</artifactId>
<version>${muyu.version}</version> <version>${muyu.version}</version>
</dependency> </dependency>
<dependency>
<groupId>com.muyu</groupId>
<artifactId>cloud-modules-vehicle-gateway</artifactId>
<version>${muyu.version}</version>
</dependency>
</dependencies> </dependencies>
</dependencyManagement> </dependencyManagement>
<modules> <modules>