报文模块:修改MQTT监听方法和报文解析方法
parent
b8858754f5
commit
01895d1279
|
@ -16,6 +16,21 @@
|
|||
<module>saas-server</module>
|
||||
</modules>
|
||||
|
||||
<dependencies>
|
||||
<!--mqtt依赖-->
|
||||
<dependency>
|
||||
<groupId>org.springframework.integration</groupId>
|
||||
<artifactId>spring-integration-mqtt</artifactId>
|
||||
<version>6.2.5</version>
|
||||
</dependency>
|
||||
|
||||
<!--kafka-->
|
||||
<dependency>
|
||||
<groupId>org.springframework.kafka</groupId>
|
||||
<artifactId>spring-kafka</artifactId>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<properties>
|
||||
<maven.compiler.source>17</maven.compiler.source>
|
||||
<maven.compiler.target>17</maven.compiler.target>
|
||||
|
|
|
@ -32,6 +32,12 @@
|
|||
<artifactId>swagger-annotations</artifactId>
|
||||
<version>2.2.8</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.muyu.server</groupId>
|
||||
<artifactId>saas-server</artifactId>
|
||||
<version>3.6.3</version>
|
||||
<scope>compile</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
</project>
|
||||
|
|
|
@ -0,0 +1,75 @@
|
|||
package com.muyu.common.config;
|
||||
|
||||
import cn.hutool.json.JSONObject;
|
||||
import com.muyu.server.service.TemplateService;
|
||||
import lombok.extern.log4j.Log4j2;
|
||||
import org.eclipse.paho.client.mqttv3.*;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
|
||||
/**
|
||||
* @Author:liuxinyue
|
||||
* @Package:com.muyu.mqtt.configure
|
||||
* @Project:cloud-server
|
||||
* @name:MqttConfigure
|
||||
* @Date:2024/9/28 16:10
|
||||
*/
|
||||
@Log4j2
|
||||
@Component
|
||||
public class MqttConfigure {
|
||||
|
||||
@Autowired
|
||||
private TemplateService templateService;
|
||||
|
||||
|
||||
@PostConstruct
|
||||
public void MQTTMonitoring(){
|
||||
String topic = "vehicle";
|
||||
int qos = 2;
|
||||
String broker = "tcp://47.101.53.251:1883";
|
||||
String clientId = "hhhhhh";
|
||||
try {
|
||||
MqttClient sampleClient = new MqttClient(broker, clientId);
|
||||
MqttConnectOptions connOpts = new MqttConnectOptions();
|
||||
//是否清空session
|
||||
connOpts.setCleanSession(true);
|
||||
log.info("Connecting to broker: " + broker);
|
||||
//连接
|
||||
sampleClient.connect(connOpts);
|
||||
sampleClient.subscribe(topic,qos);
|
||||
sampleClient.setCallback(new MqttCallback() {
|
||||
//连接丢失(报错)
|
||||
@Override
|
||||
public void connectionLost(Throwable throwable) {
|
||||
log.error("error:"+throwable.getMessage());
|
||||
}
|
||||
//消息已经接收到
|
||||
@Override
|
||||
public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
|
||||
// 将MQTT消息转换为字符串
|
||||
String messageContent = new String(mqttMessage.getPayload());
|
||||
// 解析JSON字符串
|
||||
JSONObject jsonObject = new JSONObject(messageContent);
|
||||
// 从JSON对象中获取"msg"字段的值
|
||||
String msgValue = jsonObject.getStr("msg");
|
||||
templateService.messageParsing(msgValue);
|
||||
log.info("接收到的值为:"+msgValue);
|
||||
}
|
||||
//交付完成
|
||||
@Override
|
||||
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
|
||||
|
||||
}
|
||||
});
|
||||
} catch(MqttException me) {
|
||||
System.out.println("reason "+me.getReasonCode());
|
||||
System.out.println("msg "+me.getMessage());
|
||||
System.out.println("loc "+me.getLocalizedMessage());
|
||||
System.out.println("cause "+me.getCause());
|
||||
System.out.println("excep "+me);
|
||||
me.printStackTrace();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -26,5 +26,9 @@ public class SysCar extends BaseEntity {
|
|||
private String carBatteryModel;
|
||||
private Long strategyId;
|
||||
private Long groupId;
|
||||
/**
|
||||
* 模版ID
|
||||
*/
|
||||
private Integer templateId;
|
||||
|
||||
}
|
||||
|
|
|
@ -43,7 +43,7 @@ public class CarTypeController {
|
|||
return Result.success(carTypeService.selectCarTypeRespList(id));
|
||||
}
|
||||
|
||||
/**1
|
||||
/**
|
||||
* 根据类型ID获取车辆类型
|
||||
* @param carTypeId
|
||||
* @return
|
||||
|
|
|
@ -1,41 +0,0 @@
|
|||
package com.muyu.server.controller;
|
||||
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
|
||||
/**
|
||||
* @Author:liuxinyue
|
||||
* @Package:com.muyu.controller
|
||||
* @Project:cloud-server
|
||||
* @name:TemplateService
|
||||
* @Date:2024/9/22 22:02
|
||||
*/
|
||||
public class TemplateService {
|
||||
|
||||
public void messageParsing(String templateMessage){
|
||||
|
||||
//创建一个JSON对象
|
||||
JSONObject jsonObject = new JSONObject();
|
||||
if(templateMessage.length()<18){
|
||||
throw new RuntimeException("错误VIN码,不存在此车");
|
||||
}
|
||||
|
||||
//将报文进行切割
|
||||
String[] split = templateMessage.split(" ");
|
||||
StringBuilder stringBuilder = new StringBuilder();
|
||||
for (String s : split) {
|
||||
|
||||
int i = Integer.parseInt(s, 16);
|
||||
stringBuilder.append((char)i);
|
||||
|
||||
}
|
||||
//取出车辆的VIN码值
|
||||
|
||||
|
||||
}
|
||||
|
||||
public static void main(String[] args) {
|
||||
|
||||
}
|
||||
|
||||
|
||||
}
|
|
@ -1,10 +1,14 @@
|
|||
package com.muyu.server.service.impl;
|
||||
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import com.alibaba.fastjson2.JSON;
|
||||
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
|
||||
import com.muyu.common.domain.MessageTemplateType;
|
||||
import com.muyu.common.domain.SysCar;
|
||||
import com.muyu.common.domain.Template;
|
||||
import com.muyu.server.mapper.TemplateMapper;
|
||||
import com.muyu.server.service.MessageTemplateTypeService;
|
||||
import com.muyu.server.service.SysCarService;
|
||||
import com.muyu.server.service.TemplateService;
|
||||
import lombok.extern.log4j.Log4j2;
|
||||
import org.apache.iotdb.rpc.IoTDBConnectionException;
|
||||
|
@ -31,7 +35,8 @@ public class TemplateServiceImpl extends ServiceImpl<TemplateMapper, Template> i
|
|||
@Autowired
|
||||
private static TemplateMapper templateMapper;
|
||||
|
||||
|
||||
@Autowired
|
||||
private SysCarService sysCarService;
|
||||
|
||||
@Autowired
|
||||
private MessageTemplateTypeService messageTemplateTypeService;
|
||||
|
@ -43,42 +48,62 @@ public class TemplateServiceImpl extends ServiceImpl<TemplateMapper, Template> i
|
|||
@Override
|
||||
public void messageParsing(String templateMessage) throws SQLException, IoTDBConnectionException, ClassNotFoundException, StatementExecutionException, ExecutionException, InterruptedException {
|
||||
|
||||
List<MessageTemplateType> templateList = templateMapper.findTemplateById(1);
|
||||
String[] split = templateMessage.split(" ");
|
||||
String[] strings = new String[split.length];
|
||||
//给一个JSON对象
|
||||
JSONObject jsonObject = new JSONObject();
|
||||
//先截取出VIN码 然后根据VIN码查询这个车属于什么类型
|
||||
if (templateMessage.length() < 18) {
|
||||
throw new RuntimeException("The vehicle message is incorrect");
|
||||
}
|
||||
//将报文进行切割
|
||||
String[] hexArray = templateMessage.split(" ");
|
||||
StringBuilder result = new StringBuilder();
|
||||
for (String hex : hexArray) {
|
||||
int decimal = Integer.parseInt(hex, 16);
|
||||
result.append((char) decimal);
|
||||
}
|
||||
//取出VIN码
|
||||
String carVin = result.substring(0, 18 - 1);
|
||||
log.info("carVin码为:" + carVin);
|
||||
//根据VIN码获取车辆信息
|
||||
SysCar carByVin = sysCarService.findCarByVin(carVin);
|
||||
//对应车辆所对应的报文模版
|
||||
Integer templateId = carByVin.getTemplateId();
|
||||
|
||||
List<CompletableFuture<String>> futures = new ArrayList<>();
|
||||
for (MessageTemplateType templateType : templateList) {
|
||||
List<MessageTemplateType> templateTypeList;
|
||||
|
||||
futures.add(CompletableFuture.supplyAsync(() -> {
|
||||
int startIndex = Integer.parseInt(String.valueOf(templateType.getStartIndex())) - 1;
|
||||
int endIndex = Integer.parseInt(String.valueOf(templateType.getEndIndex()));
|
||||
StringBuilder hexBuilder = new StringBuilder();
|
||||
for (int j = startIndex; j < endIndex; j++) {
|
||||
hexBuilder.append(split[j]);
|
||||
}
|
||||
// 创建16进制的对象
|
||||
String hex = hexBuilder.toString();
|
||||
// 转橙字符数组
|
||||
char[] result = new char[hex.length() / 2];
|
||||
for (int x = 0; x < hex.length(); x += 2) {
|
||||
// 先转十进制
|
||||
int high = Character.digit(hex.charAt(x), 16);
|
||||
// 转二进制
|
||||
int low = Character.digit(hex.charAt(x + 1), 16);
|
||||
// 转字符
|
||||
result[x / 2] = (char) ((high << 4) + low);
|
||||
}
|
||||
return new String(result);
|
||||
}));
|
||||
//key
|
||||
String redisKey = "messageTemplateType" + templateId;
|
||||
|
||||
//key存在
|
||||
if (redisTemplate.hasKey(redisKey)) {
|
||||
|
||||
List list = redisTemplate.opsForList().range(redisKey, 0, -1);
|
||||
|
||||
templateTypeList = list.stream().map(o -> JSON.parseObject(o.toString(), MessageTemplateType.class))
|
||||
.toList();
|
||||
|
||||
} else {
|
||||
List<MessageTemplateType> templateTypeList1 = messageTemplateTypeService.findTemplateById(templateId);
|
||||
templateTypeList = templateTypeList1;
|
||||
templateTypeList.forEach(
|
||||
templateType ->
|
||||
redisTemplate.opsForList().rightPush(
|
||||
redisKey, com.alibaba.fastjson.JSON.toJSONString(templateType)
|
||||
)
|
||||
);
|
||||
}
|
||||
//将模版里面有的配置进行循环
|
||||
for (MessageTemplateType messageTemplateType : templateTypeList) {
|
||||
//开始位置
|
||||
Integer startIndex = messageTemplateType.getStartIndex() - 1;
|
||||
//结束位置
|
||||
Integer endIndex = messageTemplateType.getEndIndex();
|
||||
//将每个解析后的字段都存入到JSON对象中
|
||||
jsonObject.put(messageTemplateType.getMessageField(), result.substring(startIndex, endIndex));
|
||||
}
|
||||
|
||||
for (int i = 0; i < futures.size(); i++) {
|
||||
strings[i] = futures.get(i).get();
|
||||
}
|
||||
log.info("解析后的报文是:" + jsonObject);
|
||||
|
||||
System.out.println("哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈");
|
||||
log.info("结果是:"+strings);
|
||||
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue