Merge branch 'dev.perfect.protocol' into dev

dev.fault.change
李东佳 2024-10-09 12:18:07 +08:00
commit 7bfd5f8f88
13 changed files with 197 additions and 184 deletions

View File

@ -7,7 +7,7 @@ nacos:
addr: 47.101.49.53:8848
user-name: nacos
password: nacos
namespace: seven
namespace: vehicle
# Spring
spring:
application:

View File

@ -7,7 +7,7 @@ nacos:
addr: 47.101.49.53:8848
user-name: nacos
password: nacos
namespace: seven
namespace: vehicle
# Spring
spring:

View File

@ -8,7 +8,7 @@ nacos:
addr: 47.101.49.53:8848
user-name: nacos
password: nacos
namespace: seven
namespace: vehicle
# SPRING_AMQP_DESERIALIZATION_TRUST_ALL=true spring.amqp.deserialization.trust.all
# Spring
spring:

View File

@ -7,7 +7,7 @@ nacos:
addr: 47.101.49.53:8848
user-name: nacos
password: nacos
namespace: seven
namespace: vehicle
# Spring
spring:

View File

@ -7,7 +7,7 @@ nacos:
addr: 47.101.49.53:8848
user-name: nacos
password: nacos
namespace: seven
namespace: vehicle
# Spring
spring:

View File

@ -7,7 +7,7 @@ nacos:
addr: 47.101.49.53:8848
user-name: nacos
password: nacos
namespace: seven
namespace: vehicle
# Spring
spring:

View File

@ -77,17 +77,6 @@
<artifactId>cloud-common-api-doc</artifactId>
</dependency>
<!-- XllJob定时任务 -->
<dependency>
<groupId>com.muyu</groupId>
<artifactId>cloud-common-xxl</artifactId>
</dependency>
<dependency>
<groupId>com.muyu</groupId>
<artifactId>cloud-common-rabbit</artifactId>
</dependency>
<dependency>
<groupId>com.muyu</groupId>
<artifactId>cloud-modules-enterprise-common</artifactId>

View File

@ -0,0 +1,185 @@
package com.muyu.cloud.protocol.parsing;
import cn.hutool.json.JSONObject;
import com.muyu.domain.Vehicle;
import com.muyu.domain.VehicleType;
import com.muyu.domain.resp.MessageValueListResp;
import com.muyu.enterprise.cache.AllMessageValueCacheService;
import com.muyu.enterprise.cache.VehicleCacheService;
import com.muyu.enterprise.cache.VehicleTypeCacheService;
import lombok.extern.log4j.Log4j2;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.List;
/**
* @Author: LiDongJia
* @Package: com.muyu.cloud.protocol.parsing.service.impl
* @Project: 2112-car-cloud-server
* @name: ParsingServiceImpl
* @Date: 2024/9/28 14:31
* @Description:
*/
@Log4j2
@Component
public class ParsingMessage {
// Kafka生产者
@Resource
private KafkaProducer<String, String> kafkaProducer;
// 车辆缓存服务
@Autowired
private VehicleCacheService vehicleCacheService;
// 车辆类型缓存服务
@Autowired
private VehicleTypeCacheService vehicleTypeCacheService;
// 报文模版缓存服务
@Autowired
private AllMessageValueCacheService allMessageValueCacheService;
// MQTT主题
private static final String TOPIC = "vehicle";
// MQTT Broker地址
private static final String BROKER = "tcp://111.231.50.146:1883";
// MQTT客户端ID
private static final String CLIENT_ID = "JavaSample";
// MQTT客户端
private MqttClient mqttClient;
// kafka topic
private static final String TIPSY = "tipsy";
/**
* MQTT
*/
@PostConstruct
public void init() {
connectToMqttBroker();
}
/**
* MQTT Broker
*/
private void connectToMqttBroker() {
try {
// 创建MqttClient实例指定Broker地址、客户端ID以及持久化方式
mqttClient = new MqttClient(BROKER, CLIENT_ID, new MemoryPersistence());
// 连接MQTT Broker
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setCleanSession(true);
log.info("连接到协议: " + BROKER);
mqttClient.connect(connOpts);
mqttClient.subscribe(TOPIC, 0);
// 设置MQTT回调处理器
mqttClient.setCallback(new MqttCallbackHandler());
} catch (MqttException me) {
log.error("连接MQTT Broker失败: [{}]", me.getMessage());
}
}
/**
* MQTT
*/
private class MqttCallbackHandler implements MqttCallback {
// 连接丢失
@Override
public void connectionLost(Throwable throwable) {
log.error("连接丢失: [{}]", throwable.getMessage());
}
// 连接成功
@Override
public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
handleMqttMessage(mqttMessage);
}
// 接收信息
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
}
}
/**
* MQTT
*
* @param mqttMessage
*/
private void handleMqttMessage(MqttMessage mqttMessage) {
// 解析MQTT消息
String messageStr = new String(mqttMessage.getPayload());
log.info("接收到MQTT消息: " + messageStr);
// 解析协议
JSONObject parseMessage = parseProtocol(messageStr);
// 发送Kafka消息
sendKafkaMessage(parseMessage);
}
/**
*
*
* @param messageStr
* @return
*/
private JSONObject parseProtocol(String messageStr) {
String[] hexArray = messageStr.split(" ");
// 遍历十六进制数据转换为字符
StringBuilder stringBuilder = new StringBuilder();
for (String hex : hexArray) {
int decimal = Integer.parseInt(hex, 16);
stringBuilder.append((char) decimal);
}
// 取出车辆VIN码
String vehicleVin = stringBuilder.substring(1, 18);
log.info("车辆VIN码: {}", vehicleVin);
// 根据车辆VIN码查询车辆信息
Vehicle vehicle = vehicleCacheService.get(vehicleVin);
if (vehicle == null) {
throw new RuntimeException("车辆查询失败");
}
VehicleType vehicleType = vehicleTypeCacheService.get(String.valueOf(vehicle.getVehicleTypeId()));
if (vehicleType == null) {
throw new RuntimeException("车辆类型查询失败");
}
Long templateId = vehicleType.getMessageTemplateId();
// 根据报文模版ID查询报文模版数据
List<MessageValueListResp> templateList = allMessageValueCacheService.get(String.valueOf(templateId));
if (templateList == null) {
throw new RuntimeException("报文模版查询失败");
}
// 判断报文模板列表不为空
if (templateList.isEmpty()) {
throw new RuntimeException("报文模版为空");
}
// 存储报文模版解析后的数据
JSONObject jsonObject = new JSONObject();
for (MessageValueListResp messageValue : templateList) {
// 起始位下标
Integer startIndex = messageValue.getMessageStartIndex() - 1;
// 结束位下标
Integer endIndex = messageValue.getMessageEndIndex();
// 根据报文模版截取数据
String value = stringBuilder.substring(startIndex, endIndex);
// 存入数据
jsonObject.put(messageValue.getMessageLabel(), value);
}
return jsonObject;
}
/**
* Kafka
* @param parseMessage
*/
private void sendKafkaMessage(JSONObject parseMessage) {
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(TIPSY, parseMessage.toString());
kafkaProducer.send(producerRecord);
log.info("发送Kafka消息: " + parseMessage);
}
}

View File

@ -1,161 +0,0 @@
package com.muyu.cloud.protocol.parsing.test;
import cn.hutool.json.JSONObject;
import com.muyu.domain.Vehicle;
import com.muyu.domain.VehicleType;
import com.muyu.domain.resp.MessageValueListResp;
import com.muyu.enterprise.cache.AllMessageValueCacheService;
import com.muyu.enterprise.cache.VehicleCacheService;
import com.muyu.enterprise.cache.VehicleTypeCacheService;
import lombok.extern.log4j.Log4j2;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.eclipse.paho.client.mqttv3.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.List;
/**
* @Author: LiDongJia
* @Package: com.muyu.cloud.protocol.parsing.service.impl
* @Project: 2112-car-cloud-server
* @name: ParsingServiceImpl
* @Date: 2024/9/28 14:31
* @Description:
*/
@Log4j2
@Component
public class ParsingMessage {
@Resource
private KafkaProducer<String, String> kafkaProducer;
@Autowired
private VehicleCacheService vehicleCacheService;
@Autowired
private VehicleTypeCacheService vehicleTypeCacheService;
@Autowired
private AllMessageValueCacheService allMessageValueCacheService;
/**
*
*/
@PostConstruct
public void mqttClient() {
String topic = "vehicle";
String broker = "tcp://111.231.50.146:1883";
String clientId = "JavaSample";
try {
// 第三个参数为空,默认持久化策略
MqttClient sampleClient = new MqttClient(broker, clientId);
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setCleanSession(true);
System.out.println("Connecting to broker: " + broker);
sampleClient.connect(connOpts);
sampleClient.subscribe(topic, 0);
sampleClient.setCallback(new MqttCallback() {
// 连接丢失
@Override
public void connectionLost(Throwable throwable) {
}
// 连接成功
@Override
public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
System.out.println(new String(mqttMessage.getPayload()));
JSONObject entries = this.protocolParsing(new String(mqttMessage.getPayload()));
ProducerRecord<String, String> producerRecord = new ProducerRecord<>("zeshi",
entries.toString() );
kafkaProducer.send(producerRecord);
System.out.println(entries);
}
/**
*
* @param messageStr
* @return
*/
public JSONObject protocolParsing(String messageStr) {
//根据空格切割数据
String[] hexArray = messageStr.split(" ");
StringBuilder result = new StringBuilder();
//遍历十六进制数据转换为字符
for (String hex : hexArray) {
int decimal = Integer.parseInt(hex, 16);
result.append((char) decimal);
}
//取出车辆VIN码
String vehicleVin = result.substring(1, 18);
log.info("车辆VIN码: " + vehicleVin);
//根据车辆VIN码查询报文模板ID
Vehicle vehicle = vehicleCacheService.get(vehicleVin);
Long vehicleTypeId = vehicle.getVehicleTypeId();
VehicleType vehicleType = vehicleTypeCacheService.get(String.valueOf(vehicleTypeId));
Long templateId = vehicleType.getMessageTemplateId();
List<MessageValueListResp> templateList = allMessageValueCacheService.get(String.valueOf(templateId));
// //从redis缓存中获取报文模板数据
// try {
// String redisKey = "messageTemplate" + templateId;
// if (redisTemplate.hasKey(redisKey)) {
// List<Object> list = redisTemplate.opsForList().range(redisKey, 0, -1);
// templateList = list.stream()
// .map(obj -> JSON.parseObject(obj.toString(), MessageValueListResp.class))
// .toList();
// log.info("Redis缓存查询成功");
// } else {
// Result<List<MessageValueListResp>> byTemplateId = remoteServiceClient.findByTemplateId(templateId);
// templateList = byTemplateId.getData();
// templateList.forEach(
// listResp ->
// redisTemplate.opsForList().rightPush(
// redisKey, JSON.toJSONString(listResp)
// )
// );
// log.info("数据库查询成功");
// }
// } catch (Exception e) {
// throw new RuntimeException("获取报文模板失败");
// }
//判断报文模板列表不为空
if (templateList.isEmpty()) {
throw new RuntimeException("报文模版为空");
}
//存储报文模版解析后的数据
JSONObject jsonObject = new JSONObject();
for (MessageValueListResp messageValue : templateList) {
//起始位下标
Integer startIndex = messageValue.getMessageStartIndex() - 1;
//结束位下标
Integer endIndex = messageValue.getMessageEndIndex();
//根据报文模版截取数据
String value = result.substring(startIndex, endIndex);
//存入数据
jsonObject.put(messageValue.getMessageLabel(), value);
}
return jsonObject;
}
// 接收信息
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
}
});
} catch (MqttException me) {
System.out.println("reason " + me.getReasonCode());
System.out.println("msg " + me.getMessage());
System.out.println("loc " + me.getLocalizedMessage());
System.out.println("cause " + me.getCause());
System.out.println("excep " + me);
me.printStackTrace();
}
}
}

View File

@ -7,7 +7,7 @@ nacos:
addr: 47.101.49.53:8848
user-name: nacos
password: nacos
namespace: seven
namespace: vehicle
# SPRING_AMQP_DESERIALIZATION_TRUST_ALL=true spring.amqp.deserialization.trust.all
# Spring
spring:
@ -58,7 +58,7 @@ spring:
kafka:
producer:
# Kafka服务器
bootstrap-servers: 120.53.86.181:9092
bootstrap-servers: 111.231.50.146:9092
# 开启事务,必须在开启了事务的方法中发送,否则报错
transaction-id-prefix: kafkaTx-
# 发生错误后消息重发的次数开启事务必须设置大于0。
@ -79,7 +79,7 @@ spring:
consumer:
# Kafka服务器
bootstrap-servers: 120.53.86.181:9092
bootstrap-servers: 111.231.50.146:9092
group-id: firstGroup
# 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式如1S,1M,2H,5D
#auto-commit-interval: 2s

View File

@ -7,7 +7,7 @@ nacos:
addr: 47.101.49.53:8848
user-name: nacos
password: nacos
namespace: seven
namespace: vehicle
# SPRING_AMQP_DESERIALIZATION_TRUST_ALL=true spring.amqp.deserialization.trust.all
# Spring
spring:

View File

@ -7,7 +7,7 @@ nacos:
addr: 47.101.49.53:8848
user-name: nacos
password: nacos
namespace: seven
namespace: vehicle
# SPRING_AMQP_DESERIALIZATION_TRUST_ALL=true spring.amqp.deserialization.trust.all
# Spring
spring:

View File

@ -7,7 +7,7 @@ nacos:
addr: 47.101.49.53:8848
user-name: nacos
password: nacos
namespace: seven
namespace: vehicle
# Spring
spring: