From 15bcd34b07e5ebcb9d13be45823a0903f39450c7 Mon Sep 17 00:00:00 2001 From: LQS <2506203757@qq.com> Date: Fri, 11 Oct 2024 21:56:22 +0800 Subject: [PATCH] =?UTF-8?q?feat():=E4=BF=AE=E5=A4=8DMQTT=E8=BF=9E=E6=8E=A5?= =?UTF-8?q?=EF=BC=8C=E6=8E=A5=E6=94=B6rabbitMq=E6=95=B0=E6=8D=AE=E6=B6=88?= =?UTF-8?q?=E8=B4=B9=EF=BC=8C=E6=8B=BF=E5=88=B0=E6=95=B0=E6=8D=AE=E8=A7=A3?= =?UTF-8?q?=E6=9E=90=E6=8A=95=E9=80=92kafka?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- JavaSample-tcp1061513671883/.lck | 0 .../enterprise/domain/MqttProperties.java | 47 +++++ .../cloud-modules-protocol-analysis/pom.xml | 11 + .../analysis/parsing/MQTT/ParsingMQTT.java | 189 ++++++++++-------- .../MQTT/service/MqttClientService.java | 110 ++++++++++ .../consumer/RabbitListenerComponent.java | 43 ++++ .../parsing/manager/MessageProcessor.java | 83 ++++++++ pom.xml | 7 + 8 files changed, 403 insertions(+), 87 deletions(-) create mode 100644 JavaSample-tcp1061513671883/.lck create mode 100644 cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-common/src/main/java/com/muyu/enterprise/domain/MqttProperties.java create mode 100644 cloud-modules/cloud-modules-protocol-analysis/src/main/java/com/muyu/analysis/parsing/MQTT/service/MqttClientService.java create mode 100644 cloud-modules/cloud-modules-protocol-analysis/src/main/java/com/muyu/analysis/parsing/consumer/RabbitListenerComponent.java create mode 100644 cloud-modules/cloud-modules-protocol-analysis/src/main/java/com/muyu/analysis/parsing/manager/MessageProcessor.java diff --git a/JavaSample-tcp1061513671883/.lck b/JavaSample-tcp1061513671883/.lck new file mode 100644 index 0000000..e69de29 diff --git a/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-common/src/main/java/com/muyu/enterprise/domain/MqttProperties.java b/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-common/src/main/java/com/muyu/enterprise/domain/MqttProperties.java new file mode 100644 index 0000000..3d8ee27 --- /dev/null +++ b/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-common/src/main/java/com/muyu/enterprise/domain/MqttProperties.java @@ -0,0 +1,47 @@ +package com.muyu.enterprise.domain; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * + * @Author:李庆帅 + * @Package:com.muyu.enterprise.domain + * @Project:cloud-server + * @name:MqttProperties + * @Date:2024/10/10 20:01 + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class MqttProperties +{ + /** + * 节点 + */ + private String broker; + /** + * 主题 + */ + private String topic; + /** + * 用户名 + */ + private String userName; + /** + * 密码 + */ + private String password; + /** + * 客户端ID + */ + private String clientId; + /** + * 上报级别 + */ + private int qos=0; + +} diff --git a/cloud-modules/cloud-modules-protocol-analysis/pom.xml b/cloud-modules/cloud-modules-protocol-analysis/pom.xml index b46dbe9..d9a3bde 100644 --- a/cloud-modules/cloud-modules-protocol-analysis/pom.xml +++ b/cloud-modules/cloud-modules-protocol-analysis/pom.xml @@ -136,6 +136,17 @@ cloud-modules-enterprise-cache + + + com.muyu + cloud-common-rabbit + + + + + + + diff --git a/cloud-modules/cloud-modules-protocol-analysis/src/main/java/com/muyu/analysis/parsing/MQTT/ParsingMQTT.java b/cloud-modules/cloud-modules-protocol-analysis/src/main/java/com/muyu/analysis/parsing/MQTT/ParsingMQTT.java index 770eba1..b5d0638 100644 --- a/cloud-modules/cloud-modules-protocol-analysis/src/main/java/com/muyu/analysis/parsing/MQTT/ParsingMQTT.java +++ b/cloud-modules/cloud-modules-protocol-analysis/src/main/java/com/muyu/analysis/parsing/MQTT/ParsingMQTT.java @@ -1,5 +1,6 @@ package com.muyu.analysis.parsing.mqtt; +import com.muyu.analysis.parsing.mqtt.service.MqttClientService; import com.muyu.common.core.constant.KafkaConstants; import com.muyu.common.core.constant.RedisConstants; import com.muyu.common.core.domain.Result; @@ -7,6 +8,7 @@ import com.muyu.common.mqtt.MQTTConnect; import com.muyu.enterprise.cache.CarMessageValueCacheService; import com.muyu.enterprise.cache.CarVehicleCacheService; import com.muyu.enterprise.cache.CarVehicleTypeCacheService; +import com.muyu.enterprise.domain.MqttProperties; import com.muyu.enterprise.domain.car.Vehicle; import com.muyu.enterprise.domain.car.VehicleType; import com.muyu.enterprise.domain.resp.car.MessageValueListResp; @@ -20,6 +22,7 @@ import lombok.extern.log4j.Log4j2; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.eclipse.paho.client.mqttv3.*; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; @@ -64,109 +67,121 @@ public class ParsingMQTT { @Resource private CarMessageValueCacheService allMessageValueCacheService; + @Autowired + private MqttClientService mqttClientService; /** * 协议解析 */ @PostConstruct - public void mqttClient() { + public JSONObject mqttClient(MqttMessage mqttMessage,String messageStr) { // String topic = "vehicle"; //// String MQTTConnect.BROKER = "tcp://106.15.136.7:1883"; //// String clientId = "JavaSample"; - try { - // 第三个参数为空,默认持久化策略 - MqttClient sampleClient = new MqttClient(MQTTConnect.BROKER, MQTTConnect.CLIENT_ID); - MqttConnectOptions connOpts = new MqttConnectOptions(); - connOpts.setCleanSession(true); - log.info("连接中MQTTConnect.BROKER: {}", MQTTConnect.BROKER); - sampleClient.connect(connOpts); - sampleClient.subscribe(MQTTConnect.TOPIC, 0); - sampleClient.setCallback(new MqttCallback() { - // 连接丢失 - @Override - public void connectionLost(Throwable throwable) { + String payload = new String(mqttMessage.getPayload()); + log.info("====:{}", payload); - } +// try { +// // 第三个参数为空,默认持久化策略 +// MqttClient sampleClient = new MqttClient(MQTTConnect.BROKER, MQTTConnect.CLIENT_ID); +// MqttConnectOptions connOpts = new MqttConnectOptions(); +// connOpts.setCleanSession(true); +// log.info("连接中MQTTConnect.BROKER: {}", MQTTConnect.BROKER); +// sampleClient.connect(connOpts); +// //MQTTConnect.TOPIC +// sampleClient.subscribe(MQTTConnect.TOPIC, 0); +// sampleClient.setCallback(new MqttCallback() { +// // 连接丢失 +// @Override +// public void connectionLost(Throwable throwable) { +// +// } +// +// // 连接成功 +// @Override +// public void messageArrived(String s, MqttMessage mqttMessage) throws Exception { +// System.out.println(new String(mqttMessage.getPayload())); +//// JSONObject entries = ParsingMQTT.protocolParsing(new String(mqttMessage.getPayload())); +//// +//// ProducerRecord producerRecord = new ProducerRecord<>(KafkaConstants.MESSAGE_PARSING, +//// entries.toString() ); +//// kafkaProducer.send(producerRecord); +//// log.info("解析之后的数据:"+entries); +// +// } +// +// +// +// // 接收信息 +// @Override +// public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { +// +// } +// }); +// } catch (MqttException me) { +// log.info("reason " + me.getReasonCode()); +// log.info("msg " + me.getMessage()); +// log.info("loc " + me.getLocalizedMessage()); +// log.info("cause " + me.getCause()); +// log.info("excep " + me); +// System.out.println("reason " + me.getReasonCode()); +// System.out.println("msg " + me.getMessage()); +// System.out.println("loc " + me.getLocalizedMessage()); +// System.out.println("cause " + me.getCause()); +// System.out.println("excep " + me); +// me.printStackTrace(); +// } - // 连接成功 - @Override - public void messageArrived(String s, MqttMessage mqttMessage) throws Exception { - System.out.println(new String(mqttMessage.getPayload())); - JSONObject entries = this.protocolParsing(new String(mqttMessage.getPayload())); - ProducerRecord producerRecord = new ProducerRecord<>(KafkaConstants.MESSAGE_PARSING, - entries.toString() ); - kafkaProducer.send(producerRecord); - log.info("解析之后的数据:"+entries); - } - - /** - * 协议解析 - * @param messageStr - * @return - */ - public JSONObject protocolParsing(String messageStr) { - //根据空格切割数据 - String[] hexArray = messageStr.split(" "); - StringBuilder result = new StringBuilder(); - //遍历十六进制数据转换为字符 - for (String hex : hexArray) { - int decimal = Integer.parseInt(hex, 16); - result.append((char) decimal); - } - //取出车辆VIN码 - String vehicleVin = result.substring(1, 18); - log.info("车辆VIN码: " + vehicleVin); - //根据车辆VIN码查询报文模板ID - // 根据车辆VIN码查询车辆信息 - Vehicle vehicle = vehicleCacheService.get(vehicleVin); - VehicleType vehicleType = vehicleTypeCacheService.get(String.valueOf(vehicle.getVehicleTypeId())); - Long templateId = vehicleType.getMessageTemplateId(); - List templateList = allMessageValueCacheService.get(String.valueOf(templateId)); - //判断报文模板列表不为空 - if (templateList.isEmpty()) { - log.info("报文模版为空"); - throw new RuntimeException("报文模版为空"); - } - //存储报文模版解析后的数据 - JSONObject jsonObject = new JSONObject(); - for (MessageValueListResp messageValue : templateList) { - //起始位下标 - Integer startIndex = messageValue.getMessageStartIndex() - 1; - //结束位下标 - Integer endIndex = messageValue.getMessageEndIndex(); - //根据报文模版截取数据 - String value = result.substring(startIndex, endIndex); - //存入数据 - jsonObject.put(messageValue.getMessageLabel(), value); - } - return jsonObject; - } - - // 接收信息 - @Override - public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { - - } - }); - } catch (MqttException me) { - log.info("reason " + me.getReasonCode()); - log.info("msg " + me.getMessage()); - log.info("loc " + me.getLocalizedMessage()); - log.info("cause " + me.getCause()); - log.info("excep " + me); - System.out.println("reason " + me.getReasonCode()); - System.out.println("msg " + me.getMessage()); - System.out.println("loc " + me.getLocalizedMessage()); - System.out.println("cause " + me.getCause()); - System.out.println("excep " + me); - me.printStackTrace(); - } + /** + * 协议解析 + * @param messageStr + * @return + */ +// public JSONObject protocolParsing() { + //根据空格切割数据 + String[] hexArray = messageStr.split(" "); + StringBuilder result = new StringBuilder(); + //遍历十六进制数据转换为字符 + for (String hex : hexArray) { + int decimal = Integer.parseInt(hex, 16); + result.append((char) decimal); + } + //取出车辆VIN码 + String vehicleVin = result.substring(1, 18); + log.info("车辆VIN码: " + vehicleVin); + //根据车辆VIN码查询报文模板ID + // 根据车辆VIN码查询车辆信息 + Vehicle vehicle = vehicleCacheService.get(vehicleVin); + VehicleType vehicleType = vehicleTypeCacheService.get(String.valueOf(vehicle.getVehicleTypeId())); + Long templateId = vehicleType.getMessageTemplateId(); + List templateList = allMessageValueCacheService.get(String.valueOf(templateId)); + //判断报文模板列表不为空 + if (templateList.isEmpty()) { + log.info("报文模版为空"); + throw new RuntimeException("报文模版为空"); + } + //存储报文模版解析后的数据 + JSONObject jsonObject = new JSONObject(); + for (MessageValueListResp messageValue : templateList) { + //起始位下标 + Integer startIndex = messageValue.getMessageStartIndex() - 1; + //结束位下标 + Integer endIndex = messageValue.getMessageEndIndex(); + //根据报文模版截取数据 + String value = result.substring(startIndex, endIndex); + //存入数据 + jsonObject.put(messageValue.getMessageLabel(), value); + } + return jsonObject; +// } } + + } diff --git a/cloud-modules/cloud-modules-protocol-analysis/src/main/java/com/muyu/analysis/parsing/MQTT/service/MqttClientService.java b/cloud-modules/cloud-modules-protocol-analysis/src/main/java/com/muyu/analysis/parsing/MQTT/service/MqttClientService.java new file mode 100644 index 0000000..02e1396 --- /dev/null +++ b/cloud-modules/cloud-modules-protocol-analysis/src/main/java/com/muyu/analysis/parsing/MQTT/service/MqttClientService.java @@ -0,0 +1,110 @@ +package com.muyu.analysis.parsing.mqtt.service; + + + +import cn.hutool.json.JSONObject; +import com.muyu.analysis.parsing.mqtt.ParsingMQTT; +import com.muyu.common.core.constant.KafkaConstants; +import com.muyu.enterprise.domain.MqttProperties; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.eclipse.paho.client.mqttv3.*; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import javax.annotation.PreDestroy; +import java.io.IOException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + + +/** + * @Author:李庆帅 + * @Package:com.muyu.analysis.parsing.mqtt.service + * @Project:cloud-server + * @name:MqttClientService + * @Date:2024/10/10 20:24 + */ + +@Service +@Slf4j +public class MqttClientService { + + @Autowired + private ParsingMQTT messageProcessor; + + @Resource + private KafkaProducer kafkaProducer; + + private final ExecutorService executorService = Executors.newCachedThreadPool(); + private MqttClient sampleClient; + + + public void connectAndSubscribeAsync(MqttProperties mqttProperties) { + executorService.submit(() -> { + try { + connectAndSubscribe(mqttProperties); + } catch (MqttException | IOException e) { + log.error("MQTT连接或订阅失败", e); + } + }); + } + + private void connectAndSubscribe(MqttProperties mqttProperties) throws MqttException, IOException { +// if (sampleClient != null && sampleClient.isConnected()) { +// log.info("MQTT客户端已经连接,跳过重新连接。"); +// return; +// } + + sampleClient = new MqttClient(mqttProperties.getBroker(), mqttProperties.getClientId()); + MqttConnectOptions connOpts = new MqttConnectOptions(); + connOpts.setUserName(mqttProperties.getUserName()); + connOpts.setPassword(mqttProperties.getPassword().toCharArray()); + connOpts.setCleanSession(true); + + sampleClient.connect(connOpts); + sampleClient.subscribe(mqttProperties.getTopic(), 0); + + sampleClient.setCallback(new MqttCallback() { + @Override + public void connectionLost(Throwable throwable) { + log.error("连接丢失:{}", throwable.getMessage()); + } + + @Override + public void messageArrived(String s, MqttMessage mqttMessage) { + executorService.submit(() -> messageProcessor.mqttClient(mqttMessage,new String(mqttMessage.getPayload()))); + System.out.println(new String(mqttMessage.getPayload())); + JSONObject entries =messageProcessor.mqttClient(mqttMessage,new String(mqttMessage.getPayload())); +// JSONObject entries = o.protocolParsing(new String(mqttMessage.getPayload())); + + ProducerRecord producerRecord = new ProducerRecord<>(KafkaConstants.MESSAGE_PARSING, + entries.toString() ); + kafkaProducer.send(producerRecord); + log.info("解析之后的数据:"+entries); + + } + + @Override + public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { + log.info("消息发送完成:{}", iMqttDeliveryToken); + } + }); + } + + @PreDestroy + public void shutdown() { + executorService.shutdown(); + if (sampleClient != null && sampleClient.isConnected()) { + try { + sampleClient.disconnect(); + } catch (MqttException e) { + log.error("MQTT客户端断开连接失败", e); + } + } + } + + +} diff --git a/cloud-modules/cloud-modules-protocol-analysis/src/main/java/com/muyu/analysis/parsing/consumer/RabbitListenerComponent.java b/cloud-modules/cloud-modules-protocol-analysis/src/main/java/com/muyu/analysis/parsing/consumer/RabbitListenerComponent.java new file mode 100644 index 0000000..9c83f18 --- /dev/null +++ b/cloud-modules/cloud-modules-protocol-analysis/src/main/java/com/muyu/analysis/parsing/consumer/RabbitListenerComponent.java @@ -0,0 +1,43 @@ +package com.muyu.analysis.parsing.consumer; + + +import com.muyu.analysis.parsing.mqtt.service.MqttClientService; +import com.muyu.enterprise.domain.MqttProperties; +import com.rabbitmq.client.Channel; +import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.core.Message; +import org.springframework.amqp.rabbit.annotation.Queue; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + + + +/** + * @Author:李庆帅 + * @Package:com.muyu.analysis.parsing.consumer + * @Project:cloud-server + * @name:RabbitListenerComponent + * @Date:2024/10/10 20:24 + */ + +@Component +@Slf4j +public class RabbitListenerComponent { + @Autowired + private MqttClientService mqttClientService; + + private static final String FORM_QUEUE = "GO_LINE"; + + @RabbitListener(queuesToDeclare = @Queue(value = FORM_QUEUE, durable = "true")) + public void downline(MqttProperties mqttProperties, Message message, Channel channel) { + try { + mqttClientService.connectAndSubscribeAsync(mqttProperties); + channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); + } catch (Exception e) { + e.printStackTrace(); + log.error("处理RabbitMQ消息时发生错误", e); + } + } + +} diff --git a/cloud-modules/cloud-modules-protocol-analysis/src/main/java/com/muyu/analysis/parsing/manager/MessageProcessor.java b/cloud-modules/cloud-modules-protocol-analysis/src/main/java/com/muyu/analysis/parsing/manager/MessageProcessor.java new file mode 100644 index 0000000..e54698d --- /dev/null +++ b/cloud-modules/cloud-modules-protocol-analysis/src/main/java/com/muyu/analysis/parsing/manager/MessageProcessor.java @@ -0,0 +1,83 @@ +//package com.muyu.analysis.parsing.manager; +// +// +//import com.alibaba.fastjson2.JSONObject; +//import lombok.extern.slf4j.Slf4j; +//import org.apache.kafka.clients.producer.KafkaProducer; +//import org.apache.kafka.clients.producer.ProducerRecord; +//import org.eclipse.paho.client.mqttv3.MqttMessage; +//import org.springframework.stereotype.Service; +// +//import javax.annotation.Resource; +//import java.util.ArrayList; +//import java.util.List; +// +// +// +///** +// * +// * +// * @Author:李庆帅 +// * @Package:com.muyu.analysis.parsing.manager +// * @Project:cloud-server +// * @name:MessageProcessor +// * @Date:2024/10/10 20:24 +// */ +// +//@Slf4j +//@Service +//public class MessageProcessor { +// private static final int ID = 1; +// @Resource +// private SysCarMessageServiceImpl sysCarMessageService; +// @Resource +// private KafkaProducer kafkaProducer; +// +// +// public void processMessage(MqttMessage mqttMessage) { +// String payload = new String(mqttMessage.getPayload()); +// log.info("====:{}", payload); +// +// List carMessages = sysCarMessageService.selectSysCarMessageLists(ID); +// List kafKaDataList = new ArrayList<>(); +// +// String[] test = payload.split(" "); +// for (SysCarMessage carMessage : carMessages) { +// int start = Integer.parseInt(carMessage.getMessageStartIndex()) - 1; +// int end = Integer.parseInt(carMessage.getMessageEndIndex()); +// +// StringBuilder hexBuilder = new StringBuilder(); +// for (int i = start; i < end; i++) { +// hexBuilder.append(test[i]); +// } +// +// String hex = hexBuilder.toString(); +// char[] result = new char[hex.length() / 2]; +// for (int x = 0; x < hex.length(); x += 2) { +// int high = Character.digit(hex.charAt(x), 16); +// int low = Character.digit(hex.charAt(x + 1), 16); +// result[x / 2] = (char) ((high << 4) + low); +// } +// +// String value = new String(result); +// kafKaDataList.add(KafKaData.builder() +// .key(carMessage.getMessageTypeCode()) +// .label(carMessage.getMessageTypeCode()) +// .value(value) +// .type(carMessage.getMessageType()) +// .build()); +// } +// +// kafKaDataList.add(KafKaData.builder() +// .key("firmCode") +// .label("企业编码") +// .value("firm01") +// .type("String") +// .build()); +// String jsonString = JSONObject.toJSONString(kafKaDataList); +// ProducerRecord producerRecord = new ProducerRecord<>(KafkaConstants.KafkaTopic, jsonString); +// kafkaProducer.send(producerRecord); +// log.info("kafka投产:{}", jsonString); +// } +// +//} diff --git a/pom.xml b/pom.xml index 3e6c60d..6dd7f42 100644 --- a/pom.xml +++ b/pom.xml @@ -410,6 +410,13 @@ ${muyu.version} + + + com.muyu + cloud-modules-vehicle-gateway + ${muyu.version} + +