fix():修复mqtt多个监听

dev.event.processing
李东佳 2024-10-10 15:56:01 +08:00
parent d9e2c2e357
commit 2c330529b1
6 changed files with 197 additions and 67 deletions

View File

@ -0,0 +1,48 @@
package com.muyu.processing.listener;
import com.alibaba.fastjson.JSONObject;
import com.muyu.domain.WarnRule;
import com.muyu.domain.resp.WarnRuleListResp;
import com.muyu.domain.resp.WarnRuleResp;
import com.muyu.processing.basic.EventCustom;
import com.muyu.processing.basic.EventListener;
import com.muyu.processing.utils.CacheUtil;
import org.springframework.beans.factory.annotation.Autowired;
import javax.annotation.Resource;
import java.util.List;
import java.util.Map;
/**
*
* @Author: LiDongJia
* @Package: com.muyu.processing.listener
* @Project: cloud-server
* @name: WarnEventListener
* @Date: 2024/10/10 9:17
* @Description:
*/
public class WarnEventListener implements EventListener {
@Resource
private CacheUtil cacheUtil;
@Override
public void onEvent(EventCustom event) {
JSONObject data = event.getData();
String vin = (String) data.get("VIN码");
Map<String, Object> map = (Map<String, Object>) cacheUtil.get(vin);
if (map != null) {
WarnRuleResp warnRuleResp = (WarnRuleResp) map.get("warnRuleResp");
List<WarnRuleListResp> warnRuleList = warnRuleResp.getWarnRuleList();
warnRuleList.forEach(warnRule -> {
String messageCode = warnRule.getMessageCode();
});
}
}
@Override
public void onApplicationEvent(EventCustom event) {
}
}

View File

@ -0,0 +1,30 @@
package com.muyu.cloud.protocol.parsing;
/**
* fluxMq
*
* @Author: LiDongJia
* @Package: com.muyu.cloud.protocol.parsing
* @Project: cloud-server
* @name: FluxMqProperties
* @Date: 2024/10/10 14:19
* @Description: fluxMq
*/
public class FluxMqProperties {
/**
* MQTT
*/
public String TOPIC;
/**
* MQTT Broker
*/
public String BROKER;
/**
* MQTTID
*/
public String CLIENT_ID;
}

View File

@ -0,0 +1,38 @@
package com.muyu.cloud.protocol.parsing;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* mqtt
* @Author: LiDongJia
* @Package: com.muyu.cloud.protocol.parsing
* @Project: cloud-server
* @name: MqttCallbackMsg
* @Date: 2024/10/10 14:11
* @Description: mqtt
*/
@Component
public class MqttCallbackMsg implements MqttCallback {
@Autowired
private ParsingMessage parsingMessage;
@Override
public void connectionLost(Throwable throwable) {
}
@Override
public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
parsingMessage.handleMqttMessage(mqttMessage);
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
}
}

View File

@ -0,0 +1,30 @@
package com.muyu.cloud.protocol.parsing;
import com.alibaba.fastjson.JSONObject;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* MQTT
* @Author: LiDongJia
* @Package: com.muyu.cloud.protocol.parsing.config
* @Project: cloud-server
* @name: MqttConfig
* @Date: 2024/10/10 12:29
* @Description: MQTT
*/
@Component
public class MqttConfig {
@Autowired
private MqttPublishSample mqttPublishSample;
// 监听MQ
@RabbitListener(queuesToDeclare = @Queue("aaa"))
public void msg(String fluxMqttConfig){
FluxMqProperties fluxMqProperties = JSONObject.parseObject(fluxMqttConfig, FluxMqProperties.class);
mqttPublishSample.connectToMqttBroker(fluxMqProperties);
}
}

View File

@ -0,0 +1,46 @@
package com.muyu.cloud.protocol.parsing;
import lombok.extern.log4j.Log4j2;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* mqtt
* @Author: LiDongJia
* @Package: com.muyu.cloud.protocol.parsing
* @Project: cloud-server
* @name: MqttPublishSample
* @Date: 2024/10/10 14:33
* @Description: mqtt
*/
@Log4j2
@Component
public class MqttPublishSample {
@Autowired
private MqttCallbackMsg mqttCallbackMsg;
/**
* MQTT Broker
*/
public void connectToMqttBroker(FluxMqProperties fluxMqProperties) {
try {
// 创建MqttClient实例指定Broker地址、客户端ID以及持久化方式
MqttClient mqttClient = new MqttClient(fluxMqProperties.BROKER, fluxMqProperties.CLIENT_ID, new MemoryPersistence());
// 连接MQTT Broker
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setCleanSession(true);
log.info("连接到协议: " + fluxMqProperties.BROKER);
mqttClient.connect(connOpts);
mqttClient.subscribe(fluxMqProperties.TOPIC, 0);
// 设置MQTT回调处理器
mqttClient.setCallback(mqttCallbackMsg);
} catch (MqttException me) {
log.error("连接MQTT Broker失败: [{}]", me.getMessage());
}
}
}

View File

@ -23,12 +23,13 @@ import javax.annotation.Resource;
import java.util.List; import java.util.List;
/** /**
* MQTT
* @Author: LiDongJia * @Author: LiDongJia
* @Package: com.muyu.cloud.protocol.parsing.service.impl * @Package: com.muyu.cloud.protocol.parsing
* @Project: 2112-car-cloud-server * @Project: 2112-car-cloud-server
* @name: ParsingServiceImpl * @name: ParsingMessage
* @Date: 2024/9/28 14:31 * @Date: 2024/9/28 14:31
* @Description: * @Description: MQTT
*/ */
@Log4j2 @Log4j2
@Component @Component
@ -46,78 +47,15 @@ public class ParsingMessage {
// 报文模版缓存服务 // 报文模版缓存服务
@Autowired @Autowired
private AllMessageValueCacheService allMessageValueCacheService; private AllMessageValueCacheService allMessageValueCacheService;
// MQTT主题
private static final String TOPIC = "vehicle";
// MQTT Broker地址
private static final String BROKER = "tcp://111.231.50.146:1883";
// MQTT客户端ID
private static final String CLIENT_ID = "JavaSample";
// MQTT客户端
private MqttClient mqttClient;
// kafka topic // kafka topic
private static final String TIPSY = "tipsy"; private static final String TIPSY = "tipsy";
private final static String FORM_QUEUE = "queue_inform_sms";
/**
* MQTT
*/
@RabbitListener(queuesToDeclare = @Queue(FORM_QUEUE))
public void init() {
connectToMqttBroker();
}
/**
* MQTT Broker
*/
private void connectToMqttBroker() {
try {
// 创建MqttClient实例指定Broker地址、客户端ID以及持久化方式
mqttClient = new MqttClient(BROKER, CLIENT_ID, new MemoryPersistence());
// 连接MQTT Broker
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setCleanSession(true);
log.info("连接到协议: " + BROKER);
mqttClient.connect(connOpts);
mqttClient.subscribe(TOPIC, 0);
// 设置MQTT回调处理器
mqttClient.setCallback(new MqttCallbackHandler());
} catch (MqttException me) {
log.error("连接MQTT Broker失败: [{}]", me.getMessage());
}
}
/**
* MQTT
*/
private class MqttCallbackHandler implements MqttCallback {
// 连接丢失
@Override
public void connectionLost(Throwable throwable) {
log.error("连接丢失: [{}]", throwable.getMessage());
}
// 连接成功
@Override
public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
handleMqttMessage(mqttMessage);
}
// 接收信息
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
}
}
/** /**
* MQTT * MQTT
* *
* @param mqttMessage * @param mqttMessage
*/ */
private void handleMqttMessage(MqttMessage mqttMessage) { public void handleMqttMessage(MqttMessage mqttMessage) {
// 解析MQTT消息 // 解析MQTT消息
String messageStr = new String(mqttMessage.getPayload()); String messageStr = new String(mqttMessage.getPayload());
log.info("接收到MQTT消息: " + messageStr); log.info("接收到MQTT消息: " + messageStr);