feat():修改协议解析方法

dev.vehiclegateway
Number7 2024-10-04 16:57:45 +08:00
parent a3613176f3
commit 8c90364eaa
8 changed files with 121 additions and 53 deletions

View File

@ -19,21 +19,22 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties> </properties>
<dependencies> <dependencies>
<!-- <dependency>-->
<!-- <groupId>org.apache.kafka</groupId>-->
<!-- <artifactId>kafka-clients</artifactId>-->
<!-- <version>3.0.0</version>-->
<!-- </dependency>-->
<dependency> <dependency>
<groupId>org.apache.kafka</groupId> <groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId> <artifactId>kafka-clients</artifactId>
<version>3.0.0</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>com.muyu</groupId> <groupId>com.muyu</groupId>
<artifactId>cloud-common-core</artifactId> <artifactId>cloud-common-core</artifactId>
</dependency> </dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.0</version>
</dependency>
</dependencies> </dependencies>

View File

@ -35,21 +35,11 @@ public class RabbitMQConsumerUtil {
// 获取到消息 开始消费 // 获取到消息 开始消费
log.info("消息消费者接收到消息,消息内容:{}", JSONObject.toJSONString(data)); log.info("消息消费者接收到消息,消息内容:{}", JSONObject.toJSONString(data));
Long add = redisService.redisTemplate.opsForSet().add(data, message.getMessageProperties().getMessageId()); Long add = redisService.redisTemplate.opsForSet().add(data, message.getMessageProperties().getMessageId());
if (add != 1) { if (add != 1) {
return; return;
} }
/**
* ---------------------------------------------------------------
*/
/**
* ------------------------------------------------------------------------------
*/
// 消费消息成功之后需要确认 // 消费消息成功之后需要确认
// long deliveryTag 消息投递序号 自增的数字 在整个队列中唯一 拿到这个序号就相当于拿到这条消息 // long deliveryTag 消息投递序号 自增的数字 在整个队列中唯一 拿到这个序号就相当于拿到这条消息
// boolean multiple 是否批量确认 true 批量 确认小于等于当前投递序号的消息 false 单个确认 // boolean multiple 是否批量确认 true 批量 确认小于等于当前投递序号的消息 false 单个确认
@ -76,7 +66,6 @@ public class RabbitMQConsumerUtil {
} }
} }
/** /**
* *
* @param data * @param data
@ -96,7 +85,6 @@ public class RabbitMQConsumerUtil {
return; return;
} }
/** /**
* --------------------------------------------------------------- * ---------------------------------------------------------------
*/ */

View File

@ -23,7 +23,6 @@ public class RabbitMQProducerUtil {
//rabbit //rabbit
private final RabbitTemplate rabbitTemplate; private final RabbitTemplate rabbitTemplate;
/** /**
* *
* *
@ -73,7 +72,6 @@ public class RabbitMQProducerUtil {
/** /**
* Publish/Subscribe * Publish/Subscribe
* fanout * fanout
*
* @param exchange * @param exchange
* @param obj Object * @param obj Object
* @param msg * @param msg

View File

@ -17,11 +17,12 @@
<dependencies> <dependencies>
<dependency> <!-- <dependency>-->
<groupId>com.muyu</groupId> <!-- <groupId>com.muyu</groupId>-->
<artifactId>cloud-common-kafka</artifactId> <!-- <artifactId>cloud-common-kafka</artifactId>-->
<version>3.6.3</version> <!-- <version>3.6.3</version>-->
</dependency> <!-- </dependency>-->
<dependency> <dependency>
<groupId>com.muyu.server</groupId> <groupId>com.muyu.server</groupId>

View File

@ -3,10 +3,9 @@ import cn.hutool.json.JSONObject;
import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSON;
import com.muyu.common.domain.MessageTemplateType; import com.muyu.common.domain.MessageTemplateType;
import com.muyu.common.domain.SysCar; import com.muyu.common.domain.SysCar;
import com.muyu.common.kafka.config.KafkaProducerConfig;
import com.muyu.common.redis.service.RedisService; import com.muyu.common.redis.service.RedisService;
import lombok.extern.log4j.Log4j2; import lombok.extern.log4j.Log4j2;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.ProducerRecord;
import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.*;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
@ -36,19 +35,13 @@ public class MqttConfigure {
@Autowired @Autowired
private RedisTemplate redisTemplate; private RedisTemplate redisTemplate;
@Autowired // @Autowired
private KafkaProducer kafkaProducer; // private KafkaProducer kafkaProducer;
@Autowired
private SysCarService service;
@Autowired
private MessageTemplateTypeService messageTemplateTypeService;
@PostConstruct @PostConstruct
public void MQTTMonitoring(){ public void MQTTMonitoring(){
String topic = "vehicle"; String topic = "car";
int qos = 2; int qos = 2;
String broker = "tcp://47.101.53.251:1883"; String broker = "tcp://47.101.53.251:1883";
String clientId = "lxy"; String clientId = "lxy";
@ -76,7 +69,7 @@ public class MqttConfigure {
JSONObject jsonObject = new JSONObject(messageContent); JSONObject jsonObject = new JSONObject(messageContent);
// 从JSON对象中获取"msg"字段的值 // 从JSON对象中获取"msg"字段的值
String msgValue = jsonObject.getStr("msg"); String msgValue = jsonObject.getStr("msg");
// messageParsing(msgValue); messageParsing(msgValue);
log.info("接收到的值为:"+msgValue); log.info("接收到的值为:"+msgValue);
} }
//交付完成 //交付完成
@ -113,7 +106,14 @@ public class MqttConfigure {
String carVin = result.substring(0, 18 - 1); String carVin = result.substring(0, 18 - 1);
log.info("carVin码为:" + carVin); log.info("carVin码为:" + carVin);
//根据VIN码获取车辆信息 //根据VIN码获取车辆信息
SysCar carByVin = service.findCarByVin(carVin); SysCar carByVin = null;
List<SysCar> carList = redisService.getCacheList("carList");
for (SysCar sysCar : carList) {
if(sysCar.getCarVin().equals(carVin)){
carByVin=sysCar;
}
}
// SysCar carByVin = service.findCarByVin(carVin);
log.info("车辆信息为:" + carByVin); log.info("车辆信息为:" + carByVin);
//对应车辆所对应的报文模版 //对应车辆所对应的报文模版
Integer templateId = carByVin.getTemplateId(); Integer templateId = carByVin.getTemplateId();
@ -128,12 +128,19 @@ public class MqttConfigure {
templateTypeList = list.stream().map(o -> JSON.parseObject(o.toString(), MessageTemplateType.class)) templateTypeList = list.stream().map(o -> JSON.parseObject(o.toString(), MessageTemplateType.class))
.toList(); .toList();
} else { } else {
List<MessageTemplateType> templateTypeList1 = messageTemplateTypeService.findTemplateById(templateId); List<MessageTemplateType> templateTypeList1=null;
List<MessageTemplateType> templateTypeList2 = redisService.getCacheList("templateTypeList");
for (MessageTemplateType messageTemplateType : templateTypeList2) {
if(messageTemplateType.getTemplateId()==templateId){
templateTypeList1.add(messageTemplateType);
}
}
// List<MessageTemplateType> templateTypeList1 = messageTemplateTypeService.findTemplateById(templateId);
templateTypeList = templateTypeList1; templateTypeList = templateTypeList1;
templateTypeList.forEach( templateTypeList.forEach(
templateType -> templateType ->
redisTemplate.opsForList().rightPush( redisTemplate.opsForList().rightPush(
redisKey, com.alibaba.fastjson.JSON.toJSONString(templateType) redisKey, JSON.toJSONString(templateType)
) )
); );
} }
@ -146,18 +153,18 @@ public class MqttConfigure {
//将每个解析后的字段都存入到JSON对象中 //将每个解析后的字段都存入到JSON对象中
jsonObject.put(messageTemplateType.getMessageField(), result.substring(startIndex, endIndex)); jsonObject.put(messageTemplateType.getMessageField(), result.substring(startIndex, endIndex));
} }
log.info("解析后的报文是:" + jsonObject); log.info("解析后的报文是:" + jsonObject);
sendKafka(jsonObject); // sendKafka(jsonObject);
log.info("发送kafka成功"); log.info("发送kafka成功");
return jsonObject; return jsonObject;
} }
public void sendKafka(JSONObject jsonObject){ // //kafka发送消息
ProducerRecord<String, String> stringStringProducerRecord = new ProducerRecord<>("four_car", jsonObject.toString()); // public void sendKafka(JSONObject jsonObject){
kafkaProducer.send(stringStringProducerRecord); // ProducerRecord<String, String> stringStringProducerRecord = new ProducerRecord<>("four_car", jsonObject.toString());
log.info("kafka发送成功"); // kafkaProducer.send(stringStringProducerRecord);
} // log.info("kafka发送成功");
// }
} }

View File

@ -0,0 +1,10 @@
package com.muyu.template.controller;
/**
* @author liuxinyue
* @Packagecom.muyu.template.controller
* @nameKafkaController
* @Date2024/10/4 16:11
*/
public class KafkaController {
}

View File

@ -0,0 +1,62 @@
package com.muyu.template;
import com.muyu.common.domain.SysCar;
import com.muyu.common.domain.Template;
import com.muyu.common.domain.WarnRule;
import com.muyu.common.domain.WarnStrategy;
import com.muyu.common.redis.service.RedisService;
import org.springframework.beans.factory.annotation.Autowired;
import javax.annotation.Resource;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
/**
* @author liuxinyue
* @Packagecom.muyu.template
* @nametest
* @Date2024/10/4 9:42
*/
public class test {
@Resource
private RedisService redisService;
public void main(String[] args) {
//车类型
Long carTypeId=null;
//查找车对应的类型
List<SysCar> carList = redisService.getCacheList("car");
for (SysCar sysCar : carList) {
if(sysCar.getCarVin().equals("")){
//获取到车的类型ID
carTypeId = sysCar.getCarTypeId();
}
}
//查找车类型对应的策略
List<WarnStrategy> warnStrategyList = null;
//该车绑定的报文模版
Long templateId=null;
//获取到车的类型之后 查找对应的策略
List<WarnStrategy> warnStrategy = redisService.getCacheList("warnStrategy");
for (WarnStrategy strategy : warnStrategy) {
if(strategy.getCarTypeId().equals(carTypeId)){
templateId=strategy.getTemplateId();
warnStrategyList.add(strategy);
}
}
//根据ID取出对应的报文模版
List<Template> templateList = redisService.getCacheList("template");
//获取策略对应的规则列表
List<WarnRule> warnRule = redisService.getCacheList("warnRule");
List<WarnRule> warnRuleList = null;
}
}

View File

@ -1,12 +1,13 @@
package com.muyu.server.controller; package com.muyu.server.controller;
import com.alibaba.fastjson2.JSONObject; import com.alibaba.fastjson2.JSONObject;
import com.muyu.cache.SysCarCacheService;
import com.muyu.common.core.domain.Result; import com.muyu.common.core.domain.Result;
import com.muyu.common.domain.SysCar; import com.muyu.common.domain.SysCar;
import com.muyu.common.domain.req.SysCarReq; import com.muyu.common.domain.req.SysCarReq;
import com.muyu.common.domain.resp.SysCarFaultLogVo; import com.muyu.common.domain.resp.SysCarFaultLogVo;
import com.muyu.common.kafka.config.KafkaProducerConfig; import com.muyu.common.domain.resp.SysCarVo;
import com.muyu.common.redis.service.RedisService;
import com.muyu.server.service.SysCarService; import com.muyu.server.service.SysCarService;
import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag; import io.swagger.v3.oas.annotations.tags.Tag;
@ -36,9 +37,9 @@ public class SysCarController {
@Autowired @Autowired
private RabbitTemplate rabbitTemplate; private RabbitTemplate rabbitTemplate;
@Autowired @Autowired
private KafkaProducer kafkaProducer; private SysCarCacheService sysCarCacheService;
/** /**
@ -104,10 +105,10 @@ public class SysCarController {
@PostMapping("/findCarByVin") @PostMapping("/findCarByVin")
@Operation(summary = "根据VIN码查询车信息",description = "根据VIN码查询车信息") @Operation(summary = "根据VIN码查询车信息",description = "根据VIN码查询车信息")
public Result<SysCar> findCarByVin(@RequestParam("carVin") String carVin){ public Result<SysCar> findCarByVin(@RequestParam("carVin") String carVin){
List<SysCarVo> carList = sysCarCacheService.get("carList");
log.info("从redis取出的数据为:"+carList);
return Result.success(sysCarService.findCarByVin(carVin)); return Result.success(sysCarService.findCarByVin(carVin));
} }
} }