diff --git a/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/listener/WarnEventListener.java b/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/listener/WarnEventListener.java new file mode 100644 index 0000000..28d8357 --- /dev/null +++ b/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/listener/WarnEventListener.java @@ -0,0 +1,48 @@ +package com.muyu.processing.listener; + +import com.alibaba.fastjson.JSONObject; +import com.muyu.domain.WarnRule; +import com.muyu.domain.resp.WarnRuleListResp; +import com.muyu.domain.resp.WarnRuleResp; +import com.muyu.processing.basic.EventCustom; +import com.muyu.processing.basic.EventListener; +import com.muyu.processing.utils.CacheUtil; +import org.springframework.beans.factory.annotation.Autowired; + +import javax.annotation.Resource; +import java.util.List; +import java.util.Map; + +/** + * 车辆预警监听 + * @Author: LiDongJia + * @Package: com.muyu.processing.listener + * @Project: cloud-server + * @name: WarnEventListener + * @Date: 2024/10/10 9:17 + * @Description: 车辆预警监听 + */ +public class WarnEventListener implements EventListener { + + @Resource + private CacheUtil cacheUtil; + + @Override + public void onEvent(EventCustom event) { + JSONObject data = event.getData(); + String vin = (String) data.get("VIN码"); + Map map = (Map) cacheUtil.get(vin); + if (map != null) { + WarnRuleResp warnRuleResp = (WarnRuleResp) map.get("warnRuleResp"); + List warnRuleList = warnRuleResp.getWarnRuleList(); + warnRuleList.forEach(warnRule -> { + String messageCode = warnRule.getMessageCode(); + }); + } + } + + @Override + public void onApplicationEvent(EventCustom event) { + + } +} diff --git a/cloud-modules/cloud-modules-protocol-parsing/src/main/java/com/muyu/cloud/protocol/parsing/FluxMqProperties.java b/cloud-modules/cloud-modules-protocol-parsing/src/main/java/com/muyu/cloud/protocol/parsing/FluxMqProperties.java new file mode 100644 index 0000000..4802663 --- /dev/null +++ b/cloud-modules/cloud-modules-protocol-parsing/src/main/java/com/muyu/cloud/protocol/parsing/FluxMqProperties.java @@ -0,0 +1,30 @@ +package com.muyu.cloud.protocol.parsing; + +/** + * fluxMq配置 + * + * @Author: LiDongJia + * @Package: com.muyu.cloud.protocol.parsing + * @Project: cloud-server + * @name: FluxMqProperties + * @Date: 2024/10/10 14:19 + * @Description: fluxMq配置 + */ +public class FluxMqProperties { + + /** + * MQTT主题 + */ + public String TOPIC; + + /** + * MQTT Broker地址 + */ + public String BROKER; + + /** + * MQTT客户端ID + */ + public String CLIENT_ID; + +} diff --git a/cloud-modules/cloud-modules-protocol-parsing/src/main/java/com/muyu/cloud/protocol/parsing/MqttCallbackMsg.java b/cloud-modules/cloud-modules-protocol-parsing/src/main/java/com/muyu/cloud/protocol/parsing/MqttCallbackMsg.java new file mode 100644 index 0000000..657c641 --- /dev/null +++ b/cloud-modules/cloud-modules-protocol-parsing/src/main/java/com/muyu/cloud/protocol/parsing/MqttCallbackMsg.java @@ -0,0 +1,38 @@ +package com.muyu.cloud.protocol.parsing; + +import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; +import org.eclipse.paho.client.mqttv3.MqttCallback; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +/** + * mqtt回调器 + * @Author: LiDongJia + * @Package: com.muyu.cloud.protocol.parsing + * @Project: cloud-server + * @name: MqttCallbackMsg + * @Date: 2024/10/10 14:11 + * @Description: mqtt回调器 + */ +@Component +public class MqttCallbackMsg implements MqttCallback { + + @Autowired + private ParsingMessage parsingMessage; + + @Override + public void connectionLost(Throwable throwable) { + + } + + @Override + public void messageArrived(String s, MqttMessage mqttMessage) throws Exception { + parsingMessage.handleMqttMessage(mqttMessage); + } + + @Override + public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { + + } +} diff --git a/cloud-modules/cloud-modules-protocol-parsing/src/main/java/com/muyu/cloud/protocol/parsing/MqttConfig.java b/cloud-modules/cloud-modules-protocol-parsing/src/main/java/com/muyu/cloud/protocol/parsing/MqttConfig.java new file mode 100644 index 0000000..2635e2c --- /dev/null +++ b/cloud-modules/cloud-modules-protocol-parsing/src/main/java/com/muyu/cloud/protocol/parsing/MqttConfig.java @@ -0,0 +1,30 @@ +package com.muyu.cloud.protocol.parsing; + +import com.alibaba.fastjson.JSONObject; +import org.springframework.amqp.rabbit.annotation.Queue; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +/** + * MQTT监听器 + * @Author: LiDongJia + * @Package: com.muyu.cloud.protocol.parsing.config + * @Project: cloud-server + * @name: MqttConfig + * @Date: 2024/10/10 12:29 + * @Description: MQTT监听器 + */ +@Component +public class MqttConfig { + + @Autowired + private MqttPublishSample mqttPublishSample; + + // 监听MQ + @RabbitListener(queuesToDeclare = @Queue("aaa")) + public void msg(String fluxMqttConfig){ + FluxMqProperties fluxMqProperties = JSONObject.parseObject(fluxMqttConfig, FluxMqProperties.class); + mqttPublishSample.connectToMqttBroker(fluxMqProperties); + } +} diff --git a/cloud-modules/cloud-modules-protocol-parsing/src/main/java/com/muyu/cloud/protocol/parsing/MqttPublishSample.java b/cloud-modules/cloud-modules-protocol-parsing/src/main/java/com/muyu/cloud/protocol/parsing/MqttPublishSample.java new file mode 100644 index 0000000..81a953e --- /dev/null +++ b/cloud-modules/cloud-modules-protocol-parsing/src/main/java/com/muyu/cloud/protocol/parsing/MqttPublishSample.java @@ -0,0 +1,46 @@ +package com.muyu.cloud.protocol.parsing; + +import lombok.extern.log4j.Log4j2; +import org.eclipse.paho.client.mqttv3.MqttClient; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.eclipse.paho.client.mqttv3.MqttException; +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +/** + * 创建mqtt实例 + * @Author: LiDongJia + * @Package: com.muyu.cloud.protocol.parsing + * @Project: cloud-server + * @name: MqttPublishSample + * @Date: 2024/10/10 14:33 + * @Description: 创建mqtt实例 + */ +@Log4j2 +@Component +public class MqttPublishSample { + + @Autowired + private MqttCallbackMsg mqttCallbackMsg; + + /** + * 连接MQTT Broker + */ + public void connectToMqttBroker(FluxMqProperties fluxMqProperties) { + try { + // 创建MqttClient实例,指定Broker地址、客户端ID以及持久化方式 + MqttClient mqttClient = new MqttClient(fluxMqProperties.BROKER, fluxMqProperties.CLIENT_ID, new MemoryPersistence()); + // 连接MQTT Broker + MqttConnectOptions connOpts = new MqttConnectOptions(); + connOpts.setCleanSession(true); + log.info("连接到协议: " + fluxMqProperties.BROKER); + mqttClient.connect(connOpts); + mqttClient.subscribe(fluxMqProperties.TOPIC, 0); + // 设置MQTT回调处理器 + mqttClient.setCallback(mqttCallbackMsg); + } catch (MqttException me) { + log.error("连接MQTT Broker失败: [{}]", me.getMessage()); + } + } +} diff --git a/cloud-modules/cloud-modules-protocol-parsing/src/main/java/com/muyu/cloud/protocol/parsing/ParsingMessage.java b/cloud-modules/cloud-modules-protocol-parsing/src/main/java/com/muyu/cloud/protocol/parsing/ParsingMessage.java index a357f6b..341f15d 100644 --- a/cloud-modules/cloud-modules-protocol-parsing/src/main/java/com/muyu/cloud/protocol/parsing/ParsingMessage.java +++ b/cloud-modules/cloud-modules-protocol-parsing/src/main/java/com/muyu/cloud/protocol/parsing/ParsingMessage.java @@ -23,12 +23,13 @@ import javax.annotation.Resource; import java.util.List; /** + * 解析MQTT消息 * @Author: LiDongJia - * @Package: com.muyu.cloud.protocol.parsing.service.impl + * @Package: com.muyu.cloud.protocol.parsing * @Project: 2112-car-cloud-server - * @name: ParsingServiceImpl + * @name: ParsingMessage * @Date: 2024/9/28 14:31 - * @Description: 协议解析实现层 + * @Description: 解析MQTT消息 */ @Log4j2 @Component @@ -46,78 +47,15 @@ public class ParsingMessage { // 报文模版缓存服务 @Autowired private AllMessageValueCacheService allMessageValueCacheService; - - // MQTT主题 - private static final String TOPIC = "vehicle"; - // MQTT Broker地址 - private static final String BROKER = "tcp://111.231.50.146:1883"; - // MQTT客户端ID - private static final String CLIENT_ID = "JavaSample"; - // MQTT客户端 - private MqttClient mqttClient; // kafka topic private static final String TIPSY = "tipsy"; - private final static String FORM_QUEUE = "queue_inform_sms"; - - /** - * 初始化MQTT连接 - */ - @RabbitListener(queuesToDeclare = @Queue(FORM_QUEUE)) - public void init() { - connectToMqttBroker(); - } - - /** - * 连接MQTT Broker - */ - private void connectToMqttBroker() { - try { - // 创建MqttClient实例,指定Broker地址、客户端ID以及持久化方式 - mqttClient = new MqttClient(BROKER, CLIENT_ID, new MemoryPersistence()); - // 连接MQTT Broker - MqttConnectOptions connOpts = new MqttConnectOptions(); - connOpts.setCleanSession(true); - log.info("连接到协议: " + BROKER); - mqttClient.connect(connOpts); - mqttClient.subscribe(TOPIC, 0); - // 设置MQTT回调处理器 - mqttClient.setCallback(new MqttCallbackHandler()); - } catch (MqttException me) { - log.error("连接MQTT Broker失败: [{}]", me.getMessage()); - } - } - - /** - * MQTT回调处理器 - */ - private class MqttCallbackHandler implements MqttCallback { - - // 连接丢失 - @Override - public void connectionLost(Throwable throwable) { - log.error("连接丢失: [{}]", throwable.getMessage()); - } - - // 连接成功 - @Override - public void messageArrived(String s, MqttMessage mqttMessage) throws Exception { - handleMqttMessage(mqttMessage); - } - - // 接收信息 - @Override - public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { - - } - } - /** * 处理MQTT消息 * * @param mqttMessage */ - private void handleMqttMessage(MqttMessage mqttMessage) { + public void handleMqttMessage(MqttMessage mqttMessage) { // 解析MQTT消息 String messageStr = new String(mqttMessage.getPayload()); log.info("接收到MQTT消息: " + messageStr);