feat 修改报文解析

master
rouchen 2024-06-27 20:55:30 +08:00
parent 3505f719a9
commit 7ce022076d
19 changed files with 60187 additions and 21782 deletions

File diff suppressed because it is too large Load Diff

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@ -69,6 +69,8 @@ public class EventServiceImpl implements EventService {
for (ConsumerRecord<String, String> record : records) {
// 处理消息
String message = record.value();
String jsonString = JSON.toJSONString(message);
log.info("接收到消息:" + jsonString);
MessageData messageData = JSON.parseObject(message, MessageData.class);
// 将毫秒级时间戳转换为LocalDateTime
LocalDateTime dateTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(Long.parseLong(messageData.getTimestamp())), ZoneId.systemDefault());
@ -216,7 +218,7 @@ public class EventServiceImpl implements EventService {
//判断车辆状态
public void vehicleStatus(MessageData messageData, Date formattedTime){
if (messageData.getVehicleStatus().equals("0")) {
if ( messageData.getVehicleStatus() != null && messageData.getVehicleStatus().equals("0") ) {
if (messageData.getVehicleStatus().equals("0") && !redisTemplate.hasKey(messageData.getVin() + "-" + "GZ001")) {
// 车辆状态异常,第一次发生
@ -247,7 +249,7 @@ public class EventServiceImpl implements EventService {
//判断充电状态
private void chargeStatus(MessageData messageData, Date formattedTime) {
if (messageData.getChargeStatus().equals("0")) {
if ( messageData.getChargeStatus() != null &&messageData.getChargeStatus().equals("0")) {
if (messageData.getChargeStatus().equals("0") && !redisTemplate.hasKey(messageData.getVin() + "-" + "GZ002")) {
// 车辆状态异常,第一次发生
@ -280,7 +282,7 @@ public class EventServiceImpl implements EventService {
//判断运行状态
private void runStatus(MessageData messageData, Date formattedTime) {
if (messageData.getRunStatus().equals("0")) {
if ( messageData.getRunStatus() != null && messageData.getRunStatus().equals("0")) {
if (messageData.getRunStatus().equals("0") && !redisTemplate.hasKey(messageData.getVin() + "-" + "GZ003")) {
// 车辆状态异常,第一次发生
@ -311,7 +313,7 @@ public class EventServiceImpl implements EventService {
}
//判断SOC
private void soc(MessageData messageData, Date formattedTime) {
if (messageData.getSoc().equals("0")) {
if ( messageData.getSoc() != null &&messageData.getSoc().equals("0")) {
if (messageData.getSoc().equals("0") && !redisTemplate.hasKey(messageData.getVin() + "-" + "GZ004")) {
// 车辆状态异常,第一次发生
@ -344,7 +346,7 @@ public class EventServiceImpl implements EventService {
// 判断充电工作状态
private void chargeWorkStatus(MessageData messageData, Date formattedTime) {
if (messageData.getChargeWorkStatus().equals("0")) {
if ( messageData.getChargeWorkStatus() != null && messageData.getChargeWorkStatus().equals("0")) {
if (messageData.getChargeWorkStatus().equals("0") && !redisTemplate.hasKey(messageData.getVin() + "-" + "GZ005")) {
// 车辆状态异常,第一次发生
@ -376,7 +378,7 @@ public class EventServiceImpl implements EventService {
//判断驱动电机状态
private void driveMotorStatus(MessageData messageData,Date formattedTime){
if (messageData.getDriveMotorStatus().equals("0")) {
if ( messageData.getDriveMotorStatus() != null && messageData.getDriveMotorStatus().equals("0")) {
if (messageData.getDriveMotorStatus().equals("0") && !redisTemplate.hasKey(messageData.getVin() + "-" + "GZ006")) {
// 车辆状态异常,第一次发生
@ -407,7 +409,7 @@ public class EventServiceImpl implements EventService {
}
//判断定位
private void location(MessageData messageData,Date formattedTime){
if (messageData.getLocation().equals("0")) {
if ( messageData.getLocation() != null && messageData.getLocation().equals("0")) {
if (messageData.getLocation().equals("0") && !redisTemplate.hasKey(messageData.getVin() + "-" + "GZ007")) {
// 车辆状态异常,第一次发生
@ -438,7 +440,7 @@ public class EventServiceImpl implements EventService {
}
//判断EAS
private void eas(MessageData messageData,Date formattedTime){
if (messageData.getEas().equals("0")) {
if ( messageData.getEas() != null && messageData.getEas().equals("0")) {
if (messageData.getVehicleStatus().equals("0") && !redisTemplate.hasKey(messageData.getVin() + "-" + "GZ008")) {
// 车辆状态异常,第一次发生
@ -469,7 +471,7 @@ public class EventServiceImpl implements EventService {
}
//判断EPS
private void ptc(MessageData messageData,Date formattedTime){
if (messageData.getPtc().equals("0")) {
if ( messageData.getPtc() != null && messageData.getPtc().equals("0")) {
if (messageData.getPtc().equals("0") && !redisTemplate.hasKey(messageData.getVin() + "-" + "GZ009")) {
// 车辆状态异常,第一次发生
@ -500,7 +502,7 @@ public class EventServiceImpl implements EventService {
}
//判断EPS
private void eps(MessageData messageData,Date formattedTime){
if (messageData.getEps().equals("0")) {
if ( messageData.getEps() != null && messageData.getEps().equals("0")) {
if (messageData.getEps().equals("0") && !redisTemplate.hasKey(messageData.getVin() + "-" + "GZ010")) {
// 车辆状态异常,第一次发生
@ -531,7 +533,7 @@ public class EventServiceImpl implements EventService {
}
//判断ABS
private void abs(MessageData messageData,Date formattedTime){
if (messageData.getAbs().equals("0")) {
if ( messageData.getAbs() != null && messageData.getAbs().equals("0")) {
if (messageData.getAbs().equals("0") && !redisTemplate.hasKey(messageData.getVin() + "-" + "GZ011")) {
// 车辆状态异常,第一次发生
@ -563,7 +565,7 @@ public class EventServiceImpl implements EventService {
//判断MCU
private void mcu(MessageData messageData,Date formattedTime){
if (messageData.getMcu().equals("0")) {
if ( messageData.getMcu() != null && messageData.getMcu().equals("0")) {
if (messageData.getMcu().equals("0") && !redisTemplate.hasKey(messageData.getVin() + "-" + "GZ012")) {
// 车辆状态异常,第一次发生
@ -594,7 +596,7 @@ public class EventServiceImpl implements EventService {
}
//判断动力电池加热
private void powerBatteryHeating(MessageData messageData,Date formattedTime){
if (messageData.getPowerBatteryHeating().equals("0")) {
if ( messageData.getPowerBatteryHeating() != null && messageData.getPowerBatteryHeating().equals("0")) {
if (messageData.getPowerBatteryHeating().equals("0") && !redisTemplate.hasKey(messageData.getVin() + "-" + "GZ013")) {
// 车辆状态异常,第一次发生
@ -625,7 +627,7 @@ public class EventServiceImpl implements EventService {
}
//判断动力电池电流
private void powerBatteryCurrentStatus(MessageData messageData,Date formattedTime){
if (messageData.getPowerBatteryCurrentStatus().equals("0")) {
if ( messageData.getPowerBatteryCurrentStatus() != null &&messageData.getPowerBatteryCurrentStatus().equals("0")) {
if (messageData.getPowerBatteryCurrentStatus().equals("0") && !redisTemplate.hasKey(messageData.getVin() + "-" + "GZ014")) {
// 车辆状态异常,第一次发生
@ -656,7 +658,7 @@ public class EventServiceImpl implements EventService {
}
//判断动力电池保温
private void powerBatteryHeat(MessageData messageData,Date formattedTime){
if (messageData.getPowerBatteryHeat().equals("0")) {
if ( messageData.getPowerBatteryHeat() != null && messageData.getPowerBatteryHeat().equals("0")) {
if (messageData.getPowerBatteryHeat().equals("0") && !redisTemplate.hasKey(messageData.getVin() + "-" + "GZ015")) {
// 车辆状态异常,第一次发生
@ -687,7 +689,7 @@ public class EventServiceImpl implements EventService {
}
//判断DCDC
private void dcdc(MessageData messageData,Date formattedTime){
if (messageData.getDcdc().equals("0")) {
if ( messageData.getDcdc() != null && messageData.getDcdc().equals("0")) {
if (messageData.getDcdc().equals("0") && !redisTemplate.hasKey(messageData.getVin() + "-" + "GZ016")) {
// 车辆状态异常,第一次发生
@ -718,7 +720,7 @@ public class EventServiceImpl implements EventService {
}
//判断CHG
private void chg(MessageData messageData,Date formattedTime){
if (messageData.getChg().equals("0")) {
if ( messageData.getChg() != null && messageData.getChg().equals("0")) {
if (messageData.getChg().equals("0") && !redisTemplate.hasKey(messageData.getVin() + "-" + "GZ017")) {
// 车辆状态异常,第一次发生
@ -751,7 +753,6 @@ public class EventServiceImpl implements EventService {
public void sendToRabbitMQ(Fault gz001){
rabbitTemplate.convertAndSend("exchange_breakdown","discover_time",JSON.toJSONString(gz001));
log.info("故障信息"+gz001.getVin()+"发送到RabbitMQ");
}

View File

@ -1,22 +1,29 @@
package com.muyu.mqtt;
import com.alibaba.fastjson.JSON;
import com.muyu.mqtt.dao.MessageData;
import com.muyu.utils.ConversionUtil;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import lombok.extern.log4j.Log4j2;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import static com.muyu.utils.ConversionUtil.hexStringToString;
/**
* MessageCallbackService
*
@ -41,6 +48,9 @@ public class MessageCallbackService implements MqttCallback {
private static final AtomicInteger PARTITION_COUNTER = new AtomicInteger(0); // 假设NUM_PARTITIONS是已知的分区数量
private static final int NUM_PARTITIONS = 8; // 请根据实际情况设置分区数量
@Autowired
private RedisTemplate<String, String> redisTemplate;
@Override
public void messageArrived(String topic, MqttMessage mqttMessage) {
log.info("topic:{}", topic);
@ -48,12 +58,16 @@ public class MessageCallbackService implements MqttCallback {
log.info("message content:{}", new String(mqttMessage.getPayload()));
String s = new String(mqttMessage.getPayload());
MessageData main = ConversionUtil.main(s);
// MessageData main = ConversionUtil.main(s);
JSONObject object = messageParsing(s);
log.error("message:{}",object);
// 准备ProducerRecord并发送到Kafka
String kafkaTopic = topic; // 假设MQTT主题与Kafka主题相同
String key = main.getVin(); // 使用vin作为key如果适用
String value = JSON.toJSONString(main);
String vin = object.getString("vin");
System.out.println("VIN: " + vin);
String key = vin; // 使用vin作为key如果适用
String value = JSON.toJSONString(object);
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(kafkaTopic, key, value);
kafkaTemplate.send(producerRecord);// 注意这里使用get()会阻塞直到发送完成,实际应用中可能需要异步处理
@ -63,4 +77,63 @@ public class MessageCallbackService implements MqttCallback {
public void deliveryComplete(IMqttDeliveryToken token) {
System.out.println("deliveryComplete---------" + token.isComplete());
}
private JSONObject messageParsing(String s) {
// 假设hexStringToString是一个将十六进制字符串转换为普通字符串的方法
String hexStringToString = hexStringToString(s);
// 提取字符串中的特定部分
String substring = hexStringToString.substring(1, hexStringToString.length() - 2);
String vin = substring.substring(0, 17);
// 从本地缓存中获取VIN对应的信息
String cachedResult = localCache.getIfPresent(vin);
JSONObject result = null;
if (cachedResult == null) {
// 如果本地缓存中没有找到则从Redis中获取
String redisList = redisTemplate.opsForValue().get(vin);
log.error("redisList:{}", redisList);
JSONArray jsonArray = JSON.parseArray(redisList);
// 创建一个空的JSONObject来存储合并后的数据
result = new JSONObject();
// 遍历JSONArray中的每个JSONObject并将其合并到result中
for (int i = 0; i < jsonArray.size(); i++) {
JSONObject item = jsonArray.getJSONObject(i);
String analyzeKey = item.getString("analyzeKey");
int analyzeStart = item.getInteger("analyzeStart");
int ent = item.getInteger("ent");
String extractedValue = substring.substring(analyzeStart, ent);
result.put(analyzeKey, extractedValue);
}
log.error("result:{}", result);
// 将结果转换为JSON字符串然后存储到本地缓存中
localCache.put(vin, result.toJSONString());
} else {
// 如果本地缓存中有数据将其解析为JSONObject
result = JSON.parseObject(cachedResult);
}
return result;
}
@RabbitListener(queues = "subscription")
public void receiveSms(Message message) {
// 拿到消息中的VIN
String vin = message.getBody().toString();
// 删除本地缓存
localCache.invalidate(vin);
}
/**
*
*/
Cache<String, String> localCache = Caffeine.newBuilder()
.initialCapacity(5)
.maximumSize(10)
.build();
}

View File

@ -0,0 +1,32 @@
package com.muyu.redis.demo;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import org.apache.iotdb.tsfile.read.filter.operator.In;
import org.checkerframework.checker.units.qual.N;
/**
* RedisData
*
* @author Yangle
* Date 2024/6/26 16:54
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
@SuperBuilder
public class RedisData {
private String vin;
//报文标签
private String messageLabel;
//起始位
private String initiationPosition;
//终止位
private String terminatePosition;
//报文类型
private String typeIdentification;
}

View File

@ -25,6 +25,8 @@ public class ConversionUtil {
}
public static MessageData main (String args) {
// String str = "<?xml version=\"1.0\"?>\n" +
// "<monitorRoot type=\"param\"><synchronizeSyptom event=\"0\" initial=\"true\"><Action_ECG><Rhythm>Sinus</Rhythm><HR>80</HR><EMD>No Change</EMD><Conduct>0</Conduct></Action_ECG><Action_Osat value=\"94\" isRelativePercent=\"false\"/><Action_BP isRelativePercent=\"false\"><Shrink value=\"120\"/><Stretch value=\"80\"/></Action_BP><Action_Resp breathType=\"Normal\" value=\"14\" isRelativePercent=\"false\"/><Action_etCO2 value=\"34\" isRelativePercent=\"false\"/><Action_Temperature value=\"35.2\"/><Action_CVP value=\"6.0\"/><Action_PAPDia value=\"10\"/><Action_PAPSys value=\"25\"/><Action_WP value=\"9\"/></synchronizeSyptom></monitorRoot>";
// String strToSixteen = strToSixteen(str);
@ -42,6 +44,9 @@ public class ConversionUtil {
//截取第一位和最后两位
String substring = hexStringToString.substring(1, hexStringToString.length() - 2);
// log.error("substring:{}",substring.length());
//vin
String vin = substring.substring(0,17);
// log.error("length:{}",vin.length());