feat:()修改报错

dev.redis
sy200 2024-10-07 12:11:58 +08:00
parent afdcc48a42
commit d2b29e6323
2 changed files with 0 additions and 155 deletions

View File

@ -1,155 +0,0 @@
package com.muyu.cloud.protocol.parsing.test;
import cn.hutool.json.JSONObject;
import com.alibaba.fastjson.JSON;
import com.muyu.cloud.protocol.parsing.feign.RemoteServiceClient;
import com.muyu.common.core.domain.Result;
import com.muyu.domain.resp.MessageValueListResp;
import lombok.extern.log4j.Log4j2;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.eclipse.paho.client.mqttv3.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.List;
/**
* @Author: LiDongJia
* @Package: com.muyu.cloud.protocol.parsing.service.impl
* @Project: 2112-car-cloud-server
* @name: ParsingServiceImpl
* @Date: 2024/9/28 14:31
* @Description:
*/
@Log4j2
@Component
public class ParsingTest {
@Resource
private RedisTemplate<String, Object> redisTemplate;
@Autowired
private RemoteServiceClient remoteServiceClient;
@Resource
private KafkaProducer<String, String> kafkaProducer;
/**
*
*/
@PostConstruct
public void mqttClient() {
String topic = "vehicle";
String broker = "tcp://111.231.50.146:1883";
String clientId = "JavaSample";
try {
// 第三个参数为空,默认持久化策略
MqttClient sampleClient = new MqttClient(broker, clientId);
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setCleanSession(true);
System.out.println("Connecting to broker: " + broker);
sampleClient.connect(connOpts);
sampleClient.subscribe(topic, 0);
sampleClient.setCallback(new MqttCallback() {
// 连接丢失
@Override
public void connectionLost(Throwable throwable) {
}
// 连接成功
@Override
public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
System.out.println(new String(mqttMessage.getPayload()));
JSONObject entries = this.protocolParsing(new String(mqttMessage.getPayload()));
ProducerRecord<String, String> producerRecord = new ProducerRecord<>("zeshi",
entries.toString() );
kafkaProducer.send(producerRecord);
System.out.println(entries);
}
/**
*
* @param messageStr
* @return
*/
public JSONObject protocolParsing(String messageStr) {
//根据空格切割数据
String[] hexArray = messageStr.split(" ");
StringBuilder result = new StringBuilder();
//遍历十六进制数据转换为字符
for (String hex : hexArray) {
int decimal = Integer.parseInt(hex, 16);
result.append((char) decimal);
}
//取出车辆VIN码
String vehicleVin = result.substring(1, 18);
log.info("车辆VIN码: " + vehicleVin);
//根据车辆VIN码查询报文模板ID
Result<Long> byVehicleVin = remoteServiceClient.findByVehicleVin(vehicleVin);
Long templateId = byVehicleVin.getData();
List<MessageValueListResp> templateList;
//从redis缓存中获取报文模板数据
try {
String redisKey = "messageTemplate" + templateId;
if (redisTemplate.hasKey(redisKey)) {
List<Object> list = redisTemplate.opsForList().range(redisKey, 0, -1);
templateList = list.stream()
.map(obj -> JSON.parseObject(obj.toString(), MessageValueListResp.class))
.toList();
log.info("Redis缓存查询成功");
} else {
Result<List<MessageValueListResp>> byTemplateId = remoteServiceClient.findByTemplateId(templateId);
templateList = byTemplateId.getData();
templateList.forEach(
listResp ->
redisTemplate.opsForList().rightPush(
redisKey, JSON.toJSONString(listResp)
)
);
log.info("数据库查询成功");
}
} catch (Exception e) {
throw new RuntimeException("获取报文模板失败");
}
//判断报文模板列表不为空
if (templateList.isEmpty()) {
throw new RuntimeException("报文模版为空");
}
//存储报文模版解析后的数据
JSONObject jsonObject = new JSONObject();
for (MessageValueListResp messageValue : templateList) {
//起始位下标
Integer startIndex = messageValue.getMessageStartIndex() - 1;
//结束位下标
Integer endIndex = messageValue.getMessageEndIndex();
//根据报文模版截取数据
String value = result.substring(startIndex, endIndex);
//存入数据
jsonObject.put(messageValue.getMessageLabel(), value);
}
return jsonObject;
}
// 接收信息
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
}
});
} catch (MqttException me) {
System.out.println("reason " + me.getReasonCode());
System.out.println("msg " + me.getMessage());
System.out.println("loc " + me.getLocalizedMessage());
System.out.println("cause " + me.getCause());
System.out.println("excep " + me);
me.printStackTrace();
}
}
}