From 8c90364eaa6bc868893a069c813de55275b7f207 Mon Sep 17 00:00:00 2001 From: Number7 <1845377266@qq.com> Date: Fri, 4 Oct 2024 16:57:45 +0800 Subject: [PATCH 01/17] =?UTF-8?q?feat():=E4=BF=AE=E6=94=B9=E5=8D=8F?= =?UTF-8?q?=E8=AE=AE=E8=A7=A3=E6=9E=90=E6=96=B9=E6=B3=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cloud-common/cloud-common-kafka/pom.xml | 13 ++-- .../rabbit/consumer/RabbitMQConsumerUtil.java | 12 ---- .../rabbit/producer/RabbitMQProducerUtil.java | 2 - cloud-modules/cloud-modules-template/pom.xml | 11 ++-- .../muyu/template/config/MqttConfigure.java | 51 ++++++++------- .../template/controller/KafkaController.java | 10 +++ .../src/main/java/com/muyu/template/test.java | 62 +++++++++++++++++++ .../server/controller/SysCarController.java | 13 ++-- 8 files changed, 121 insertions(+), 53 deletions(-) create mode 100644 cloud-modules/cloud-modules-template/src/main/java/com/muyu/template/controller/KafkaController.java create mode 100644 cloud-modules/cloud-modules-template/src/main/java/com/muyu/template/test.java diff --git a/cloud-common/cloud-common-kafka/pom.xml b/cloud-common/cloud-common-kafka/pom.xml index 7f5126e..abe86fc 100644 --- a/cloud-common/cloud-common-kafka/pom.xml +++ b/cloud-common/cloud-common-kafka/pom.xml @@ -19,21 +19,22 @@ UTF-8 + + + + + + org.apache.kafka kafka-clients - 3.0.0 com.muyu cloud-common-core - - org.apache.kafka - kafka-clients - 2.8.0 - + diff --git a/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/consumer/RabbitMQConsumerUtil.java b/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/consumer/RabbitMQConsumerUtil.java index 2181562..876453d 100644 --- a/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/consumer/RabbitMQConsumerUtil.java +++ b/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/consumer/RabbitMQConsumerUtil.java @@ -35,21 +35,11 @@ public class RabbitMQConsumerUtil { // 获取到消息 开始消费 log.info("消息消费者接收到消息,消息内容:{}", JSONObject.toJSONString(data)); - Long add = redisService.redisTemplate.opsForSet().add(data, message.getMessageProperties().getMessageId()); if (add != 1) { return; } - - - /** - * -----------------------------------以下为异步业务操作---------------------------- - */ - - /** - * ------------------------------------------------------------------------------ - */ // 消费消息成功之后需要确认 // long deliveryTag 消息投递序号 自增的数字 在整个队列中唯一 拿到这个序号就相当于拿到这条消息 // boolean multiple 是否批量确认 true 批量 确认小于等于当前投递序号的消息 false 单个确认 @@ -76,7 +66,6 @@ public class RabbitMQConsumerUtil { } } - /** * 普通消费者 * @param data 数据类型 @@ -96,7 +85,6 @@ public class RabbitMQConsumerUtil { return; } - /** * -----------------------------------以下为异步业务操作---------------------------- */ diff --git a/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/producer/RabbitMQProducerUtil.java b/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/producer/RabbitMQProducerUtil.java index fc7c3b8..88b0187 100644 --- a/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/producer/RabbitMQProducerUtil.java +++ b/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/producer/RabbitMQProducerUtil.java @@ -23,7 +23,6 @@ public class RabbitMQProducerUtil { //rabbit private final RabbitTemplate rabbitTemplate; - /** * 简单模型 * @@ -73,7 +72,6 @@ public class RabbitMQProducerUtil { /** * Publish/Subscribe 发布订阅者模型 * 多个消费者,多个消费者可以同时接收到消息 有交换机 类型 fanout - * * @param exchange 交换机名称 * @param obj 发送的消息Object * @param msg 响应的内容 diff --git a/cloud-modules/cloud-modules-template/pom.xml b/cloud-modules/cloud-modules-template/pom.xml index 65e4bd1..fa19206 100644 --- a/cloud-modules/cloud-modules-template/pom.xml +++ b/cloud-modules/cloud-modules-template/pom.xml @@ -17,11 +17,12 @@ - - com.muyu - cloud-common-kafka - 3.6.3 - + + + + + + com.muyu.server diff --git a/cloud-modules/cloud-modules-template/src/main/java/com/muyu/template/config/MqttConfigure.java b/cloud-modules/cloud-modules-template/src/main/java/com/muyu/template/config/MqttConfigure.java index b72a7fa..065aa80 100644 --- a/cloud-modules/cloud-modules-template/src/main/java/com/muyu/template/config/MqttConfigure.java +++ b/cloud-modules/cloud-modules-template/src/main/java/com/muyu/template/config/MqttConfigure.java @@ -3,10 +3,9 @@ import cn.hutool.json.JSONObject; import com.alibaba.fastjson2.JSON; import com.muyu.common.domain.MessageTemplateType; import com.muyu.common.domain.SysCar; -import com.muyu.common.kafka.config.KafkaProducerConfig; import com.muyu.common.redis.service.RedisService; + import lombok.extern.log4j.Log4j2; -import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.eclipse.paho.client.mqttv3.*; import org.springframework.beans.factory.annotation.Autowired; @@ -36,19 +35,13 @@ public class MqttConfigure { @Autowired private RedisTemplate redisTemplate; - @Autowired - private KafkaProducer kafkaProducer; - - @Autowired - private SysCarService service; - - @Autowired - private MessageTemplateTypeService messageTemplateTypeService; +// @Autowired +// private KafkaProducer kafkaProducer; @PostConstruct public void MQTTMonitoring(){ - String topic = "vehicle"; + String topic = "car"; int qos = 2; String broker = "tcp://47.101.53.251:1883"; String clientId = "lxy"; @@ -76,7 +69,7 @@ public class MqttConfigure { JSONObject jsonObject = new JSONObject(messageContent); // 从JSON对象中获取"msg"字段的值 String msgValue = jsonObject.getStr("msg"); -// messageParsing(msgValue); + messageParsing(msgValue); log.info("接收到的值为:"+msgValue); } //交付完成 @@ -113,7 +106,14 @@ public class MqttConfigure { String carVin = result.substring(0, 18 - 1); log.info("carVin码为:" + carVin); //根据VIN码获取车辆信息 - SysCar carByVin = service.findCarByVin(carVin); + SysCar carByVin = null; + List carList = redisService.getCacheList("carList"); + for (SysCar sysCar : carList) { + if(sysCar.getCarVin().equals(carVin)){ + carByVin=sysCar; + } + } +// SysCar carByVin = service.findCarByVin(carVin); log.info("车辆信息为:" + carByVin); //对应车辆所对应的报文模版 Integer templateId = carByVin.getTemplateId(); @@ -128,12 +128,19 @@ public class MqttConfigure { templateTypeList = list.stream().map(o -> JSON.parseObject(o.toString(), MessageTemplateType.class)) .toList(); } else { - List templateTypeList1 = messageTemplateTypeService.findTemplateById(templateId); + List templateTypeList1=null; + List templateTypeList2 = redisService.getCacheList("templateTypeList"); + for (MessageTemplateType messageTemplateType : templateTypeList2) { + if(messageTemplateType.getTemplateId()==templateId){ + templateTypeList1.add(messageTemplateType); + } + } +// List templateTypeList1 = messageTemplateTypeService.findTemplateById(templateId); templateTypeList = templateTypeList1; templateTypeList.forEach( templateType -> redisTemplate.opsForList().rightPush( - redisKey, com.alibaba.fastjson.JSON.toJSONString(templateType) + redisKey, JSON.toJSONString(templateType) ) ); } @@ -146,18 +153,18 @@ public class MqttConfigure { //将每个解析后的字段都存入到JSON对象中 jsonObject.put(messageTemplateType.getMessageField(), result.substring(startIndex, endIndex)); } - log.info("解析后的报文是:" + jsonObject); - sendKafka(jsonObject); +// sendKafka(jsonObject); log.info("发送kafka成功"); return jsonObject; } - public void sendKafka(JSONObject jsonObject){ - ProducerRecord stringStringProducerRecord = new ProducerRecord<>("four_car", jsonObject.toString()); - kafkaProducer.send(stringStringProducerRecord); - log.info("kafka发送成功"); - } +// //kafka发送消息 +// public void sendKafka(JSONObject jsonObject){ +// ProducerRecord stringStringProducerRecord = new ProducerRecord<>("four_car", jsonObject.toString()); +// kafkaProducer.send(stringStringProducerRecord); +// log.info("kafka发送成功"); +// } } diff --git a/cloud-modules/cloud-modules-template/src/main/java/com/muyu/template/controller/KafkaController.java b/cloud-modules/cloud-modules-template/src/main/java/com/muyu/template/controller/KafkaController.java new file mode 100644 index 0000000..34e4ccd --- /dev/null +++ b/cloud-modules/cloud-modules-template/src/main/java/com/muyu/template/controller/KafkaController.java @@ -0,0 +1,10 @@ +package com.muyu.template.controller; + +/** + * @author liuxinyue + * @Package:com.muyu.template.controller + * @name:KafkaController + * @Date:2024/10/4 16:11 + */ +public class KafkaController { +} diff --git a/cloud-modules/cloud-modules-template/src/main/java/com/muyu/template/test.java b/cloud-modules/cloud-modules-template/src/main/java/com/muyu/template/test.java new file mode 100644 index 0000000..e602a8d --- /dev/null +++ b/cloud-modules/cloud-modules-template/src/main/java/com/muyu/template/test.java @@ -0,0 +1,62 @@ +package com.muyu.template; + +import com.muyu.common.domain.SysCar; +import com.muyu.common.domain.Template; +import com.muyu.common.domain.WarnRule; +import com.muyu.common.domain.WarnStrategy; +import com.muyu.common.redis.service.RedisService; +import org.springframework.beans.factory.annotation.Autowired; +import javax.annotation.Resource; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; +/** + * @author liuxinyue + * @Package:com.muyu.template + * @name:test + * @Date:2024/10/4 9:42 + */ +public class test { + + @Resource + private RedisService redisService; + + public void main(String[] args) { + + //车类型 + Long carTypeId=null; + //查找车对应的类型 + List carList = redisService.getCacheList("car"); + for (SysCar sysCar : carList) { + if(sysCar.getCarVin().equals("")){ + //获取到车的类型ID + carTypeId = sysCar.getCarTypeId(); + } + } + + //查找车类型对应的策略 + List warnStrategyList = null; + //该车绑定的报文模版 + Long templateId=null; + //获取到车的类型之后 查找对应的策略 + List warnStrategy = redisService.getCacheList("warnStrategy"); + for (WarnStrategy strategy : warnStrategy) { + if(strategy.getCarTypeId().equals(carTypeId)){ + templateId=strategy.getTemplateId(); + warnStrategyList.add(strategy); + } + } + + //根据ID取出对应的报文模版 + List