diff --git a/cloud-common/cloud-common-core/src/main/java/com/muyu/common/core/constant/KafkaConstants.java b/cloud-common/cloud-common-core/src/main/java/com/muyu/common/core/constant/KafkaConstants.java
new file mode 100644
index 0000000..c7980b4
--- /dev/null
+++ b/cloud-common/cloud-common-core/src/main/java/com/muyu/common/core/constant/KafkaConstants.java
@@ -0,0 +1,19 @@
+package com.muyu.common.core.constant;
+
+/**
+ * kafka常量信息
+ * @Author:李庆帅
+ * @Package:com.muyu.common.core.constant
+ * @Project:cloud-server
+ * @name:KafkaConstants
+ * @Date:2024/9/29 16:41
+ */
+public class KafkaConstants {
+
+ /**
+ * 协议解析报文传递数据(队列名称)
+ */
+ public final static String MESSAGE_PARSING = "MessageParsing";
+
+
+}
diff --git a/cloud-common/cloud-common-core/src/main/java/com/muyu/common/core/constant/RedisConstants.java b/cloud-common/cloud-common-core/src/main/java/com/muyu/common/core/constant/RedisConstants.java
new file mode 100644
index 0000000..f0e7487
--- /dev/null
+++ b/cloud-common/cloud-common-core/src/main/java/com/muyu/common/core/constant/RedisConstants.java
@@ -0,0 +1,17 @@
+package com.muyu.common.core.constant;
+
+/**
+ * redis常量信息
+ * @Author:李庆帅
+ * @Package:com.muyu.common.core.constant
+ * @Project:cloud-server
+ * @name:RedisConstants
+ * @Date:2024/9/29 17:28
+ */
+public class RedisConstants {
+
+ /**
+ * redisKey(协议解析报文传递)
+ */
+ public final static String MESSAGE_TEMPLATE = "messageTemplate";
+}
diff --git a/cloud-common/cloud-common-kafka/pom.xml b/cloud-common/cloud-common-kafka/pom.xml
index 16ec61d..81d6907 100644
--- a/cloud-common/cloud-common-kafka/pom.xml
+++ b/cloud-common/cloud-common-kafka/pom.xml
@@ -11,6 +11,10 @@
cloud-common-kafka
+
+ cloud-common-kafka
+
+
17
17
diff --git a/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/java/com/muyu/enterprise/controller/car/MessageValueController.java b/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/java/com/muyu/enterprise/controller/car/MessageValueController.java
index 9883e2e..e739722 100644
--- a/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/java/com/muyu/enterprise/controller/car/MessageValueController.java
+++ b/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/java/com/muyu/enterprise/controller/car/MessageValueController.java
@@ -107,7 +107,7 @@ public class MessageValueController extends BaseController
* @param templateId 请求对象
* @return 返回结果
*/
- @GetMapping("/messageValue/findByTemplateId/{templateId}")
+ @GetMapping("/findByTemplateId/{templateId}")
@Operation(summary = "根据报文模版id查询报文数据", description = "根据报文模版id查询报文数据")
public Result> findByTemplateId(@PathVariable("templateId") Long templateId){
List list = messageValueService.findByTemplateId(templateId);
diff --git a/cloud-modules/cloud-modules-protocol-analysis/pom.xml b/cloud-modules/cloud-modules-protocol-analysis/pom.xml
index e6f5c4c..c6ad48f 100644
--- a/cloud-modules/cloud-modules-protocol-analysis/pom.xml
+++ b/cloud-modules/cloud-modules-protocol-analysis/pom.xml
@@ -100,6 +100,11 @@
com.muyu
cloud-common-core
+
+
+ com.muyu
+ cloud-common-kafka
+
diff --git a/cloud-modules/cloud-modules-protocol-analysis/src/main/java/com/muyu/analysis/parsing/MQTT/DemoMQTT.java b/cloud-modules/cloud-modules-protocol-analysis/src/main/java/com/muyu/analysis/parsing/MQTT/DemoMQTT.java
deleted file mode 100644
index 2ba02a7..0000000
--- a/cloud-modules/cloud-modules-protocol-analysis/src/main/java/com/muyu/analysis/parsing/MQTT/DemoMQTT.java
+++ /dev/null
@@ -1,61 +0,0 @@
-package com.muyu.analysis.parsing.MQTT;
-
-import com.muyu.analysis.parsing.controller.ParsingController;
-import org.eclipse.paho.client.mqttv3.*;
-
-/**
- * 测试MQTT
- * @ClassName demo
- * @Description 描述
- * @Author 李庆帅
- * @Date 2024/9/28
- */
-public class DemoMQTT {
-
- public void main(String[] args) {
-
- String topic = "vehicle";
- String content = "Message from MqttPublishSample";
- int qos = 2;
- String broker = "tcp://106.15.136.7:1883";
- String clientId = "JavaSample";
-
-
-
- try {
- // 第三个参数为空,默认持久化策略
- MqttClient sampleClient = new MqttClient(broker, clientId);
- MqttConnectOptions connOpts = new MqttConnectOptions();
- connOpts.setCleanSession(true);
- System.out.println("Connecting to broker: "+broker);
- sampleClient.connect(connOpts);
- sampleClient.subscribe(topic,0);
- sampleClient.setCallback(new MqttCallback() {
- // 连接丢失
- @Override
- public void connectionLost(Throwable throwable) {
-
- }
- // 连接成功
- @Override
- public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
- System.out.println(new String(mqttMessage.getPayload()));
- }
- // 接收信息
- @Override
- public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
-
- }
- });
- } catch(MqttException me) {
- System.out.println("reason "+me.getReasonCode());
- System.out.println("msg "+me.getMessage());
- System.out.println("loc "+me.getLocalizedMessage());
- System.out.println("cause "+me.getCause());
- System.out.println("excep "+me);
- me.printStackTrace();
- }
- }
-
-
-}
diff --git a/cloud-modules/cloud-modules-protocol-analysis/src/main/java/com/muyu/analysis/parsing/service/impl/ParsingServiceImpl.java b/cloud-modules/cloud-modules-protocol-analysis/src/main/java/com/muyu/analysis/parsing/MQTT/ParsingMQTT.java
similarity index 68%
rename from cloud-modules/cloud-modules-protocol-analysis/src/main/java/com/muyu/analysis/parsing/service/impl/ParsingServiceImpl.java
rename to cloud-modules/cloud-modules-protocol-analysis/src/main/java/com/muyu/analysis/parsing/MQTT/ParsingMQTT.java
index 909319a..dc0347a 100644
--- a/cloud-modules/cloud-modules-protocol-analysis/src/main/java/com/muyu/analysis/parsing/service/impl/ParsingServiceImpl.java
+++ b/cloud-modules/cloud-modules-protocol-analysis/src/main/java/com/muyu/analysis/parsing/MQTT/ParsingMQTT.java
@@ -1,78 +1,87 @@
-package com.muyu.analysis.parsing.service.impl;
+package com.muyu.analysis.parsing.MQTT;
+import com.muyu.analysis.parsing.remote.RemoteClientService;
+import com.muyu.common.core.constant.KafkaConstants;
+import com.muyu.common.core.constant.RedisConstants;
+import com.muyu.common.core.domain.Result;
+import com.muyu.enterprise.domain.resp.car.MessageValueListResp;
+import jakarta.annotation.PostConstruct;
+import jakarta.annotation.Resource;
import cn.hutool.json.JSONObject;
import com.alibaba.fastjson.JSON;
-import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
-import com.muyu.analysis.parsing.controller.ParsingController;
-import com.muyu.analysis.parsing.remote.RemoteClientService;
-import com.muyu.analysis.parsing.mapper.ParsingMapper;
-import com.muyu.analysis.parsing.service.ParsingService;
-import com.muyu.common.core.domain.Result;
-import com.muyu.enterprise.domain.car.MessageValue;
-import com.muyu.enterprise.domain.resp.car.MessageValueListResp;
import lombok.extern.log4j.Log4j2;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
import org.eclipse.paho.client.mqttv3.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
-import org.springframework.stereotype.Service;
+import org.springframework.stereotype.Component;
-import javax.annotation.Resource;
import java.util.List;
/**
- * 协议解析实现层
+ * 协议解析处理数据发送传送到队列
* @Author:李庆帅
- * @Package:com.muyu.analysis.parsing.service.impl
+ * @Package:com.muyu.analysis.parsing.MQTT
* @Project:cloud-server
- * @name:ParsingServiceImpl
- * @Date:2024/9/28 20:53
+ * @name:ParsingMQTT
+ * @Date:2024/9/29 16:08
*/
@Log4j2
-@Service
-public class ParsingServiceImpl extends ServiceImpl
- implements ParsingService
-{
+@Component
+public class ParsingMQTT {
+
@Resource
private RedisTemplate redisTemplate;
@Autowired
- private RemoteClientService remoteServiceClientService;
-
-
-
- @Override
- public void mqtt() {
- String topic = "vehicle";
- String content = "Message from MqttPublishSample";
- int qos = 2;
- String broker = "tcp://106.15.136.7:1883";
- String clientId = "JavaSample";
+ private RemoteClientService remoteServiceClient;
+ @Resource
+ private KafkaProducer kafkaProducer;
+ /**
+ * 协议解析
+ */
+ @PostConstruct
+ public void mqttClient() {
+ String topic = "vehicle";
+ String broker = "tcp://106.15.136.7:1883";
+ String clientId = "JavaSample";
try {
// 第三个参数为空,默认持久化策略
MqttClient sampleClient = new MqttClient(broker, clientId);
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setCleanSession(true);
- System.out.println("Connecting to broker: "+broker);
+ System.out.println("Connecting to broker: " + broker);
sampleClient.connect(connOpts);
- sampleClient.subscribe(topic,0);
+ sampleClient.subscribe(topic, 0);
sampleClient.setCallback(new MqttCallback() {
// 连接丢失
@Override
public void connectionLost(Throwable throwable) {
}
+
// 连接成功
@Override
public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
System.out.println(new String(mqttMessage.getPayload()));
- String mqtt= new String(mqttMessage.getPayload());
- JSONObject jsonObject = this.protocolParsing(mqtt);
- System.out.println("转换后:"+jsonObject);
+ JSONObject entries = this.protocolParsing(new String(mqttMessage.getPayload()));
+
+ ProducerRecord producerRecord = new ProducerRecord<>(KafkaConstants.MESSAGE_PARSING,
+ entries.toString() );
+ kafkaProducer.send(producerRecord);
+ log.info("解析之后的数据"+entries);
+
}
+ /**
+ * 协议解析
+ * @param messageStr
+ * @return
+ */
public JSONObject protocolParsing(String messageStr) {
//根据空格切割数据
String[] hexArray = messageStr.split(" ");
@@ -86,12 +95,12 @@ public class ParsingServiceImpl extends ServiceImpl
String vehicleVin = result.substring(1, 18);
log.info("车辆VIN码: " + vehicleVin);
//根据车辆VIN码查询报文模板ID
- Result byVehicleVin = remoteServiceClientService.findByVehicleVin(vehicleVin);
+ Result byVehicleVin = remoteServiceClient.findByVehicleVin(vehicleVin);
Long templateId = byVehicleVin.getData();
List templateList;
//从redis缓存中获取报文模板数据
try {
- String redisKey = "messageTemplate" + templateId;
+ String redisKey = RedisConstants.MESSAGE_TEMPLATE + templateId;
if (redisTemplate.hasKey(redisKey)) {
List