feat():修复MQTT连接,接收rabbitMq数据消费,拿到数据解析投递kafka

dev.analysis
LQS 2024-10-14 11:13:09 +08:00
parent 1631a7cae8
commit 16d8af0a4d
14 changed files with 325 additions and 222 deletions

View File

@ -7,7 +7,7 @@ nacos:
addr: 106.15.136.7:8848
user-name: nacos
password: nacos
namespace: dev
namespace: prod
# Spring
spring:
application:

View File

@ -7,7 +7,7 @@ nacos:
addr: 106.15.136.7:8848
user-name: nacos
password: nacos
namespace: dev
namespace: prod
# Spring
spring:

View File

@ -7,7 +7,7 @@ nacos:
addr: 106.15.136.7:8848
user-name: nacos
password: nacos
namespace: dev
namespace: prod
spring:
application:

View File

@ -7,7 +7,7 @@ nacos:
addr: 106.15.136.7:8848
user-name: nacos
password: nacos
namespace: dev
namespace: prod
spring:
application:

View File

@ -7,7 +7,7 @@ nacos:
addr: 106.15.136.7:8848
user-name: nacos
password: nacos
namespace: dev
namespace: prod
# Spring
spring:

View File

@ -7,7 +7,7 @@ nacos:
addr: 106.15.136.7:8848
user-name: nacos
password: nacos
namespace: dev
namespace: prod
# Spring
spring:

View File

@ -136,11 +136,11 @@
<artifactId>cloud-modules-enterprise-cache</artifactId>
</dependency>
<!-- rabbit消息队列模块 -->
<dependency>
<groupId>com.muyu</groupId>
<artifactId>cloud-common-rabbit</artifactId>
</dependency>
<!-- &lt;!&ndash; rabbit消息队列模块 &ndash;&gt;-->
<!-- <dependency>-->
<!-- <groupId>com.muyu</groupId>-->
<!-- <artifactId>cloud-common-rabbit</artifactId>-->
<!-- </dependency>-->
<!-- &lt;!&ndash; gateway系统模块 &ndash;&gt;-->
<!-- <dependency>-->
<!-- <groupId>com.muyu</groupId>-->

View File

@ -1,6 +1,6 @@
package com.muyu.analysis.parsing.mqtt;
import com.muyu.analysis.parsing.mqtt.service.MqttClientService;
//import com.muyu.analysis.parsing.mqtt.service.MqttClientService;
import com.muyu.common.core.constant.KafkaConstants;
import com.muyu.common.core.constant.RedisConstants;
import com.muyu.common.core.domain.Result;
@ -67,19 +67,21 @@ public class ParsingMQTT {
@Resource
private CarMessageValueCacheService allMessageValueCacheService;
@Autowired
private MqttClientService mqttClientService;
/**
*
*/
@PostConstruct
public JSONObject mqttClient(MqttMessage mqttMessage,String messageStr) {
// String topic = "vehicle";
//// String MQTTConnect.BROKER = "tcp://106.15.136.7:1883";
//// String clientId = "JavaSample";
// @Autowired
// private MqttClientService mqttClientService;
String payload = new String(mqttMessage.getPayload());
log.info("====:{}", payload);
// /**
// * 协议解析
// */
// @PostConstruct
// public JSONObject mqttClient(MqttMessage mqttMessage,String messageStr) {
//// String topic = "vehicle";
////// String MQTTConnect.BROKER = "tcp://106.15.136.7:1883";
////// String clientId = "JavaSample";
//
// String payload = new String(mqttMessage.getPayload());
// log.info("====:{}", payload);
// try {
// // 第三个参数为空,默认持久化策略
@ -134,12 +136,93 @@ public class ParsingMQTT {
// /**
// * 协议解析
// * @param messageStr
// * @return
// */
// public JSONObject protocolParsing() {
//根据空格切割数据
// String[] hexArray = messageStr.split(" ");
// StringBuilder result = new StringBuilder();
// //遍历十六进制数据转换为字符
// for (String hex : hexArray) {
// int decimal = Integer.parseInt(hex, 16);
// result.append((char) decimal);
// }
// //取出车辆VIN码
// String vehicleVin = result.substring(1, 18);
// log.info("车辆VIN码: " + vehicleVin);
// //根据车辆VIN码查询报文模板ID
// // 根据车辆VIN码查询车辆信息
// Vehicle vehicle = vehicleCacheService.get(vehicleVin);
// VehicleType vehicleType = vehicleTypeCacheService.get(String.valueOf(vehicle.getVehicleTypeId()));
// Long templateId = vehicleType.getMessageTemplateId();
// List<MessageValueListResp> templateList = allMessageValueCacheService.get(String.valueOf(templateId));
// //判断报文模板列表不为空
// if (templateList.isEmpty()) {
// log.info("报文模版为空");
// throw new RuntimeException("报文模版为空");
// }
// //存储报文模版解析后的数据
// JSONObject jsonObject = new JSONObject();
// for (MessageValueListResp messageValue : templateList) {
// //起始位下标
// Integer startIndex = messageValue.getMessageStartIndex() - 1;
// //结束位下标
// Integer endIndex = messageValue.getMessageEndIndex();
// //根据报文模版截取数据
// String value = result.substring(startIndex, endIndex);
// //存入数据
// jsonObject.put(messageValue.getMessageLabel(), value);
// }
// return jsonObject;
//// }
// }
/**
*
*/
@PostConstruct
public void mqttClient() {
// String topic = "vehicle";
//// String MQTTConnect.BROKER = "tcp://106.15.136.7:1883";
//// String clientId = "JavaSample";
try {
// 第三个参数为空,默认持久化策略
MqttClient sampleClient = new MqttClient(MQTTConnect.BROKER, MQTTConnect.CLIENT_ID);
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setCleanSession(true);
log.info("连接中MQTTConnect.BROKER: {}", MQTTConnect.BROKER);
sampleClient.connect(connOpts);
sampleClient.subscribe(MQTTConnect.TOPIC, 0);
sampleClient.setCallback(new MqttCallback() {
// 连接丢失
@Override
public void connectionLost(Throwable throwable) {
}
// 连接成功
@Override
public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
System.out.println(new String(mqttMessage.getPayload()));
JSONObject entries = this.protocolParsing(new String(mqttMessage.getPayload()));
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(KafkaConstants.MESSAGE_PARSING,
entries.toString() );
kafkaProducer.send(producerRecord);
log.info("解析之后的数据:"+entries);
}
/**
*
* @param messageStr
* @return
*/
// public JSONObject protocolParsing() {
public JSONObject protocolParsing(String messageStr) {
//根据空格切割数据
String[] hexArray = messageStr.split(" ");
StringBuilder result = new StringBuilder();
@ -175,7 +258,27 @@ public class ParsingMQTT {
jsonObject.put(messageValue.getMessageLabel(), value);
}
return jsonObject;
// }
}
// 接收信息
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
}
});
} catch (MqttException me) {
log.info("reason " + me.getReasonCode());
log.info("msg " + me.getMessage());
log.info("loc " + me.getLocalizedMessage());
log.info("cause " + me.getCause());
log.info("excep " + me);
System.out.println("reason " + me.getReasonCode());
System.out.println("msg " + me.getMessage());
System.out.println("loc " + me.getLocalizedMessage());
System.out.println("cause " + me.getCause());
System.out.println("excep " + me);
me.printStackTrace();
}
}
@ -184,4 +287,5 @@ public class ParsingMQTT {
}

View File

@ -1,110 +1,109 @@
package com.muyu.analysis.parsing.mqtt.service;
import cn.hutool.json.JSONObject;
import com.muyu.analysis.parsing.mqtt.ParsingMQTT;
import com.muyu.common.core.constant.KafkaConstants;
import com.muyu.enterprise.domain.MqttProperties;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.eclipse.paho.client.mqttv3.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.annotation.PreDestroy;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @Author
* @Packagecom.muyu.analysis.parsing.mqtt.service
* @Projectcloud-server
* @nameMqttClientService
* @Date2024/10/10 20:24
*/
@Service
@Slf4j
public class MqttClientService {
@Autowired
private ParsingMQTT messageProcessor;
@Resource
private KafkaProducer<String, String> kafkaProducer;
private final ExecutorService executorService = Executors.newCachedThreadPool();
private MqttClient sampleClient;
public void connectAndSubscribeAsync(MqttProperties mqttProperties) {
executorService.submit(() -> {
try {
connectAndSubscribe(mqttProperties);
} catch (MqttException | IOException e) {
log.error("MQTT连接或订阅失败", e);
}
});
}
private void connectAndSubscribe(MqttProperties mqttProperties) throws MqttException, IOException {
if (sampleClient != null && sampleClient.isConnected()) {
log.info("MQTT客户端已经连接跳过重新连接。");
return;
}
sampleClient = new MqttClient(mqttProperties.getBroker(), mqttProperties.getClientId());
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setUserName(mqttProperties.getUserName());
connOpts.setPassword(mqttProperties.getPassword().toCharArray());
connOpts.setCleanSession(true);
sampleClient.connect(connOpts);
sampleClient.subscribe(mqttProperties.getTopic(), 0);
sampleClient.setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable throwable) {
log.error("连接丢失:{}", throwable.getMessage());
}
@Override
public void messageArrived(String s, MqttMessage mqttMessage) {
executorService.submit(() -> messageProcessor.mqttClient(mqttMessage,new String(mqttMessage.getPayload())));
System.out.println(new String(mqttMessage.getPayload()));
JSONObject entries =messageProcessor.mqttClient(mqttMessage,new String(mqttMessage.getPayload()));
// JSONObject entries = o.protocolParsing(new String(mqttMessage.getPayload()));
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(KafkaConstants.MESSAGE_PARSING,
entries.toString() );
kafkaProducer.send(producerRecord);
log.info("解析之后的数据:"+entries);
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
log.info("消息发送完成:{}", iMqttDeliveryToken);
}
});
}
@PreDestroy
public void shutdown() {
executorService.shutdown();
if (sampleClient != null && sampleClient.isConnected()) {
try {
sampleClient.disconnect();
} catch (MqttException e) {
log.error("MQTT客户端断开连接失败", e);
}
}
}
}
//package com.muyu.analysis.parsing.mqtt.service;
//
//
//
//import cn.hutool.json.JSONObject;
//import com.muyu.common.core.constant.KafkaConstants;
//import com.muyu.enterprise.domain.MqttProperties;
//import jakarta.annotation.Resource;
//import lombok.extern.slf4j.Slf4j;
//import org.apache.kafka.clients.producer.KafkaProducer;
//import org.apache.kafka.clients.producer.ProducerRecord;
//import org.eclipse.paho.client.mqttv3.*;
//import org.springframework.beans.factory.annotation.Autowired;
//import org.springframework.stereotype.Service;
//
//import javax.annotation.PreDestroy;
//import java.io.IOException;
//import java.util.concurrent.ExecutorService;
//import java.util.concurrent.Executors;
//
//
///**
// * @Author李庆帅
// * @Packagecom.muyu.analysis.parsing.mqtt.service
// * @Projectcloud-server
// * @nameMqttClientService
// * @Date2024/10/10 20:24
// */
//
//@Service
//@Slf4j
//public class MqttClientService {
//
// @Autowired
// private ParsingMQTT messageProcessor;
//
// @Resource
// private KafkaProducer<String, String> kafkaProducer;
//
// private final ExecutorService executorService = Executors.newCachedThreadPool();
// private MqttClient sampleClient;
//
//
// public void connectAndSubscribeAsync(MqttProperties mqttProperties) {
// executorService.submit(() -> {
// try {
// connectAndSubscribe(mqttProperties);
// } catch (MqttException | IOException e) {
// log.error("MQTT连接或订阅失败", e);
// }
// });
// }
//
// private void connectAndSubscribe(MqttProperties mqttProperties) throws MqttException, IOException {
// if (sampleClient != null && sampleClient.isConnected()) {
// log.info("MQTT客户端已经连接跳过重新连接。");
// return;
// }
//
// sampleClient = new MqttClient(mqttProperties.getBroker(), mqttProperties.getClientId());
// MqttConnectOptions connOpts = new MqttConnectOptions();
// connOpts.setUserName(mqttProperties.getUserName());
// connOpts.setPassword(mqttProperties.getPassword().toCharArray());
// connOpts.setCleanSession(true);
//
// sampleClient.connect(connOpts);
// sampleClient.subscribe(mqttProperties.getTopic(), 0);
//
// sampleClient.setCallback(new MqttCallback() {
// @Override
// public void connectionLost(Throwable throwable) {
// log.error("连接丢失:{}", throwable.getMessage());
// }
//
// @Override
// public void messageArrived(String s, MqttMessage mqttMessage) {
// executorService.submit(() -> messageProcessor.mqttClient(mqttMessage,new String(mqttMessage.getPayload())));
// System.out.println(new String(mqttMessage.getPayload()));
// JSONObject entries =messageProcessor.mqttClient(mqttMessage,new String(mqttMessage.getPayload()));
//// JSONObject entries = o.protocolParsing(new String(mqttMessage.getPayload()));
//
// ProducerRecord<String, String> producerRecord = new ProducerRecord<>(KafkaConstants.MESSAGE_PARSING,
// entries.toString() );
// kafkaProducer.send(producerRecord);
// log.info("解析之后的数据:"+entries);
//
// }
//
// @Override
// public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
// log.info("消息发送完成:{}", iMqttDeliveryToken);
// }
// });
// }
//
// @PreDestroy
// public void shutdown() {
// executorService.shutdown();
// if (sampleClient != null && sampleClient.isConnected()) {
// try {
// sampleClient.disconnect();
// } catch (MqttException e) {
// log.error("MQTT客户端断开连接失败", e);
// }
// }
// }
//
//
//}

View File

@ -1,43 +1,43 @@
package com.muyu.analysis.parsing.consumer;
import com.muyu.analysis.parsing.mqtt.service.MqttClientService;
import com.muyu.enterprise.domain.MqttProperties;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* @Author
* @Packagecom.muyu.analysis.parsing.consumer
* @Projectcloud-server
* @nameRabbitListenerComponent
* @Date2024/10/10 20:24
*/
@Component
@Slf4j
public class RabbitListenerComponent {
@Autowired
private MqttClientService mqttClientService;
private static final String FORM_QUEUE = "GO_LINE";
@RabbitListener(queuesToDeclare = @Queue(value = FORM_QUEUE, durable = "true"))
public void downline(MqttProperties mqttProperties, Message message, Channel channel) {
try {
mqttClientService.connectAndSubscribeAsync(mqttProperties);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
e.printStackTrace();
log.error("处理RabbitMQ消息时发生错误", e);
}
}
}
//package com.muyu.analysis.parsing.consumer;
//
//
//import com.muyu.analysis.parsing.mqtt.service.MqttClientService;
//import com.muyu.enterprise.domain.MqttProperties;
//import com.rabbitmq.client.Channel;
//import lombok.extern.slf4j.Slf4j;
//import org.springframework.amqp.core.Message;
//import org.springframework.amqp.rabbit.annotation.Queue;
//import org.springframework.amqp.rabbit.annotation.RabbitListener;
//import org.springframework.beans.factory.annotation.Autowired;
//import org.springframework.stereotype.Component;
//
//
//
///**
// * @Author李庆帅
// * @Packagecom.muyu.analysis.parsing.consumer
// * @Projectcloud-server
// * @nameRabbitListenerComponent
// * @Date2024/10/10 20:24
// */
//
//@Component
//@Slf4j
//public class RabbitListenerComponent {
// @Autowired
// private MqttClientService mqttClientService;
//
// private static final String FORM_QUEUE = "GO_LINE";
//
// @RabbitListener(queuesToDeclare = @Queue(value = FORM_QUEUE, durable = "true"))
// public void downline(MqttProperties mqttProperties, Message message, Channel channel) {
// try {
// mqttClientService.connectAndSubscribeAsync(mqttProperties);
// channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
// } catch (Exception e) {
// e.printStackTrace();
// log.error("处理RabbitMQ消息时发生错误", e);
// }
// }
//
//}

View File

@ -7,7 +7,7 @@ nacos:
addr: 106.15.136.7:8848
user-name: nacos
password: nacos
namespace: dev
namespace: prod
spring:
application:

View File

@ -7,7 +7,7 @@ nacos:
addr: 106.15.136.7:8848
user-name: nacos
password: nacos
namespace: dev
namespace: prod
spring:
application:

View File

@ -7,7 +7,7 @@ nacos:
addr: 106.15.136.7:8848
user-name: nacos
password: nacos
namespace: dev
namespace: prod
spring:
application:

View File

@ -7,7 +7,7 @@ nacos:
addr: 106.15.136.7:8848
user-name: nacos
password: nacos
namespace: dev
namespace: prod
# Spring
spring: