diff --git a/cloud-auth/src/main/resources/bootstrap.yml b/cloud-auth/src/main/resources/bootstrap.yml
index d8dd0cc..7ed5410 100644
--- a/cloud-auth/src/main/resources/bootstrap.yml
+++ b/cloud-auth/src/main/resources/bootstrap.yml
@@ -7,7 +7,7 @@ nacos:
addr: 47.101.49.53:8848
user-name: nacos
password: nacos
- namespace: seven
+ namespace: vehicle
# Spring
spring:
application:
diff --git a/cloud-gateway/src/main/resources/bootstrap.yml b/cloud-gateway/src/main/resources/bootstrap.yml
index c9b97fd..906bcab 100644
--- a/cloud-gateway/src/main/resources/bootstrap.yml
+++ b/cloud-gateway/src/main/resources/bootstrap.yml
@@ -7,7 +7,7 @@ nacos:
addr: 47.101.49.53:8848
user-name: nacos
password: nacos
- namespace: seven
+ namespace: vehicle
# Spring
spring:
diff --git a/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/resources/bootstrap.yml b/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/resources/bootstrap.yml
index 66bc686..9c1cb91 100644
--- a/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/resources/bootstrap.yml
+++ b/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/resources/bootstrap.yml
@@ -8,7 +8,7 @@ nacos:
addr: 47.101.49.53:8848
user-name: nacos
password: nacos
- namespace: seven
+ namespace: vehicle
# SPRING_AMQP_DESERIALIZATION_TRUST_ALL=true spring.amqp.deserialization.trust.all
# Spring
spring:
diff --git a/cloud-modules/cloud-modules-file/src/main/resources/bootstrap.yml b/cloud-modules/cloud-modules-file/src/main/resources/bootstrap.yml
index 489071b..06ace7d 100644
--- a/cloud-modules/cloud-modules-file/src/main/resources/bootstrap.yml
+++ b/cloud-modules/cloud-modules-file/src/main/resources/bootstrap.yml
@@ -7,7 +7,7 @@ nacos:
addr: 47.101.49.53:8848
user-name: nacos
password: nacos
- namespace: seven
+ namespace: vehicle
# Spring
spring:
diff --git a/cloud-modules/cloud-modules-gen/src/main/resources/bootstrap.yml b/cloud-modules/cloud-modules-gen/src/main/resources/bootstrap.yml
index 97fb5ad..cbde7dd 100644
--- a/cloud-modules/cloud-modules-gen/src/main/resources/bootstrap.yml
+++ b/cloud-modules/cloud-modules-gen/src/main/resources/bootstrap.yml
@@ -7,7 +7,7 @@ nacos:
addr: 47.101.49.53:8848
user-name: nacos
password: nacos
- namespace: seven
+ namespace: vehicle
# Spring
spring:
diff --git a/cloud-modules/cloud-modules-gen/src/src/main/resources/bootstrap.yml b/cloud-modules/cloud-modules-gen/src/src/main/resources/bootstrap.yml
index d0dbae7..3cf9ba1 100644
--- a/cloud-modules/cloud-modules-gen/src/src/main/resources/bootstrap.yml
+++ b/cloud-modules/cloud-modules-gen/src/src/main/resources/bootstrap.yml
@@ -7,7 +7,7 @@ nacos:
addr: 47.101.49.53:8848
user-name: nacos
password: nacos
- namespace: seven
+ namespace: vehicle
# Spring
spring:
diff --git a/cloud-modules/cloud-modules-protocol-parsing/pom.xml b/cloud-modules/cloud-modules-protocol-parsing/pom.xml
index 6d31bae..6baf2a9 100644
--- a/cloud-modules/cloud-modules-protocol-parsing/pom.xml
+++ b/cloud-modules/cloud-modules-protocol-parsing/pom.xml
@@ -77,17 +77,6 @@
cloud-common-api-doc
-
-
- com.muyu
- cloud-common-xxl
-
-
-
- com.muyu
- cloud-common-rabbit
-
-
com.muyu
cloud-modules-enterprise-common
diff --git a/cloud-modules/cloud-modules-protocol-parsing/src/main/java/com/muyu/cloud/protocol/parsing/ParsingMessage.java b/cloud-modules/cloud-modules-protocol-parsing/src/main/java/com/muyu/cloud/protocol/parsing/ParsingMessage.java
new file mode 100644
index 0000000..058c21d
--- /dev/null
+++ b/cloud-modules/cloud-modules-protocol-parsing/src/main/java/com/muyu/cloud/protocol/parsing/ParsingMessage.java
@@ -0,0 +1,185 @@
+package com.muyu.cloud.protocol.parsing;
+
+import cn.hutool.json.JSONObject;
+import com.muyu.domain.Vehicle;
+import com.muyu.domain.VehicleType;
+import com.muyu.domain.resp.MessageValueListResp;
+import com.muyu.enterprise.cache.AllMessageValueCacheService;
+import com.muyu.enterprise.cache.VehicleCacheService;
+import com.muyu.enterprise.cache.VehicleTypeCacheService;
+import lombok.extern.log4j.Log4j2;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.eclipse.paho.client.mqttv3.*;
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.Resource;
+import java.util.List;
+
+/**
+ * @Author: LiDongJia
+ * @Package: com.muyu.cloud.protocol.parsing.service.impl
+ * @Project: 2112-car-cloud-server
+ * @name: ParsingServiceImpl
+ * @Date: 2024/9/28 14:31
+ * @Description: 协议解析实现层
+ */
+@Log4j2
+@Component
+public class ParsingMessage {
+
+ // Kafka生产者
+ @Resource
+ private KafkaProducer kafkaProducer;
+ // 车辆缓存服务
+ @Autowired
+ private VehicleCacheService vehicleCacheService;
+ // 车辆类型缓存服务
+ @Autowired
+ private VehicleTypeCacheService vehicleTypeCacheService;
+ // 报文模版缓存服务
+ @Autowired
+ private AllMessageValueCacheService allMessageValueCacheService;
+
+ // MQTT主题
+ private static final String TOPIC = "vehicle";
+ // MQTT Broker地址
+ private static final String BROKER = "tcp://111.231.50.146:1883";
+ // MQTT客户端ID
+ private static final String CLIENT_ID = "JavaSample";
+ // MQTT客户端
+ private MqttClient mqttClient;
+ // kafka topic
+ private static final String TIPSY = "tipsy";
+
+ /**
+ * 初始化MQTT连接
+ */
+ @PostConstruct
+ public void init() {
+ connectToMqttBroker();
+ }
+
+ /**
+ * 连接MQTT Broker
+ */
+ private void connectToMqttBroker() {
+ try {
+ // 创建MqttClient实例,指定Broker地址、客户端ID以及持久化方式
+ mqttClient = new MqttClient(BROKER, CLIENT_ID, new MemoryPersistence());
+ // 连接MQTT Broker
+ MqttConnectOptions connOpts = new MqttConnectOptions();
+ connOpts.setCleanSession(true);
+ log.info("连接到协议: " + BROKER);
+ mqttClient.connect(connOpts);
+ mqttClient.subscribe(TOPIC, 0);
+ // 设置MQTT回调处理器
+ mqttClient.setCallback(new MqttCallbackHandler());
+ } catch (MqttException me) {
+ log.error("连接MQTT Broker失败: [{}]", me.getMessage());
+ }
+ }
+
+ /**
+ * MQTT回调处理器
+ */
+ private class MqttCallbackHandler implements MqttCallback {
+
+ // 连接丢失
+ @Override
+ public void connectionLost(Throwable throwable) {
+ log.error("连接丢失: [{}]", throwable.getMessage());
+ }
+
+ // 连接成功
+ @Override
+ public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
+ handleMqttMessage(mqttMessage);
+ }
+
+ // 接收信息
+ @Override
+ public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
+
+ }
+ }
+
+ /**
+ * 处理MQTT消息
+ *
+ * @param mqttMessage
+ */
+ private void handleMqttMessage(MqttMessage mqttMessage) {
+ // 解析MQTT消息
+ String messageStr = new String(mqttMessage.getPayload());
+ log.info("接收到MQTT消息: " + messageStr);
+ // 解析协议
+ JSONObject parseMessage = parseProtocol(messageStr);
+ // 发送Kafka消息
+ sendKafkaMessage(parseMessage);
+ }
+
+ /**
+ * 解析协议
+ *
+ * @param messageStr
+ * @return
+ */
+ private JSONObject parseProtocol(String messageStr) {
+ String[] hexArray = messageStr.split(" ");
+ // 遍历十六进制数据转换为字符
+ StringBuilder stringBuilder = new StringBuilder();
+ for (String hex : hexArray) {
+ int decimal = Integer.parseInt(hex, 16);
+ stringBuilder.append((char) decimal);
+ }
+ // 取出车辆VIN码
+ String vehicleVin = stringBuilder.substring(1, 18);
+ log.info("车辆VIN码: {}", vehicleVin);
+ // 根据车辆VIN码查询车辆信息
+ Vehicle vehicle = vehicleCacheService.get(vehicleVin);
+ if (vehicle == null) {
+ throw new RuntimeException("车辆查询失败");
+ }
+ VehicleType vehicleType = vehicleTypeCacheService.get(String.valueOf(vehicle.getVehicleTypeId()));
+ if (vehicleType == null) {
+ throw new RuntimeException("车辆类型查询失败");
+ }
+ Long templateId = vehicleType.getMessageTemplateId();
+ // 根据报文模版ID查询报文模版数据
+ List templateList = allMessageValueCacheService.get(String.valueOf(templateId));
+ if (templateList == null) {
+ throw new RuntimeException("报文模版查询失败");
+ }
+ // 判断报文模板列表不为空
+ if (templateList.isEmpty()) {
+ throw new RuntimeException("报文模版为空");
+ }
+ // 存储报文模版解析后的数据
+ JSONObject jsonObject = new JSONObject();
+ for (MessageValueListResp messageValue : templateList) {
+ // 起始位下标
+ Integer startIndex = messageValue.getMessageStartIndex() - 1;
+ // 结束位下标
+ Integer endIndex = messageValue.getMessageEndIndex();
+ // 根据报文模版截取数据
+ String value = stringBuilder.substring(startIndex, endIndex);
+ // 存入数据
+ jsonObject.put(messageValue.getMessageLabel(), value);
+ }
+ return jsonObject;
+ }
+
+ /**
+ * 发送Kafka消息
+ * @param parseMessage
+ */
+ private void sendKafkaMessage(JSONObject parseMessage) {
+ ProducerRecord producerRecord = new ProducerRecord<>(TIPSY, parseMessage.toString());
+ kafkaProducer.send(producerRecord);
+ log.info("发送Kafka消息: " + parseMessage);
+ }
+}
diff --git a/cloud-modules/cloud-modules-protocol-parsing/src/main/java/com/muyu/cloud/protocol/parsing/test/ParsingMessage.java b/cloud-modules/cloud-modules-protocol-parsing/src/main/java/com/muyu/cloud/protocol/parsing/test/ParsingMessage.java
deleted file mode 100644
index d0bbdea..0000000
--- a/cloud-modules/cloud-modules-protocol-parsing/src/main/java/com/muyu/cloud/protocol/parsing/test/ParsingMessage.java
+++ /dev/null
@@ -1,161 +0,0 @@
-package com.muyu.cloud.protocol.parsing.test;
-
-import cn.hutool.json.JSONObject;
-import com.muyu.domain.Vehicle;
-import com.muyu.domain.VehicleType;
-import com.muyu.domain.resp.MessageValueListResp;
-import com.muyu.enterprise.cache.AllMessageValueCacheService;
-import com.muyu.enterprise.cache.VehicleCacheService;
-import com.muyu.enterprise.cache.VehicleTypeCacheService;
-import lombok.extern.log4j.Log4j2;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.eclipse.paho.client.mqttv3.*;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-
-import javax.annotation.PostConstruct;
-import javax.annotation.Resource;
-import java.util.List;
-
-/**
- * @Author: LiDongJia
- * @Package: com.muyu.cloud.protocol.parsing.service.impl
- * @Project: 2112-car-cloud-server
- * @name: ParsingServiceImpl
- * @Date: 2024/9/28 14:31
- * @Description: 协议解析实现层
- */
-@Log4j2
-@Component
-public class ParsingMessage {
-
- @Resource
- private KafkaProducer kafkaProducer;
-
- @Autowired
- private VehicleCacheService vehicleCacheService;
-
- @Autowired
- private VehicleTypeCacheService vehicleTypeCacheService;
-
- @Autowired
- private AllMessageValueCacheService allMessageValueCacheService;
-
- /**
- * 协议解析
- */
- @PostConstruct
- public void mqttClient() {
- String topic = "vehicle";
- String broker = "tcp://111.231.50.146:1883";
- String clientId = "JavaSample";
-
- try {
- // 第三个参数为空,默认持久化策略
- MqttClient sampleClient = new MqttClient(broker, clientId);
- MqttConnectOptions connOpts = new MqttConnectOptions();
- connOpts.setCleanSession(true);
- System.out.println("Connecting to broker: " + broker);
- sampleClient.connect(connOpts);
- sampleClient.subscribe(topic, 0);
- sampleClient.setCallback(new MqttCallback() {
- // 连接丢失
- @Override
- public void connectionLost(Throwable throwable) {
-
- }
-
- // 连接成功
- @Override
- public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
- System.out.println(new String(mqttMessage.getPayload()));
- JSONObject entries = this.protocolParsing(new String(mqttMessage.getPayload()));
-
- ProducerRecord producerRecord = new ProducerRecord<>("zeshi",
- entries.toString() );
- kafkaProducer.send(producerRecord);
- System.out.println(entries);
- }
-
- /**
- * 协议解析
- * @param messageStr
- * @return
- */
- public JSONObject protocolParsing(String messageStr) {
- //根据空格切割数据
- String[] hexArray = messageStr.split(" ");
- StringBuilder result = new StringBuilder();
- //遍历十六进制数据转换为字符
- for (String hex : hexArray) {
- int decimal = Integer.parseInt(hex, 16);
- result.append((char) decimal);
- }
- //取出车辆VIN码
- String vehicleVin = result.substring(1, 18);
- log.info("车辆VIN码: " + vehicleVin);
- //根据车辆VIN码查询报文模板ID
- Vehicle vehicle = vehicleCacheService.get(vehicleVin);
- Long vehicleTypeId = vehicle.getVehicleTypeId();
- VehicleType vehicleType = vehicleTypeCacheService.get(String.valueOf(vehicleTypeId));
- Long templateId = vehicleType.getMessageTemplateId();
- List templateList = allMessageValueCacheService.get(String.valueOf(templateId));
-// //从redis缓存中获取报文模板数据
-// try {
-// String redisKey = "messageTemplate" + templateId;
-// if (redisTemplate.hasKey(redisKey)) {
-// List