diff --git a/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-cache/src/main/java/com/muyu/enterprise/cache/CarMessageValueCacheService.java b/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-cache/src/main/java/com/muyu/enterprise/cache/CarMessageValueCacheService.java index 89f1f24..d52957b 100644 --- a/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-cache/src/main/java/com/muyu/enterprise/cache/CarMessageValueCacheService.java +++ b/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-cache/src/main/java/com/muyu/enterprise/cache/CarMessageValueCacheService.java @@ -1,9 +1,6 @@ package com.muyu.enterprise.cache; import com.muyu.common.cache.config.CacheAbsBasic; -import com.muyu.enterprise.domain.CarCompany; -import com.muyu.enterprise.domain.CarTemplate; -import com.muyu.enterprise.domain.car.MessageValue; import com.muyu.enterprise.domain.resp.car.MessageValueListResp; import java.util.List; @@ -29,6 +26,11 @@ public class CarMessageValueCacheService extends CacheAbsBasic { return "sysCar:info"; } + @Override + public String keyPost() { + return ""; + } + @Override public String encode(String key){ return super.encode(key); @@ -25,7 +30,7 @@ public class CarVehicleCacheService extends CacheAbsBasic { @Override public String decode(String key){ - return super.decode(key); + return ""; } diff --git a/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-cache/src/main/java/com/muyu/enterprise/cache/CarVehicleTypeCacheService.java b/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-cache/src/main/java/com/muyu/enterprise/cache/CarVehicleTypeCacheService.java index c94b505..3ca86d6 100644 --- a/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-cache/src/main/java/com/muyu/enterprise/cache/CarVehicleTypeCacheService.java +++ b/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-cache/src/main/java/com/muyu/enterprise/cache/CarVehicleTypeCacheService.java @@ -21,6 +21,11 @@ public class CarVehicleTypeCacheService extends CacheAbsBasiccom.muyu cloud-common-rabbit - - - - - - diff --git a/cloud-modules/cloud-modules-protocol-analysis/src/main/java/com/muyu/analysis/parsing/MQTT/ParsingMQTT.java b/cloud-modules/cloud-modules-protocol-analysis/src/main/java/com/muyu/analysis/parsing/MQTT/ParsingMQTT.java index b5d0638..0385037 100644 --- a/cloud-modules/cloud-modules-protocol-analysis/src/main/java/com/muyu/analysis/parsing/MQTT/ParsingMQTT.java +++ b/cloud-modules/cloud-modules-protocol-analysis/src/main/java/com/muyu/analysis/parsing/MQTT/ParsingMQTT.java @@ -1,10 +1,7 @@ package com.muyu.analysis.parsing.mqtt; +import com.alibaba.fastjson2.JSONObject; import com.muyu.analysis.parsing.mqtt.service.MqttClientService; -import com.muyu.common.core.constant.KafkaConstants; -import com.muyu.common.core.constant.RedisConstants; -import com.muyu.common.core.domain.Result; -import com.muyu.common.mqtt.MQTTConnect; import com.muyu.enterprise.cache.CarMessageValueCacheService; import com.muyu.enterprise.cache.CarVehicleCacheService; import com.muyu.enterprise.cache.CarVehicleTypeCacheService; @@ -16,7 +13,6 @@ import com.muyu.enterprise.remote.RemoteMessageValueService; import com.muyu.enterprise.remote.RemoteVehicleService; import jakarta.annotation.PostConstruct; import jakarta.annotation.Resource; -import cn.hutool.json.JSONObject; import com.alibaba.fastjson.JSON; import lombok.extern.log4j.Log4j2; import org.apache.kafka.clients.producer.KafkaProducer; @@ -73,73 +69,14 @@ public class ParsingMQTT { * 协议解析 */ @PostConstruct - public JSONObject mqttClient(MqttMessage mqttMessage,String messageStr) { -// String topic = "vehicle"; -//// String MQTTConnect.BROKER = "tcp://106.15.136.7:1883"; -//// String clientId = "JavaSample"; - + public JSONObject mqttClient(MqttMessage mqttMessage, String messageStr) { String payload = new String(mqttMessage.getPayload()); log.info("====:{}", payload); - -// try { -// // 第三个参数为空,默认持久化策略 -// MqttClient sampleClient = new MqttClient(MQTTConnect.BROKER, MQTTConnect.CLIENT_ID); -// MqttConnectOptions connOpts = new MqttConnectOptions(); -// connOpts.setCleanSession(true); -// log.info("连接中MQTTConnect.BROKER: {}", MQTTConnect.BROKER); -// sampleClient.connect(connOpts); -// //MQTTConnect.TOPIC -// sampleClient.subscribe(MQTTConnect.TOPIC, 0); -// sampleClient.setCallback(new MqttCallback() { -// // 连接丢失 -// @Override -// public void connectionLost(Throwable throwable) { -// -// } -// -// // 连接成功 -// @Override -// public void messageArrived(String s, MqttMessage mqttMessage) throws Exception { -// System.out.println(new String(mqttMessage.getPayload())); -//// JSONObject entries = ParsingMQTT.protocolParsing(new String(mqttMessage.getPayload())); -//// -//// ProducerRecord producerRecord = new ProducerRecord<>(KafkaConstants.MESSAGE_PARSING, -//// entries.toString() ); -//// kafkaProducer.send(producerRecord); -//// log.info("解析之后的数据:"+entries); -// -// } -// -// -// -// // 接收信息 -// @Override -// public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { -// -// } -// }); -// } catch (MqttException me) { -// log.info("reason " + me.getReasonCode()); -// log.info("msg " + me.getMessage()); -// log.info("loc " + me.getLocalizedMessage()); -// log.info("cause " + me.getCause()); -// log.info("excep " + me); -// System.out.println("reason " + me.getReasonCode()); -// System.out.println("msg " + me.getMessage()); -// System.out.println("loc " + me.getLocalizedMessage()); -// System.out.println("cause " + me.getCause()); -// System.out.println("excep " + me); -// me.printStackTrace(); -// } - - - /** * 协议解析 * @param messageStr * @return */ -// public JSONObject protocolParsing() { //根据空格切割数据 String[] hexArray = messageStr.split(" "); StringBuilder result = new StringBuilder(); @@ -175,7 +112,6 @@ public class ParsingMQTT { jsonObject.put(messageValue.getMessageLabel(), value); } return jsonObject; -// } } diff --git a/cloud-modules/cloud-modules-protocol-analysis/src/main/java/com/muyu/analysis/parsing/MQTT/service/MqttClientService.java b/cloud-modules/cloud-modules-protocol-analysis/src/main/java/com/muyu/analysis/parsing/MQTT/service/MqttClientService.java index 2113df5..782c2ec 100644 --- a/cloud-modules/cloud-modules-protocol-analysis/src/main/java/com/muyu/analysis/parsing/MQTT/service/MqttClientService.java +++ b/cloud-modules/cloud-modules-protocol-analysis/src/main/java/com/muyu/analysis/parsing/MQTT/service/MqttClientService.java @@ -2,7 +2,7 @@ package com.muyu.analysis.parsing.mqtt.service; -import cn.hutool.json.JSONObject; +import com.alibaba.fastjson2.JSONObject; import com.muyu.analysis.parsing.mqtt.ParsingMQTT; import com.muyu.common.core.constant.KafkaConstants; import com.muyu.enterprise.domain.MqttProperties; @@ -77,8 +77,7 @@ public class MqttClientService { public void messageArrived(String s, MqttMessage mqttMessage) { executorService.submit(() -> messageProcessor.mqttClient(mqttMessage,new String(mqttMessage.getPayload()))); System.out.println(new String(mqttMessage.getPayload())); - JSONObject entries =messageProcessor.mqttClient(mqttMessage,new String(mqttMessage.getPayload())); -// JSONObject entries = o.protocolParsing(new String(mqttMessage.getPayload())); + JSONObject entries = messageProcessor.mqttClient(mqttMessage, new String(mqttMessage.getPayload())); ProducerRecord producerRecord = new ProducerRecord<>(KafkaConstants.MESSAGE_PARSING, entries.toString() );