From 820039d3fea251c892b5997ed64af24278edd4ce Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=8E=E4=B8=9C=E4=BD=B3?= <14614659+dongjia-li@user.noreply.gitee.com> Date: Wed, 9 Oct 2024 09:13:37 +0800 Subject: [PATCH] =?UTF-8?q?feat():=20=E4=BC=98=E5=8C=96=E5=8D=8F=E8=AE=AE?= =?UTF-8?q?=E8=A7=A3=E6=9E=90=E6=96=B9=E6=B3=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cloud-auth/src/main/resources/bootstrap.yml | 2 +- .../src/main/resources/bootstrap.yml | 2 +- .../src/main/resources/bootstrap.yml | 2 +- .../src/main/resources/bootstrap.yml | 2 +- .../src/main/resources/bootstrap.yml | 2 +- .../src/src/main/resources/bootstrap.yml | 2 +- .../cloud-modules-protocol-parsing/pom.xml | 11 - .../protocol/parsing/ParsingMessage.java | 191 ++++++++++++++++++ .../protocol/parsing/test/ParsingMessage.java | 161 --------------- .../src/main/resources/bootstrap.yml | 6 +- .../src/main/resources/bootstrap.yml | 2 +- .../src/main/resources/bootstrap.yml | 2 +- .../src/main/resources/bootstrap.yml | 2 +- 13 files changed, 203 insertions(+), 184 deletions(-) create mode 100644 cloud-modules/cloud-modules-protocol-parsing/src/main/java/com/muyu/cloud/protocol/parsing/ParsingMessage.java delete mode 100644 cloud-modules/cloud-modules-protocol-parsing/src/main/java/com/muyu/cloud/protocol/parsing/test/ParsingMessage.java diff --git a/cloud-auth/src/main/resources/bootstrap.yml b/cloud-auth/src/main/resources/bootstrap.yml index d8dd0cc..7ed5410 100644 --- a/cloud-auth/src/main/resources/bootstrap.yml +++ b/cloud-auth/src/main/resources/bootstrap.yml @@ -7,7 +7,7 @@ nacos: addr: 47.101.49.53:8848 user-name: nacos password: nacos - namespace: seven + namespace: vehicle # Spring spring: application: diff --git a/cloud-gateway/src/main/resources/bootstrap.yml b/cloud-gateway/src/main/resources/bootstrap.yml index c9b97fd..906bcab 100644 --- a/cloud-gateway/src/main/resources/bootstrap.yml +++ b/cloud-gateway/src/main/resources/bootstrap.yml @@ -7,7 +7,7 @@ nacos: addr: 47.101.49.53:8848 user-name: nacos password: nacos - namespace: seven + namespace: vehicle # Spring spring: diff --git a/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/resources/bootstrap.yml b/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/resources/bootstrap.yml index 66bc686..9c1cb91 100644 --- a/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/resources/bootstrap.yml +++ b/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/resources/bootstrap.yml @@ -8,7 +8,7 @@ nacos: addr: 47.101.49.53:8848 user-name: nacos password: nacos - namespace: seven + namespace: vehicle # SPRING_AMQP_DESERIALIZATION_TRUST_ALL=true spring.amqp.deserialization.trust.all # Spring spring: diff --git a/cloud-modules/cloud-modules-file/src/main/resources/bootstrap.yml b/cloud-modules/cloud-modules-file/src/main/resources/bootstrap.yml index 489071b..06ace7d 100644 --- a/cloud-modules/cloud-modules-file/src/main/resources/bootstrap.yml +++ b/cloud-modules/cloud-modules-file/src/main/resources/bootstrap.yml @@ -7,7 +7,7 @@ nacos: addr: 47.101.49.53:8848 user-name: nacos password: nacos - namespace: seven + namespace: vehicle # Spring spring: diff --git a/cloud-modules/cloud-modules-gen/src/main/resources/bootstrap.yml b/cloud-modules/cloud-modules-gen/src/main/resources/bootstrap.yml index 97fb5ad..cbde7dd 100644 --- a/cloud-modules/cloud-modules-gen/src/main/resources/bootstrap.yml +++ b/cloud-modules/cloud-modules-gen/src/main/resources/bootstrap.yml @@ -7,7 +7,7 @@ nacos: addr: 47.101.49.53:8848 user-name: nacos password: nacos - namespace: seven + namespace: vehicle # Spring spring: diff --git a/cloud-modules/cloud-modules-gen/src/src/main/resources/bootstrap.yml b/cloud-modules/cloud-modules-gen/src/src/main/resources/bootstrap.yml index d0dbae7..3cf9ba1 100644 --- a/cloud-modules/cloud-modules-gen/src/src/main/resources/bootstrap.yml +++ b/cloud-modules/cloud-modules-gen/src/src/main/resources/bootstrap.yml @@ -7,7 +7,7 @@ nacos: addr: 47.101.49.53:8848 user-name: nacos password: nacos - namespace: seven + namespace: vehicle # Spring spring: diff --git a/cloud-modules/cloud-modules-protocol-parsing/pom.xml b/cloud-modules/cloud-modules-protocol-parsing/pom.xml index 6d31bae..6baf2a9 100644 --- a/cloud-modules/cloud-modules-protocol-parsing/pom.xml +++ b/cloud-modules/cloud-modules-protocol-parsing/pom.xml @@ -77,17 +77,6 @@ cloud-common-api-doc - - - com.muyu - cloud-common-xxl - - - - com.muyu - cloud-common-rabbit - - com.muyu cloud-modules-enterprise-common diff --git a/cloud-modules/cloud-modules-protocol-parsing/src/main/java/com/muyu/cloud/protocol/parsing/ParsingMessage.java b/cloud-modules/cloud-modules-protocol-parsing/src/main/java/com/muyu/cloud/protocol/parsing/ParsingMessage.java new file mode 100644 index 0000000..f72fd30 --- /dev/null +++ b/cloud-modules/cloud-modules-protocol-parsing/src/main/java/com/muyu/cloud/protocol/parsing/ParsingMessage.java @@ -0,0 +1,191 @@ +package com.muyu.cloud.protocol.parsing; + +import cn.hutool.json.JSONObject; +import com.muyu.domain.Vehicle; +import com.muyu.domain.VehicleType; +import com.muyu.domain.resp.MessageValueListResp; +import com.muyu.enterprise.cache.AllMessageValueCacheService; +import com.muyu.enterprise.cache.VehicleCacheService; +import com.muyu.enterprise.cache.VehicleTypeCacheService; +import lombok.extern.log4j.Log4j2; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.eclipse.paho.client.mqttv3.*; +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; +import javax.annotation.Resource; +import java.util.List; + +/** + * @Author: LiDongJia + * @Package: com.muyu.cloud.protocol.parsing.service.impl + * @Project: 2112-car-cloud-server + * @name: ParsingServiceImpl + * @Date: 2024/9/28 14:31 + * @Description: 协议解析实现层 + */ +@Log4j2 +@Component +public class ParsingMessage { + + // Kafka生产者 + @Resource + private KafkaProducer kafkaProducer; + // 车辆缓存服务 + private final VehicleCacheService vehicleCacheService; + // 车辆类型缓存服务 + private final VehicleTypeCacheService vehicleTypeCacheService; + // 报文模版缓存服务 + private final AllMessageValueCacheService allMessageValueCacheService; + + // MQTT主题 + private static final String TOPIC = "vehicle"; + // MQTT Broker地址 + private static final String BROKER = "tcp://111.231.50.146:1883"; + // MQTT客户端ID + private static final String CLIENT_ID = "JavaSample"; + // MQTT客户端 + private MqttClient mqttClient; + + /** + * 构造方法, 通过构造器注入依赖的缓存服务 + * @param vehicleCacheService 车辆缓存服务 + * @param vehicleTypeCacheService 车辆类型缓存服务 + * @param allMessageValueCacheService 报文模版缓存服务 + */ + public ParsingMessage(VehicleCacheService vehicleCacheService, VehicleTypeCacheService vehicleTypeCacheService, AllMessageValueCacheService allMessageValueCacheService) { + this.vehicleCacheService = vehicleCacheService; + this.vehicleTypeCacheService = vehicleTypeCacheService; + this.allMessageValueCacheService = allMessageValueCacheService; + } + + /** + * 初始化MQTT连接 + */ + @PostConstruct + public void init() { + connectToMqttBroker(); + } + + /** + * 连接MQTT Broker + */ + private void connectToMqttBroker() { + try { + // 创建MqttClient实例,指定Broker地址、客户端ID以及持久化方式 + mqttClient = new MqttClient(BROKER, CLIENT_ID, new MemoryPersistence()); + // 连接MQTT Broker + MqttConnectOptions connOpts = new MqttConnectOptions(); + connOpts.setCleanSession(true); + log.info("连接到协议: " + BROKER); + mqttClient.connect(connOpts); + mqttClient.subscribe(TOPIC, 0); + // 设置MQTT回调处理器 + mqttClient.setCallback(new MqttCallbackHandler()); + } catch (MqttException me) { + log.error("连接MQTT Broker失败: [{}]", me.getMessage()); + } + } + + /** + * MQTT回调处理器 + */ + private class MqttCallbackHandler implements MqttCallback { + + // 连接丢失 + @Override + public void connectionLost(Throwable throwable) { + log.error("连接丢失: [{}]", throwable.getMessage()); + } + + // 连接成功 + @Override + public void messageArrived(String s, MqttMessage mqttMessage) throws Exception { + handleMqttMessage(mqttMessage); + } + + // 接收信息 + @Override + public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { + + } + } + + /** + * 处理MQTT消息 + * + * @param mqttMessage + */ + private void handleMqttMessage(MqttMessage mqttMessage) { + // 解析MQTT消息 + String messageStr = new String(mqttMessage.getPayload()); + log.info("接收到MQTT消息: " + messageStr); + // 解析协议 + JSONObject parseMessage = parseProtocol(messageStr); + // 发送Kafka消息 + sendKafkaMessage(parseMessage); + } + + /** + * 解析协议 + * + * @param messageStr + * @return + */ + private JSONObject parseProtocol(String messageStr) { + String[] hexArray = messageStr.split(" "); + // 遍历十六进制数据转换为字符 + StringBuilder stringBuilder = new StringBuilder(); + for (String hex : hexArray) { + int decimal = Integer.parseInt(hex, 16); + stringBuilder.append((char) decimal); + } + // 取出车辆VIN码 + String vehicleVin = stringBuilder.substring(1, 18); + log.info("车辆VIN码: {}", vehicleVin); + // 根据车辆VIN码查询车辆信息 + Vehicle vehicle = vehicleCacheService.get(vehicleVin); + if (vehicle == null) { + throw new RuntimeException("车辆查询失败"); + } + VehicleType vehicleType = vehicleTypeCacheService.get(String.valueOf(vehicle.getVehicleTypeId())); + if (vehicleType == null) { + throw new RuntimeException("车辆类型查询失败"); + } + Long templateId = vehicleType.getMessageTemplateId(); + // 根据报文模版ID查询报文模版数据 + List templateList = allMessageValueCacheService.get(String.valueOf(templateId)); + if (templateList == null) { + throw new RuntimeException("报文模版查询失败"); + } + // 判断报文模板列表不为空 + if (templateList.isEmpty()) { + throw new RuntimeException("报文模版为空"); + } + // 存储报文模版解析后的数据 + JSONObject jsonObject = new JSONObject(); + for (MessageValueListResp messageValue : templateList) { + // 起始位下标 + Integer startIndex = messageValue.getMessageStartIndex() - 1; + // 结束位下标 + Integer endIndex = messageValue.getMessageEndIndex(); + // 根据报文模版截取数据 + String value = stringBuilder.substring(startIndex, endIndex); + // 存入数据 + jsonObject.put(messageValue.getMessageLabel(), value); + } + return jsonObject; + } + + /** + * 发送Kafka消息 + * @param parseMessage + */ + private void sendKafkaMessage(JSONObject parseMessage) { + ProducerRecord producerRecord = new ProducerRecord<>("zeshi", parseMessage.toString()); + kafkaProducer.send(producerRecord); + log.info("发送Kafka消息: " + parseMessage); + } +} diff --git a/cloud-modules/cloud-modules-protocol-parsing/src/main/java/com/muyu/cloud/protocol/parsing/test/ParsingMessage.java b/cloud-modules/cloud-modules-protocol-parsing/src/main/java/com/muyu/cloud/protocol/parsing/test/ParsingMessage.java deleted file mode 100644 index d0bbdea..0000000 --- a/cloud-modules/cloud-modules-protocol-parsing/src/main/java/com/muyu/cloud/protocol/parsing/test/ParsingMessage.java +++ /dev/null @@ -1,161 +0,0 @@ -package com.muyu.cloud.protocol.parsing.test; - -import cn.hutool.json.JSONObject; -import com.muyu.domain.Vehicle; -import com.muyu.domain.VehicleType; -import com.muyu.domain.resp.MessageValueListResp; -import com.muyu.enterprise.cache.AllMessageValueCacheService; -import com.muyu.enterprise.cache.VehicleCacheService; -import com.muyu.enterprise.cache.VehicleTypeCacheService; -import lombok.extern.log4j.Log4j2; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.eclipse.paho.client.mqttv3.*; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; - -import javax.annotation.PostConstruct; -import javax.annotation.Resource; -import java.util.List; - -/** - * @Author: LiDongJia - * @Package: com.muyu.cloud.protocol.parsing.service.impl - * @Project: 2112-car-cloud-server - * @name: ParsingServiceImpl - * @Date: 2024/9/28 14:31 - * @Description: 协议解析实现层 - */ -@Log4j2 -@Component -public class ParsingMessage { - - @Resource - private KafkaProducer kafkaProducer; - - @Autowired - private VehicleCacheService vehicleCacheService; - - @Autowired - private VehicleTypeCacheService vehicleTypeCacheService; - - @Autowired - private AllMessageValueCacheService allMessageValueCacheService; - - /** - * 协议解析 - */ - @PostConstruct - public void mqttClient() { - String topic = "vehicle"; - String broker = "tcp://111.231.50.146:1883"; - String clientId = "JavaSample"; - - try { - // 第三个参数为空,默认持久化策略 - MqttClient sampleClient = new MqttClient(broker, clientId); - MqttConnectOptions connOpts = new MqttConnectOptions(); - connOpts.setCleanSession(true); - System.out.println("Connecting to broker: " + broker); - sampleClient.connect(connOpts); - sampleClient.subscribe(topic, 0); - sampleClient.setCallback(new MqttCallback() { - // 连接丢失 - @Override - public void connectionLost(Throwable throwable) { - - } - - // 连接成功 - @Override - public void messageArrived(String s, MqttMessage mqttMessage) throws Exception { - System.out.println(new String(mqttMessage.getPayload())); - JSONObject entries = this.protocolParsing(new String(mqttMessage.getPayload())); - - ProducerRecord producerRecord = new ProducerRecord<>("zeshi", - entries.toString() ); - kafkaProducer.send(producerRecord); - System.out.println(entries); - } - - /** - * 协议解析 - * @param messageStr - * @return - */ - public JSONObject protocolParsing(String messageStr) { - //根据空格切割数据 - String[] hexArray = messageStr.split(" "); - StringBuilder result = new StringBuilder(); - //遍历十六进制数据转换为字符 - for (String hex : hexArray) { - int decimal = Integer.parseInt(hex, 16); - result.append((char) decimal); - } - //取出车辆VIN码 - String vehicleVin = result.substring(1, 18); - log.info("车辆VIN码: " + vehicleVin); - //根据车辆VIN码查询报文模板ID - Vehicle vehicle = vehicleCacheService.get(vehicleVin); - Long vehicleTypeId = vehicle.getVehicleTypeId(); - VehicleType vehicleType = vehicleTypeCacheService.get(String.valueOf(vehicleTypeId)); - Long templateId = vehicleType.getMessageTemplateId(); - List templateList = allMessageValueCacheService.get(String.valueOf(templateId)); -// //从redis缓存中获取报文模板数据 -// try { -// String redisKey = "messageTemplate" + templateId; -// if (redisTemplate.hasKey(redisKey)) { -// List list = redisTemplate.opsForList().range(redisKey, 0, -1); -// templateList = list.stream() -// .map(obj -> JSON.parseObject(obj.toString(), MessageValueListResp.class)) -// .toList(); -// log.info("Redis缓存查询成功"); -// } else { -// Result> byTemplateId = remoteServiceClient.findByTemplateId(templateId); -// templateList = byTemplateId.getData(); -// templateList.forEach( -// listResp -> -// redisTemplate.opsForList().rightPush( -// redisKey, JSON.toJSONString(listResp) -// ) -// ); -// log.info("数据库查询成功"); -// } -// } catch (Exception e) { -// throw new RuntimeException("获取报文模板失败"); -// } - //判断报文模板列表不为空 - if (templateList.isEmpty()) { - throw new RuntimeException("报文模版为空"); - } - //存储报文模版解析后的数据 - JSONObject jsonObject = new JSONObject(); - for (MessageValueListResp messageValue : templateList) { - //起始位下标 - Integer startIndex = messageValue.getMessageStartIndex() - 1; - //结束位下标 - Integer endIndex = messageValue.getMessageEndIndex(); - //根据报文模版截取数据 - String value = result.substring(startIndex, endIndex); - //存入数据 - jsonObject.put(messageValue.getMessageLabel(), value); - } - return jsonObject; - } - - // 接收信息 - @Override - public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { - - } - }); - } catch (MqttException me) { - System.out.println("reason " + me.getReasonCode()); - System.out.println("msg " + me.getMessage()); - System.out.println("loc " + me.getLocalizedMessage()); - System.out.println("cause " + me.getCause()); - System.out.println("excep " + me); - me.printStackTrace(); - } - } -} diff --git a/cloud-modules/cloud-modules-protocol-parsing/src/main/resources/bootstrap.yml b/cloud-modules/cloud-modules-protocol-parsing/src/main/resources/bootstrap.yml index 619fc26..120e711 100644 --- a/cloud-modules/cloud-modules-protocol-parsing/src/main/resources/bootstrap.yml +++ b/cloud-modules/cloud-modules-protocol-parsing/src/main/resources/bootstrap.yml @@ -7,7 +7,7 @@ nacos: addr: 47.101.49.53:8848 user-name: nacos password: nacos - namespace: seven + namespace: vehicle # SPRING_AMQP_DESERIALIZATION_TRUST_ALL=true spring.amqp.deserialization.trust.all # Spring spring: @@ -58,7 +58,7 @@ spring: kafka: producer: # Kafka服务器 - bootstrap-servers: 120.53.86.181:9092 + bootstrap-servers: 111.231.50.146:9092 # 开启事务,必须在开启了事务的方法中发送,否则报错 transaction-id-prefix: kafkaTx- # 发生错误后,消息重发的次数,开启事务必须设置大于0。 @@ -79,7 +79,7 @@ spring: consumer: # Kafka服务器 - bootstrap-servers: 120.53.86.181:9092 + bootstrap-servers: 111.231.50.146:9092 group-id: firstGroup # 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5D #auto-commit-interval: 2s diff --git a/cloud-modules/cloud-modules-system/src/main/resources/bootstrap.yml b/cloud-modules/cloud-modules-system/src/main/resources/bootstrap.yml index b5ea3d8..80afd61 100644 --- a/cloud-modules/cloud-modules-system/src/main/resources/bootstrap.yml +++ b/cloud-modules/cloud-modules-system/src/main/resources/bootstrap.yml @@ -7,7 +7,7 @@ nacos: addr: 47.101.49.53:8848 user-name: nacos password: nacos - namespace: seven + namespace: vehicle # SPRING_AMQP_DESERIALIZATION_TRUST_ALL=true spring.amqp.deserialization.trust.all # Spring spring: diff --git a/cloud-modules/cloud-modules-vehicle-gateway/src/main/resources/bootstrap.yml b/cloud-modules/cloud-modules-vehicle-gateway/src/main/resources/bootstrap.yml index 303ca8a..09cb2b0 100644 --- a/cloud-modules/cloud-modules-vehicle-gateway/src/main/resources/bootstrap.yml +++ b/cloud-modules/cloud-modules-vehicle-gateway/src/main/resources/bootstrap.yml @@ -7,7 +7,7 @@ nacos: addr: 47.101.49.53:8848 user-name: nacos password: nacos - namespace: seven + namespace: vehicle # SPRING_AMQP_DESERIALIZATION_TRUST_ALL=true spring.amqp.deserialization.trust.all # Spring spring: diff --git a/cloud-visual/cloud-visual-monitor/src/main/resources/bootstrap.yml b/cloud-visual/cloud-visual-monitor/src/main/resources/bootstrap.yml index 8ce1850..54dd098 100644 --- a/cloud-visual/cloud-visual-monitor/src/main/resources/bootstrap.yml +++ b/cloud-visual/cloud-visual-monitor/src/main/resources/bootstrap.yml @@ -7,7 +7,7 @@ nacos: addr: 47.101.49.53:8848 user-name: nacos password: nacos - namespace: seven + namespace: vehicle # Spring spring: