parent
7f257959ca
commit
0737d44efa
|
@ -1,9 +1,6 @@
|
||||||
package com.muyu.enterprise.cache;
|
package com.muyu.enterprise.cache;
|
||||||
|
|
||||||
import com.muyu.common.cache.config.CacheAbsBasic;
|
import com.muyu.common.cache.config.CacheAbsBasic;
|
||||||
import com.muyu.enterprise.domain.CarCompany;
|
|
||||||
import com.muyu.enterprise.domain.CarTemplate;
|
|
||||||
import com.muyu.enterprise.domain.car.MessageValue;
|
|
||||||
import com.muyu.enterprise.domain.resp.car.MessageValueListResp;
|
import com.muyu.enterprise.domain.resp.car.MessageValueListResp;
|
||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
@ -29,6 +26,11 @@ public class CarMessageValueCacheService extends CacheAbsBasic<String,List<Messa
|
||||||
return "messageValue:info";
|
return "messageValue:info";
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String keyPost() {
|
||||||
|
return "";
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String encode(String key)
|
public String encode(String key)
|
||||||
{
|
{
|
||||||
|
|
|
@ -18,6 +18,11 @@ public class CarVehicleCacheService extends CacheAbsBasic<String, Vehicle> {
|
||||||
return "sysCar:info";
|
return "sysCar:info";
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String keyPost() {
|
||||||
|
return "";
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String encode(String key){
|
public String encode(String key){
|
||||||
return super.encode(key);
|
return super.encode(key);
|
||||||
|
@ -25,7 +30,7 @@ public class CarVehicleCacheService extends CacheAbsBasic<String, Vehicle> {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String decode(String key){
|
public String decode(String key){
|
||||||
return super.decode(key);
|
return "";
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -21,6 +21,11 @@ public class CarVehicleTypeCacheService extends CacheAbsBasic<String, VehicleTyp
|
||||||
return "sysCarType:info";
|
return "sysCarType:info";
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String keyPost() {
|
||||||
|
return "";
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String encode(String key)
|
public String encode(String key)
|
||||||
{
|
{
|
||||||
|
@ -30,7 +35,7 @@ public class CarVehicleTypeCacheService extends CacheAbsBasic<String, VehicleTyp
|
||||||
@Override
|
@Override
|
||||||
public String decode(String key)
|
public String decode(String key)
|
||||||
{
|
{
|
||||||
return super.decode(key);
|
return "";
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -141,12 +141,6 @@
|
||||||
<groupId>com.muyu</groupId>
|
<groupId>com.muyu</groupId>
|
||||||
<artifactId>cloud-common-rabbit</artifactId>
|
<artifactId>cloud-common-rabbit</artifactId>
|
||||||
</dependency>
|
</dependency>
|
||||||
<!-- <!– gateway系统模块 –>-->
|
|
||||||
<!-- <dependency>-->
|
|
||||||
<!-- <groupId>com.muyu</groupId>-->
|
|
||||||
<!-- <artifactId>cloud-modules-vehicle-gateway</artifactId>-->
|
|
||||||
<!-- </dependency>-->
|
|
||||||
|
|
||||||
|
|
||||||
</dependencies>
|
</dependencies>
|
||||||
|
|
||||||
|
|
|
@ -1,10 +1,7 @@
|
||||||
package com.muyu.analysis.parsing.mqtt;
|
package com.muyu.analysis.parsing.mqtt;
|
||||||
|
|
||||||
|
import com.alibaba.fastjson2.JSONObject;
|
||||||
import com.muyu.analysis.parsing.mqtt.service.MqttClientService;
|
import com.muyu.analysis.parsing.mqtt.service.MqttClientService;
|
||||||
import com.muyu.common.core.constant.KafkaConstants;
|
|
||||||
import com.muyu.common.core.constant.RedisConstants;
|
|
||||||
import com.muyu.common.core.domain.Result;
|
|
||||||
import com.muyu.common.mqtt.MQTTConnect;
|
|
||||||
import com.muyu.enterprise.cache.CarMessageValueCacheService;
|
import com.muyu.enterprise.cache.CarMessageValueCacheService;
|
||||||
import com.muyu.enterprise.cache.CarVehicleCacheService;
|
import com.muyu.enterprise.cache.CarVehicleCacheService;
|
||||||
import com.muyu.enterprise.cache.CarVehicleTypeCacheService;
|
import com.muyu.enterprise.cache.CarVehicleTypeCacheService;
|
||||||
|
@ -16,7 +13,6 @@ import com.muyu.enterprise.remote.RemoteMessageValueService;
|
||||||
import com.muyu.enterprise.remote.RemoteVehicleService;
|
import com.muyu.enterprise.remote.RemoteVehicleService;
|
||||||
import jakarta.annotation.PostConstruct;
|
import jakarta.annotation.PostConstruct;
|
||||||
import jakarta.annotation.Resource;
|
import jakarta.annotation.Resource;
|
||||||
import cn.hutool.json.JSONObject;
|
|
||||||
import com.alibaba.fastjson.JSON;
|
import com.alibaba.fastjson.JSON;
|
||||||
import lombok.extern.log4j.Log4j2;
|
import lombok.extern.log4j.Log4j2;
|
||||||
import org.apache.kafka.clients.producer.KafkaProducer;
|
import org.apache.kafka.clients.producer.KafkaProducer;
|
||||||
|
@ -73,73 +69,14 @@ public class ParsingMQTT {
|
||||||
* 协议解析
|
* 协议解析
|
||||||
*/
|
*/
|
||||||
@PostConstruct
|
@PostConstruct
|
||||||
public JSONObject mqttClient(MqttMessage mqttMessage,String messageStr) {
|
public JSONObject mqttClient(MqttMessage mqttMessage, String messageStr) {
|
||||||
// String topic = "vehicle";
|
|
||||||
//// String MQTTConnect.BROKER = "tcp://106.15.136.7:1883";
|
|
||||||
//// String clientId = "JavaSample";
|
|
||||||
|
|
||||||
String payload = new String(mqttMessage.getPayload());
|
String payload = new String(mqttMessage.getPayload());
|
||||||
log.info("====:{}", payload);
|
log.info("====:{}", payload);
|
||||||
|
|
||||||
// try {
|
|
||||||
// // 第三个参数为空,默认持久化策略
|
|
||||||
// MqttClient sampleClient = new MqttClient(MQTTConnect.BROKER, MQTTConnect.CLIENT_ID);
|
|
||||||
// MqttConnectOptions connOpts = new MqttConnectOptions();
|
|
||||||
// connOpts.setCleanSession(true);
|
|
||||||
// log.info("连接中MQTTConnect.BROKER: {}", MQTTConnect.BROKER);
|
|
||||||
// sampleClient.connect(connOpts);
|
|
||||||
// //MQTTConnect.TOPIC
|
|
||||||
// sampleClient.subscribe(MQTTConnect.TOPIC, 0);
|
|
||||||
// sampleClient.setCallback(new MqttCallback() {
|
|
||||||
// // 连接丢失
|
|
||||||
// @Override
|
|
||||||
// public void connectionLost(Throwable throwable) {
|
|
||||||
//
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// // 连接成功
|
|
||||||
// @Override
|
|
||||||
// public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
|
|
||||||
// System.out.println(new String(mqttMessage.getPayload()));
|
|
||||||
//// JSONObject entries = ParsingMQTT.protocolParsing(new String(mqttMessage.getPayload()));
|
|
||||||
////
|
|
||||||
//// ProducerRecord<String, String> producerRecord = new ProducerRecord<>(KafkaConstants.MESSAGE_PARSING,
|
|
||||||
//// entries.toString() );
|
|
||||||
//// kafkaProducer.send(producerRecord);
|
|
||||||
//// log.info("解析之后的数据:"+entries);
|
|
||||||
//
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
//
|
|
||||||
//
|
|
||||||
// // 接收信息
|
|
||||||
// @Override
|
|
||||||
// public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
|
|
||||||
//
|
|
||||||
// }
|
|
||||||
// });
|
|
||||||
// } catch (MqttException me) {
|
|
||||||
// log.info("reason " + me.getReasonCode());
|
|
||||||
// log.info("msg " + me.getMessage());
|
|
||||||
// log.info("loc " + me.getLocalizedMessage());
|
|
||||||
// log.info("cause " + me.getCause());
|
|
||||||
// log.info("excep " + me);
|
|
||||||
// System.out.println("reason " + me.getReasonCode());
|
|
||||||
// System.out.println("msg " + me.getMessage());
|
|
||||||
// System.out.println("loc " + me.getLocalizedMessage());
|
|
||||||
// System.out.println("cause " + me.getCause());
|
|
||||||
// System.out.println("excep " + me);
|
|
||||||
// me.printStackTrace();
|
|
||||||
// }
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 协议解析
|
* 协议解析
|
||||||
* @param messageStr
|
* @param messageStr
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
// public JSONObject protocolParsing() {
|
|
||||||
//根据空格切割数据
|
//根据空格切割数据
|
||||||
String[] hexArray = messageStr.split(" ");
|
String[] hexArray = messageStr.split(" ");
|
||||||
StringBuilder result = new StringBuilder();
|
StringBuilder result = new StringBuilder();
|
||||||
|
@ -175,7 +112,6 @@ public class ParsingMQTT {
|
||||||
jsonObject.put(messageValue.getMessageLabel(), value);
|
jsonObject.put(messageValue.getMessageLabel(), value);
|
||||||
}
|
}
|
||||||
return jsonObject;
|
return jsonObject;
|
||||||
// }
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -2,7 +2,7 @@ package com.muyu.analysis.parsing.mqtt.service;
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
import cn.hutool.json.JSONObject;
|
import com.alibaba.fastjson2.JSONObject;
|
||||||
import com.muyu.analysis.parsing.mqtt.ParsingMQTT;
|
import com.muyu.analysis.parsing.mqtt.ParsingMQTT;
|
||||||
import com.muyu.common.core.constant.KafkaConstants;
|
import com.muyu.common.core.constant.KafkaConstants;
|
||||||
import com.muyu.enterprise.domain.MqttProperties;
|
import com.muyu.enterprise.domain.MqttProperties;
|
||||||
|
@ -77,8 +77,7 @@ public class MqttClientService {
|
||||||
public void messageArrived(String s, MqttMessage mqttMessage) {
|
public void messageArrived(String s, MqttMessage mqttMessage) {
|
||||||
executorService.submit(() -> messageProcessor.mqttClient(mqttMessage,new String(mqttMessage.getPayload())));
|
executorService.submit(() -> messageProcessor.mqttClient(mqttMessage,new String(mqttMessage.getPayload())));
|
||||||
System.out.println(new String(mqttMessage.getPayload()));
|
System.out.println(new String(mqttMessage.getPayload()));
|
||||||
JSONObject entries =messageProcessor.mqttClient(mqttMessage,new String(mqttMessage.getPayload()));
|
JSONObject entries = messageProcessor.mqttClient(mqttMessage, new String(mqttMessage.getPayload()));
|
||||||
// JSONObject entries = o.protocolParsing(new String(mqttMessage.getPayload()));
|
|
||||||
|
|
||||||
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(KafkaConstants.MESSAGE_PARSING,
|
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(KafkaConstants.MESSAGE_PARSING,
|
||||||
entries.toString() );
|
entries.toString() );
|
||||||
|
|
Loading…
Reference in New Issue