Merge remote-tracking branch 'refs/remotes/origin/dev.template' into dev
commit
30782882c3
|
@ -19,21 +19,22 @@
|
||||||
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
|
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
|
||||||
</properties>
|
</properties>
|
||||||
<dependencies>
|
<dependencies>
|
||||||
|
<!-- <dependency>-->
|
||||||
|
<!-- <groupId>org.apache.kafka</groupId>-->
|
||||||
|
<!-- <artifactId>kafka-clients</artifactId>-->
|
||||||
|
<!-- <version>3.0.0</version>-->
|
||||||
|
<!-- </dependency>-->
|
||||||
|
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.apache.kafka</groupId>
|
<groupId>org.apache.kafka</groupId>
|
||||||
<artifactId>kafka-clients</artifactId>
|
<artifactId>kafka-clients</artifactId>
|
||||||
<version>3.0.0</version>
|
|
||||||
</dependency>
|
</dependency>
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>com.muyu</groupId>
|
<groupId>com.muyu</groupId>
|
||||||
<artifactId>cloud-common-core</artifactId>
|
<artifactId>cloud-common-core</artifactId>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
<dependency>
|
|
||||||
<groupId>org.apache.kafka</groupId>
|
|
||||||
<artifactId>kafka-clients</artifactId>
|
|
||||||
<version>2.8.0</version>
|
|
||||||
</dependency>
|
|
||||||
|
|
||||||
|
|
||||||
</dependencies>
|
</dependencies>
|
||||||
|
|
|
@ -35,21 +35,11 @@ public class RabbitMQConsumerUtil {
|
||||||
// 获取到消息 开始消费
|
// 获取到消息 开始消费
|
||||||
log.info("消息消费者接收到消息,消息内容:{}", JSONObject.toJSONString(data));
|
log.info("消息消费者接收到消息,消息内容:{}", JSONObject.toJSONString(data));
|
||||||
|
|
||||||
|
|
||||||
Long add = redisService.redisTemplate.opsForSet().add(data, message.getMessageProperties().getMessageId());
|
Long add = redisService.redisTemplate.opsForSet().add(data, message.getMessageProperties().getMessageId());
|
||||||
|
|
||||||
if (add != 1) {
|
if (add != 1) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
|
||||||
* -----------------------------------以下为异步业务操作----------------------------
|
|
||||||
*/
|
|
||||||
|
|
||||||
/**
|
|
||||||
* ------------------------------------------------------------------------------
|
|
||||||
*/
|
|
||||||
// 消费消息成功之后需要确认
|
// 消费消息成功之后需要确认
|
||||||
// long deliveryTag 消息投递序号 自增的数字 在整个队列中唯一 拿到这个序号就相当于拿到这条消息
|
// long deliveryTag 消息投递序号 自增的数字 在整个队列中唯一 拿到这个序号就相当于拿到这条消息
|
||||||
// boolean multiple 是否批量确认 true 批量 确认小于等于当前投递序号的消息 false 单个确认
|
// boolean multiple 是否批量确认 true 批量 确认小于等于当前投递序号的消息 false 单个确认
|
||||||
|
@ -76,7 +66,6 @@ public class RabbitMQConsumerUtil {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 普通消费者
|
* 普通消费者
|
||||||
* @param data 数据类型
|
* @param data 数据类型
|
||||||
|
@ -96,7 +85,6 @@ public class RabbitMQConsumerUtil {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* -----------------------------------以下为异步业务操作----------------------------
|
* -----------------------------------以下为异步业务操作----------------------------
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -23,7 +23,6 @@ public class RabbitMQProducerUtil {
|
||||||
//rabbit
|
//rabbit
|
||||||
private final RabbitTemplate rabbitTemplate;
|
private final RabbitTemplate rabbitTemplate;
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 简单模型
|
* 简单模型
|
||||||
*
|
*
|
||||||
|
@ -73,7 +72,6 @@ public class RabbitMQProducerUtil {
|
||||||
/**
|
/**
|
||||||
* Publish/Subscribe 发布订阅者模型
|
* Publish/Subscribe 发布订阅者模型
|
||||||
* 多个消费者,多个消费者可以同时接收到消息 有交换机 类型 fanout
|
* 多个消费者,多个消费者可以同时接收到消息 有交换机 类型 fanout
|
||||||
*
|
|
||||||
* @param exchange 交换机名称
|
* @param exchange 交换机名称
|
||||||
* @param obj 发送的消息Object
|
* @param obj 发送的消息Object
|
||||||
* @param msg 响应的内容
|
* @param msg 响应的内容
|
||||||
|
|
|
@ -17,11 +17,12 @@
|
||||||
|
|
||||||
<dependencies>
|
<dependencies>
|
||||||
|
|
||||||
<dependency>
|
<!-- <dependency>-->
|
||||||
<groupId>com.muyu</groupId>
|
<!-- <groupId>com.muyu</groupId>-->
|
||||||
<artifactId>cloud-common-kafka</artifactId>
|
<!-- <artifactId>cloud-common-kafka</artifactId>-->
|
||||||
<version>3.6.3</version>
|
<!-- <version>3.6.3</version>-->
|
||||||
</dependency>
|
<!-- </dependency>-->
|
||||||
|
|
||||||
|
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>com.muyu.server</groupId>
|
<groupId>com.muyu.server</groupId>
|
||||||
|
|
|
@ -3,10 +3,9 @@ import cn.hutool.json.JSONObject;
|
||||||
import com.alibaba.fastjson2.JSON;
|
import com.alibaba.fastjson2.JSON;
|
||||||
import com.muyu.common.domain.MessageTemplateType;
|
import com.muyu.common.domain.MessageTemplateType;
|
||||||
import com.muyu.common.domain.SysCar;
|
import com.muyu.common.domain.SysCar;
|
||||||
import com.muyu.common.kafka.config.KafkaProducerConfig;
|
|
||||||
import com.muyu.common.redis.service.RedisService;
|
import com.muyu.common.redis.service.RedisService;
|
||||||
|
|
||||||
import lombok.extern.log4j.Log4j2;
|
import lombok.extern.log4j.Log4j2;
|
||||||
import org.apache.kafka.clients.producer.KafkaProducer;
|
|
||||||
import org.apache.kafka.clients.producer.ProducerRecord;
|
import org.apache.kafka.clients.producer.ProducerRecord;
|
||||||
import org.eclipse.paho.client.mqttv3.*;
|
import org.eclipse.paho.client.mqttv3.*;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
@ -36,19 +35,13 @@ public class MqttConfigure {
|
||||||
@Autowired
|
@Autowired
|
||||||
private RedisTemplate redisTemplate;
|
private RedisTemplate redisTemplate;
|
||||||
|
|
||||||
@Autowired
|
// @Autowired
|
||||||
private KafkaProducer kafkaProducer;
|
// private KafkaProducer kafkaProducer;
|
||||||
|
|
||||||
@Autowired
|
|
||||||
private SysCarService service;
|
|
||||||
|
|
||||||
@Autowired
|
|
||||||
private MessageTemplateTypeService messageTemplateTypeService;
|
|
||||||
|
|
||||||
@PostConstruct
|
@PostConstruct
|
||||||
public void MQTTMonitoring(){
|
public void MQTTMonitoring(){
|
||||||
|
|
||||||
String topic = "vehicle";
|
String topic = "car";
|
||||||
int qos = 2;
|
int qos = 2;
|
||||||
String broker = "tcp://47.101.53.251:1883";
|
String broker = "tcp://47.101.53.251:1883";
|
||||||
String clientId = "lxy";
|
String clientId = "lxy";
|
||||||
|
@ -76,7 +69,7 @@ public class MqttConfigure {
|
||||||
JSONObject jsonObject = new JSONObject(messageContent);
|
JSONObject jsonObject = new JSONObject(messageContent);
|
||||||
// 从JSON对象中获取"msg"字段的值
|
// 从JSON对象中获取"msg"字段的值
|
||||||
String msgValue = jsonObject.getStr("msg");
|
String msgValue = jsonObject.getStr("msg");
|
||||||
// messageParsing(msgValue);
|
messageParsing(msgValue);
|
||||||
log.info("接收到的值为:"+msgValue);
|
log.info("接收到的值为:"+msgValue);
|
||||||
}
|
}
|
||||||
//交付完成
|
//交付完成
|
||||||
|
@ -113,7 +106,14 @@ public class MqttConfigure {
|
||||||
String carVin = result.substring(0, 18 - 1);
|
String carVin = result.substring(0, 18 - 1);
|
||||||
log.info("carVin码为:" + carVin);
|
log.info("carVin码为:" + carVin);
|
||||||
//根据VIN码获取车辆信息
|
//根据VIN码获取车辆信息
|
||||||
SysCar carByVin = service.findCarByVin(carVin);
|
SysCar carByVin = null;
|
||||||
|
List<SysCar> carList = redisService.getCacheList("carList");
|
||||||
|
for (SysCar sysCar : carList) {
|
||||||
|
if(sysCar.getCarVin().equals(carVin)){
|
||||||
|
carByVin=sysCar;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// SysCar carByVin = service.findCarByVin(carVin);
|
||||||
log.info("车辆信息为:" + carByVin);
|
log.info("车辆信息为:" + carByVin);
|
||||||
//对应车辆所对应的报文模版
|
//对应车辆所对应的报文模版
|
||||||
Integer templateId = carByVin.getTemplateId();
|
Integer templateId = carByVin.getTemplateId();
|
||||||
|
@ -128,12 +128,19 @@ public class MqttConfigure {
|
||||||
templateTypeList = list.stream().map(o -> JSON.parseObject(o.toString(), MessageTemplateType.class))
|
templateTypeList = list.stream().map(o -> JSON.parseObject(o.toString(), MessageTemplateType.class))
|
||||||
.toList();
|
.toList();
|
||||||
} else {
|
} else {
|
||||||
List<MessageTemplateType> templateTypeList1 = messageTemplateTypeService.findTemplateById(templateId);
|
List<MessageTemplateType> templateTypeList1=null;
|
||||||
|
List<MessageTemplateType> templateTypeList2 = redisService.getCacheList("templateTypeList");
|
||||||
|
for (MessageTemplateType messageTemplateType : templateTypeList2) {
|
||||||
|
if(messageTemplateType.getTemplateId()==templateId){
|
||||||
|
templateTypeList1.add(messageTemplateType);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// List<MessageTemplateType> templateTypeList1 = messageTemplateTypeService.findTemplateById(templateId);
|
||||||
templateTypeList = templateTypeList1;
|
templateTypeList = templateTypeList1;
|
||||||
templateTypeList.forEach(
|
templateTypeList.forEach(
|
||||||
templateType ->
|
templateType ->
|
||||||
redisTemplate.opsForList().rightPush(
|
redisTemplate.opsForList().rightPush(
|
||||||
redisKey, com.alibaba.fastjson.JSON.toJSONString(templateType)
|
redisKey, JSON.toJSONString(templateType)
|
||||||
)
|
)
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
@ -146,18 +153,18 @@ public class MqttConfigure {
|
||||||
//将每个解析后的字段都存入到JSON对象中
|
//将每个解析后的字段都存入到JSON对象中
|
||||||
jsonObject.put(messageTemplateType.getMessageField(), result.substring(startIndex, endIndex));
|
jsonObject.put(messageTemplateType.getMessageField(), result.substring(startIndex, endIndex));
|
||||||
}
|
}
|
||||||
|
|
||||||
log.info("解析后的报文是:" + jsonObject);
|
log.info("解析后的报文是:" + jsonObject);
|
||||||
sendKafka(jsonObject);
|
// sendKafka(jsonObject);
|
||||||
log.info("发送kafka成功");
|
log.info("发送kafka成功");
|
||||||
return jsonObject;
|
return jsonObject;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
public void sendKafka(JSONObject jsonObject){
|
// //kafka发送消息
|
||||||
ProducerRecord<String, String> stringStringProducerRecord = new ProducerRecord<>("four_car", jsonObject.toString());
|
// public void sendKafka(JSONObject jsonObject){
|
||||||
kafkaProducer.send(stringStringProducerRecord);
|
// ProducerRecord<String, String> stringStringProducerRecord = new ProducerRecord<>("four_car", jsonObject.toString());
|
||||||
log.info("kafka发送成功");
|
// kafkaProducer.send(stringStringProducerRecord);
|
||||||
}
|
// log.info("kafka发送成功");
|
||||||
|
// }
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,10 @@
|
||||||
|
package com.muyu.template.controller;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author liuxinyue
|
||||||
|
* @Package:com.muyu.template.controller
|
||||||
|
* @name:KafkaController
|
||||||
|
* @Date:2024/10/4 16:11
|
||||||
|
*/
|
||||||
|
public class KafkaController {
|
||||||
|
}
|
|
@ -0,0 +1,62 @@
|
||||||
|
package com.muyu.template;
|
||||||
|
|
||||||
|
import com.muyu.common.domain.SysCar;
|
||||||
|
import com.muyu.common.domain.Template;
|
||||||
|
import com.muyu.common.domain.WarnRule;
|
||||||
|
import com.muyu.common.domain.WarnStrategy;
|
||||||
|
import com.muyu.common.redis.service.RedisService;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import javax.annotation.Resource;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
/**
|
||||||
|
* @author liuxinyue
|
||||||
|
* @Package:com.muyu.template
|
||||||
|
* @name:test
|
||||||
|
* @Date:2024/10/4 9:42
|
||||||
|
*/
|
||||||
|
public class test {
|
||||||
|
|
||||||
|
@Resource
|
||||||
|
private RedisService redisService;
|
||||||
|
|
||||||
|
public void main(String[] args) {
|
||||||
|
|
||||||
|
//车类型
|
||||||
|
Long carTypeId=null;
|
||||||
|
//查找车对应的类型
|
||||||
|
List<SysCar> carList = redisService.getCacheList("car");
|
||||||
|
for (SysCar sysCar : carList) {
|
||||||
|
if(sysCar.getCarVin().equals("")){
|
||||||
|
//获取到车的类型ID
|
||||||
|
carTypeId = sysCar.getCarTypeId();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
//查找车类型对应的策略
|
||||||
|
List<WarnStrategy> warnStrategyList = null;
|
||||||
|
//该车绑定的报文模版
|
||||||
|
Long templateId=null;
|
||||||
|
//获取到车的类型之后 查找对应的策略
|
||||||
|
List<WarnStrategy> warnStrategy = redisService.getCacheList("warnStrategy");
|
||||||
|
for (WarnStrategy strategy : warnStrategy) {
|
||||||
|
if(strategy.getCarTypeId().equals(carTypeId)){
|
||||||
|
templateId=strategy.getTemplateId();
|
||||||
|
warnStrategyList.add(strategy);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
//根据ID取出对应的报文模版
|
||||||
|
List<Template> templateList = redisService.getCacheList("template");
|
||||||
|
|
||||||
|
//获取策略对应的规则列表
|
||||||
|
List<WarnRule> warnRule = redisService.getCacheList("warnRule");
|
||||||
|
|
||||||
|
List<WarnRule> warnRuleList = null;
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -1,12 +1,13 @@
|
||||||
package com.muyu.server.controller;
|
package com.muyu.server.controller;
|
||||||
|
|
||||||
import com.alibaba.fastjson2.JSONObject;
|
import com.alibaba.fastjson2.JSONObject;
|
||||||
|
import com.muyu.cache.SysCarCacheService;
|
||||||
import com.muyu.common.core.domain.Result;
|
import com.muyu.common.core.domain.Result;
|
||||||
import com.muyu.common.domain.SysCar;
|
import com.muyu.common.domain.SysCar;
|
||||||
import com.muyu.common.domain.req.SysCarReq;
|
import com.muyu.common.domain.req.SysCarReq;
|
||||||
import com.muyu.common.domain.resp.SysCarFaultLogVo;
|
import com.muyu.common.domain.resp.SysCarFaultLogVo;
|
||||||
import com.muyu.common.kafka.config.KafkaProducerConfig;
|
import com.muyu.common.domain.resp.SysCarVo;
|
||||||
import com.muyu.common.redis.service.RedisService;
|
|
||||||
import com.muyu.server.service.SysCarService;
|
import com.muyu.server.service.SysCarService;
|
||||||
import io.swagger.v3.oas.annotations.Operation;
|
import io.swagger.v3.oas.annotations.Operation;
|
||||||
import io.swagger.v3.oas.annotations.tags.Tag;
|
import io.swagger.v3.oas.annotations.tags.Tag;
|
||||||
|
@ -36,9 +37,9 @@ public class SysCarController {
|
||||||
@Autowired
|
@Autowired
|
||||||
private RabbitTemplate rabbitTemplate;
|
private RabbitTemplate rabbitTemplate;
|
||||||
|
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
private KafkaProducer kafkaProducer;
|
private SysCarCacheService sysCarCacheService;
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -104,10 +105,10 @@ public class SysCarController {
|
||||||
@PostMapping("/findCarByVin")
|
@PostMapping("/findCarByVin")
|
||||||
@Operation(summary = "根据VIN码查询车信息",description = "根据VIN码查询车信息")
|
@Operation(summary = "根据VIN码查询车信息",description = "根据VIN码查询车信息")
|
||||||
public Result<SysCar> findCarByVin(@RequestParam("carVin") String carVin){
|
public Result<SysCar> findCarByVin(@RequestParam("carVin") String carVin){
|
||||||
|
List<SysCarVo> carList = sysCarCacheService.get("carList");
|
||||||
|
log.info("从redis取出的数据为:"+carList);
|
||||||
return Result.success(sysCarService.findCarByVin(carVin));
|
return Result.success(sysCarService.findCarByVin(carVin));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue