diff --git a/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/MqttProperties.java b/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/MqttProperties.java new file mode 100644 index 0000000..e96334a --- /dev/null +++ b/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/MqttProperties.java @@ -0,0 +1,42 @@ +package com.muyu.common.rabbit; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.io.Serializable; + +/** + * @ Description:Mqtt的配置 + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class MqttProperties implements Serializable{ + /** + * 节点 + */ + private String broker; + /** + * 主题 + */ + private String topic; + /** + * 用户名 + */ + private String userName; + /** + * 密码 + */ + private String password; + /** + * 客户端id + */ + private String clientId; + /** + * 上报级别 + */ + private int qos = 0; +} diff --git a/cloud-modules/cloud-modules-protocol-parsing/src/main/java/com/muyu/cloud/protocol/parsing/MqttConfig.java b/cloud-modules/cloud-modules-protocol-parsing/src/main/java/com/muyu/cloud/protocol/parsing/MqttConfig.java index 2635e2c..d29d082 100644 --- a/cloud-modules/cloud-modules-protocol-parsing/src/main/java/com/muyu/cloud/protocol/parsing/MqttConfig.java +++ b/cloud-modules/cloud-modules-protocol-parsing/src/main/java/com/muyu/cloud/protocol/parsing/MqttConfig.java @@ -1,10 +1,15 @@ package com.muyu.cloud.protocol.parsing; -import com.alibaba.fastjson.JSONObject; +import com.muyu.common.rabbit.MqttProperties; +import lombok.extern.log4j.Log4j2; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; +import org.springframework.amqp.core.Message; +import com.rabbitmq.client.Channel; + +import java.io.IOException; /** * MQTT监听器 @@ -15,6 +20,7 @@ import org.springframework.stereotype.Component; * @Date: 2024/10/10 12:29 * @Description: MQTT监听器 */ +@Log4j2 @Component public class MqttConfig { @@ -22,9 +28,14 @@ public class MqttConfig { private MqttPublishSample mqttPublishSample; // 监听MQ - @RabbitListener(queuesToDeclare = @Queue("aaa")) - public void msg(String fluxMqttConfig){ - FluxMqProperties fluxMqProperties = JSONObject.parseObject(fluxMqttConfig, FluxMqProperties.class); - mqttPublishSample.connectToMqttBroker(fluxMqProperties); + @RabbitListener(queuesToDeclare = @Queue(value = "queue_inform_sms", durable = "true")) + public void msg(MqttProperties mqttProperties, Message message, Channel channel) throws Exception { + try{ + mqttPublishSample.connectToMqttBroker(mqttProperties); + channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); + } catch (IOException e) { + throw new RuntimeException(e); + } } + } diff --git a/cloud-modules/cloud-modules-protocol-parsing/src/main/java/com/muyu/cloud/protocol/parsing/MqttPublishSample.java b/cloud-modules/cloud-modules-protocol-parsing/src/main/java/com/muyu/cloud/protocol/parsing/MqttPublishSample.java index 81a953e..646b26c 100644 --- a/cloud-modules/cloud-modules-protocol-parsing/src/main/java/com/muyu/cloud/protocol/parsing/MqttPublishSample.java +++ b/cloud-modules/cloud-modules-protocol-parsing/src/main/java/com/muyu/cloud/protocol/parsing/MqttPublishSample.java @@ -1,5 +1,6 @@ package com.muyu.cloud.protocol.parsing; +import com.muyu.common.rabbit.MqttProperties; import lombok.extern.log4j.Log4j2; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; @@ -27,16 +28,18 @@ public class MqttPublishSample { /** * 连接MQTT Broker */ - public void connectToMqttBroker(FluxMqProperties fluxMqProperties) { + public void connectToMqttBroker(MqttProperties mqttProperties) { try { // 创建MqttClient实例,指定Broker地址、客户端ID以及持久化方式 - MqttClient mqttClient = new MqttClient(fluxMqProperties.BROKER, fluxMqProperties.CLIENT_ID, new MemoryPersistence()); + MqttClient mqttClient = new MqttClient(mqttProperties.getBroker(), mqttProperties.getClientId(), new MemoryPersistence()); // 连接MQTT Broker MqttConnectOptions connOpts = new MqttConnectOptions(); connOpts.setCleanSession(true); - log.info("连接到协议: " + fluxMqProperties.BROKER); + connOpts.setUserName(mqttProperties.getUserName()); + connOpts.setPassword(mqttProperties.getPassword().toCharArray()); + log.info("连接到协议: " + mqttProperties.getBroker()); mqttClient.connect(connOpts); - mqttClient.subscribe(fluxMqProperties.TOPIC, 0); + mqttClient.subscribe(mqttProperties.getTopic(), 0); // 设置MQTT回调处理器 mqttClient.setCallback(mqttCallbackMsg); } catch (MqttException me) { diff --git a/cloud-modules/cloud-modules-protocol-parsing/src/main/java/com/muyu/cloud/protocol/parsing/properties/MqttProperties.java b/cloud-modules/cloud-modules-protocol-parsing/src/main/java/com/muyu/cloud/protocol/parsing/properties/MqttProperties.java new file mode 100644 index 0000000..f604211 --- /dev/null +++ b/cloud-modules/cloud-modules-protocol-parsing/src/main/java/com/muyu/cloud/protocol/parsing/properties/MqttProperties.java @@ -0,0 +1,42 @@ +package com.muyu.cloud.protocol.parsing.properties; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.io.Serializable; + +/** + * @ Description:Mqtt的配置 + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class MqttProperties implements Serializable { + /** + * 节点 + */ + private String broker; + /** + * 主题 + */ + private String topic; + /** + * 用户名 + */ + private String userName; + /** + * 密码 + */ + private String password; + /** + * 客户端id + */ + private String clientId; + /** + * 上报级别 + */ + private int qos = 0; +} diff --git a/cloud-modules/cloud-modules-vehicle-gateway/src/main/java/com/muyu/cloud/vehicle/gateway/domain/properties/MqttProperties.java b/cloud-modules/cloud-modules-vehicle-gateway/src/main/java/com/muyu/cloud/vehicle/gateway/domain/properties/MqttProperties.java index ba6a788..7891db8 100644 --- a/cloud-modules/cloud-modules-vehicle-gateway/src/main/java/com/muyu/cloud/vehicle/gateway/domain/properties/MqttProperties.java +++ b/cloud-modules/cloud-modules-vehicle-gateway/src/main/java/com/muyu/cloud/vehicle/gateway/domain/properties/MqttProperties.java @@ -14,7 +14,7 @@ import java.io.Serializable; @Builder @NoArgsConstructor @AllArgsConstructor -public class MqttProperties implements Serializable { +public class MqttProperties implements Serializable{ /** * 节点 */ diff --git a/cloud-modules/cloud-modules-vehicle-gateway/src/main/java/com/muyu/cloud/vehicle/gateway/service/impl/VehicleConnectionServiceImpl.java b/cloud-modules/cloud-modules-vehicle-gateway/src/main/java/com/muyu/cloud/vehicle/gateway/service/impl/VehicleConnectionServiceImpl.java index 8694262..d61d0a8 100644 --- a/cloud-modules/cloud-modules-vehicle-gateway/src/main/java/com/muyu/cloud/vehicle/gateway/service/impl/VehicleConnectionServiceImpl.java +++ b/cloud-modules/cloud-modules-vehicle-gateway/src/main/java/com/muyu/cloud/vehicle/gateway/service/impl/VehicleConnectionServiceImpl.java @@ -3,7 +3,7 @@ package com.muyu.cloud.vehicle.gateway.service.impl; import com.muyu.cloud.vehicle.gateway.domain.VehicleConnection; import com.muyu.cloud.vehicle.gateway.domain.VinIp; import com.muyu.cloud.vehicle.gateway.domain.model.MqttServerModel; -import com.muyu.cloud.vehicle.gateway.domain.properties.MqttProperties; +import com.muyu.common.rabbit.MqttProperties; import com.muyu.cloud.vehicle.gateway.domain.req.VehicleConnectionReq; import com.muyu.cloud.vehicle.gateway.mapper.VehicleConnectionMapper; import com.muyu.cloud.vehicle.gateway.service.VehicleConnectionService; diff --git a/cloud-modules/cloud-modules-vehicle-gateway/src/main/resources/bootstrap.yml b/cloud-modules/cloud-modules-vehicle-gateway/src/main/resources/bootstrap.yml index 8fed4a3..e606337 100644 --- a/cloud-modules/cloud-modules-vehicle-gateway/src/main/resources/bootstrap.yml +++ b/cloud-modules/cloud-modules-vehicle-gateway/src/main/resources/bootstrap.yml @@ -1,6 +1,6 @@ # Tomcat server: - port: 9710 + port: 9810 # nacos线上地址 # nacos线上地址 @@ -8,7 +8,7 @@ nacos: addr: 47.101.49.53:8848 user-name: nacos password: nacos - namespace: warn + namespace: seven # SPRING_AMQP_DESERIALIZATION_TRUST_ALL=true spring.amqp.deserialization.trust.all # Spring spring: diff --git a/pom.xml b/pom.xml index c3e95d0..4856f7a 100644 --- a/pom.xml +++ b/pom.xml @@ -294,7 +294,15 @@ cloud-modules-enterprise-remote ${muyu.version} + + + com.muyu + cloud-modules-vehicle-gateway + ${muyu.version} + + +