From 9499d9738e2b013dbe79c01517ebee01d2382076 Mon Sep 17 00:00:00 2001 From: Liu Wu <2780205363@qq.com> Date: Mon, 7 Oct 2024 12:06:18 +0800 Subject: [PATCH] =?UTF-8?q?fix():=20=E4=BA=8B=E4=BB=B6=E5=9F=BA=E7=A1=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cloud-common/cloud-common-rabbit/pom.xml | 1 + .../consumer/RabbitMQConsumerUtil.java | 8 +- .../producer/RabbitMQProducerUtil.java | 202 +++++++++--------- .../com/muyu/rabbitmq/util/CacheUtil.java | 37 ---- cloud-modules/cloud-event/pom.xml | 6 + .../com/muyu/event/basic/EventPublisher.java | 2 - .../com/muyu/event/consumer/MqConsumer.java | 71 ++++++ .../muyu/event/controller/DataController.java | 1 + .../event/controller/IoTDBController.java | 14 -- .../event/listener/AddDatabaseListener.java | 1 + .../server/controller/TemplateController.java | 3 +- 11 files changed, 186 insertions(+), 160 deletions(-) delete mode 100644 cloud-common/cloud-common-rabbit/src/main/java/com/muyu/rabbitmq/util/CacheUtil.java diff --git a/cloud-common/cloud-common-rabbit/pom.xml b/cloud-common/cloud-common-rabbit/pom.xml index 5d535ad..f8daff2 100644 --- a/cloud-common/cloud-common-rabbit/pom.xml +++ b/cloud-common/cloud-common-rabbit/pom.xml @@ -35,6 +35,7 @@ cloud-common-redis + diff --git a/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/rabbitmq/consumer/RabbitMQConsumerUtil.java b/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/rabbitmq/consumer/RabbitMQConsumerUtil.java index aa41b60..f823509 100644 --- a/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/rabbitmq/consumer/RabbitMQConsumerUtil.java +++ b/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/rabbitmq/consumer/RabbitMQConsumerUtil.java @@ -2,7 +2,7 @@ package com.muyu.rabbitmq.consumer; import com.alibaba.fastjson2.JSONObject; import com.muyu.common.redis.service.RedisService; -import com.muyu.rabbitmq.util.CacheUtil; +//import com.muyu.rabbitmq.util.CacheUtil; import com.rabbitmq.client.Channel; import lombok.AllArgsConstructor; import lombok.extern.log4j.Log4j2; @@ -26,8 +26,8 @@ public class RabbitMQConsumerUtil { private final RedisService redisService; - @Autowired - private CacheUtil cacheUtil; +// @Autowired +// private CacheUtil cacheUtil; /** @@ -54,7 +54,7 @@ public class RabbitMQConsumerUtil { * -----------------------------------以下为异步业务操作---------------------------- */ String carList = (String) redisService.redisTemplate.opsForValue().get("carList"); - cacheUtil.put("carList",carList); + /** diff --git a/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/rabbitmq/producer/RabbitMQProducerUtil.java b/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/rabbitmq/producer/RabbitMQProducerUtil.java index 9050496..3d9a148 100644 --- a/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/rabbitmq/producer/RabbitMQProducerUtil.java +++ b/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/rabbitmq/producer/RabbitMQProducerUtil.java @@ -47,107 +47,107 @@ public class RabbitMQProducerUtil { } - /** - * Work queue 工作模型 - * - * @param obj 传递的消息 (如果是对象需要序列化) - * @return 结果集 - * 多个消费者,你一个我一个分配消费消息,有预取机制,默认公平消费,可配置 能者多劳模式(),谁完成的快,谁多做一点 - */ - public Result workSendMessage(String queueName, Object obj, String msg) { - - log.info("【工作模型mq】 : method: 【 workSendMessage 】 - ages: 【 String : {}, Object : {}, String : {} 】 ---> 【 消息发送中。。。 】", queueName, obj, msg); - // 发送简单模型消息 - // 第一个参数: 绑定规则 相当于 队列名称 - // 第二个参数:消息内容 - rabbitTemplate.convertAndSend(queueName, obj, message -> { - message.getMessageProperties().setMessageId(UUID.randomUUID().toString()); - return message; - } ); - - log.info("【工作模型mq】 : method: 【 workSendMessage 】- queue: 【 {} 】 ---> 【 消息发送成功 】", queueName); - - return Result.success("消息发送成功"); - } - - /** - * Publish/Subscribe 发布订阅者模型 - * 多个消费者,多个消费者可以同时接收到消息 有交换机 类型 fanout - * - * @param exchange 交换机名称 - * @param obj 发送的消息Object - * @param msg 响应的内容 - * @return 结果集 - */ - public Result publishSubscribeSendMessage(String exchange, Object obj, String msg) { - - log.info("【订阅模型mq】 : method: 【 workSendMessage 】 - ages: 【 String : {}, Object : {}, String : {} 】 ---> 【 消息发送中。。。 】", exchange, obj, msg); - // 发送简单模型消息 - // 第一个参数: exchange 交换机的名称 - // 第二个参数: 绑定规则 发布订阅者模型 不写 默认 "" 只要绑定就行 不需要规则 - // 第三个参数:消息内容 - rabbitTemplate.convertAndSend(exchange, "", obj, message -> { - message.getMessageProperties().setMessageId(UUID.randomUUID().toString()); - return message; - } ); - - log.info("【订阅模型mq】 : method: 【 workSendMessage 】- exchange: 【 {} 】 ---> 【 消息发送成功 】", exchange); - - return Result.success("消息发送成功"); - } - - /** - * Routing路由模型 - * 使用的是 Direct 类型的交换机,会将接收到的消息根据 规则 路由到指定的Queue(队列),因此称为路由模式 - * - * @param exchange 交换机名称 - * @param rule 绑定规则 一个字符串即可 - * @param obj 发送的消息Object - * @param msg 响应的内容 - * @return 结果集 - */ - public Result routingSendMessage(String exchange, String rule, Object obj, String msg) { - - log.info("【路由模型mq】 : method: 【 workSendMessage 】 - ages: 【 String : {}, Object : {}, String : {} 】 ---> 【 消息发送中。。。 】", exchange, obj, msg); - // 发送简单模型消息 - // 第一个参数: 绑定规则 相当于 队列名称 - // 第二个参数:消息内容 - rabbitTemplate.convertAndSend(exchange, rule, obj, message -> { - message.getMessageProperties().setMessageId(UUID.randomUUID().toString()); - return message; - } ); - - log.info("【路由模型mq】 : method: 【 workSendMessage 】- exchange: 【 {} 】 ---> 【 消息发送成功 】", exchange); - - return Result.success("消息发送成功"); - } - - - /** - * Topic主题模型模型 - * 使用的是 topic 类型的交换机 - * - * @param exchange 交换机名称 - * @param rule 绑定规则 可以绑定多个单词以 . 拼接 也可以使用 #(匹配 零个 一个 或 多个 单词) 或 *(匹配 一个 单词) 通配符(例如:name.msg, *.msg, age.# ) - * @param obj 发送的消息Object - * @param msg 响应的内容 - * @return 结果集 - */ - public Result topicSendMessage(String exchange, String rule, Object obj) { - - log.info("【主题模型mq】 : method: 【 workSendMessage 】 - ages: 【 String : {}, Object : {} 】 ---> 【 消息发送中。。。 】", exchange, obj); - // 发送简单模型消息 - // 第一个参数: 绑定规则 相当于 队列名称 - // 第二个参数:消息内容 - rabbitTemplate.convertAndSend(exchange, rule, obj, message -> { - message.getMessageProperties().setMessageId(UUID.randomUUID().toString()); - return message; - } ); - - log.info("【主题模型mq】 : method: 【 workSendMessage 】- exchange: 【 {} 】 ---> 【 消息发送成功 】", exchange); - - return Result.success(obj,"消息发送成功"); - } +// /** +// * Work queue 工作模型 +// * +// * @param obj 传递的消息 (如果是对象需要序列化) +// * @return 结果集 +// * 多个消费者,你一个我一个分配消费消息,有预取机制,默认公平消费,可配置 能者多劳模式(),谁完成的快,谁多做一点 +// */ +// public Result workSendMessage(String queueName, Object obj, String msg) { +// +// log.info("【工作模型mq】 : method: 【 workSendMessage 】 - ages: 【 String : {}, Object : {}, String : {} 】 ---> 【 消息发送中。。。 】", queueName, obj, msg); +// // 发送简单模型消息 +// // 第一个参数: 绑定规则 相当于 队列名称 +// // 第二个参数:消息内容 +// rabbitTemplate.convertAndSend(queueName, obj, message -> { +// message.getMessageProperties().setMessageId(UUID.randomUUID().toString()); +// return message; +// } ); +// +// log.info("【工作模型mq】 : method: 【 workSendMessage 】- queue: 【 {} 】 ---> 【 消息发送成功 】", queueName); +// +// return Result.success("消息发送成功"); +// } +// +// /** +// * Publish/Subscribe 发布订阅者模型 +// * 多个消费者,多个消费者可以同时接收到消息 有交换机 类型 fanout +// * +// * @param exchange 交换机名称 +// * @param obj 发送的消息Object +// * @param msg 响应的内容 +// * @return 结果集 +// */ +// public Result publishSubscribeSendMessage(String exchange, Object obj, String msg) { +// +// log.info("【订阅模型mq】 : method: 【 workSendMessage 】 - ages: 【 String : {}, Object : {}, String : {} 】 ---> 【 消息发送中。。。 】", exchange, obj, msg); +// // 发送简单模型消息 +// // 第一个参数: exchange 交换机的名称 +// // 第二个参数: 绑定规则 发布订阅者模型 不写 默认 "" 只要绑定就行 不需要规则 +// // 第三个参数:消息内容 +// rabbitTemplate.convertAndSend(exchange, "", obj, message -> { +// message.getMessageProperties().setMessageId(UUID.randomUUID().toString()); +// return message; +// } ); +// +// log.info("【订阅模型mq】 : method: 【 workSendMessage 】- exchange: 【 {} 】 ---> 【 消息发送成功 】", exchange); +// +// return Result.success("消息发送成功"); +// } +// +// /** +// * Routing路由模型 +// * 使用的是 Direct 类型的交换机,会将接收到的消息根据 规则 路由到指定的Queue(队列),因此称为路由模式 +// * +// * @param exchange 交换机名称 +// * @param rule 绑定规则 一个字符串即可 +// * @param obj 发送的消息Object +// * @param msg 响应的内容 +// * @return 结果集 +// */ +// public Result routingSendMessage(String exchange, String rule, Object obj, String msg) { +// +// log.info("【路由模型mq】 : method: 【 workSendMessage 】 - ages: 【 String : {}, Object : {}, String : {} 】 ---> 【 消息发送中。。。 】", exchange, obj, msg); +// // 发送简单模型消息 +// // 第一个参数: 绑定规则 相当于 队列名称 +// // 第二个参数:消息内容 +// rabbitTemplate.convertAndSend(exchange, rule, obj, message -> { +// message.getMessageProperties().setMessageId(UUID.randomUUID().toString()); +// return message; +// } ); +// +// log.info("【路由模型mq】 : method: 【 workSendMessage 】- exchange: 【 {} 】 ---> 【 消息发送成功 】", exchange); +// +// return Result.success("消息发送成功"); +// } +// +// +// /** +// * Topic主题模型模型 +// * 使用的是 topic 类型的交换机 +// * +// * @param exchange 交换机名称 +// * @param rule 绑定规则 可以绑定多个单词以 . 拼接 也可以使用 #(匹配 零个 一个 或 多个 单词) 或 *(匹配 一个 单词) 通配符(例如:name.msg, *.msg, age.# ) +// * @param obj 发送的消息Object +// * @param msg 响应的内容 +// * @return 结果集 +// */ +// public Result topicSendMessage(String exchange, String rule, Object obj) { +// +// log.info("【主题模型mq】 : method: 【 workSendMessage 】 - ages: 【 String : {}, Object : {} 】 ---> 【 消息发送中。。。 】", exchange, obj); +// // 发送简单模型消息 +// // 第一个参数: 绑定规则 相当于 队列名称 +// // 第二个参数:消息内容 +// rabbitTemplate.convertAndSend(exchange, rule, obj, message -> { +// message.getMessageProperties().setMessageId(UUID.randomUUID().toString()); +// return message; +// } ); +// +// log.info("【主题模型mq】 : method: 【 workSendMessage 】- exchange: 【 {} 】 ---> 【 消息发送成功 】", exchange); +// +// return Result.success(obj,"消息发送成功"); +// } /** diff --git a/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/rabbitmq/util/CacheUtil.java b/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/rabbitmq/util/CacheUtil.java deleted file mode 100644 index 16ed054..0000000 --- a/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/rabbitmq/util/CacheUtil.java +++ /dev/null @@ -1,37 +0,0 @@ -package com.muyu.rabbitmq.util; - -import com.github.benmanes.caffeine.cache.Cache; -import com.github.benmanes.caffeine.cache.Caffeine; -import org.springframework.stereotype.Component; - -/** - * 缓存工具类 - * - * @program: cloud-server - * @author: 刘武 - * @create: 2024-09-30 10:08 - **/ -@Component -public class CacheUtil { - - private final Cache cache; - - public CacheUtil() { - this.cache = Caffeine.newBuilder() - .maximumSize(500L) - .build(); - } - - public T get(String key) { - return cache.getIfPresent(key); - } - - public void put(String key, T value) { - cache.put(key, value); - } - - public void remove(String key) { - cache.invalidate(key); - } - -} diff --git a/cloud-modules/cloud-event/pom.xml b/cloud-modules/cloud-event/pom.xml index cae475e..d9321b3 100644 --- a/cloud-modules/cloud-event/pom.xml +++ b/cloud-modules/cloud-event/pom.xml @@ -115,6 +115,12 @@ cloud-common-rabbit + + com.muyu + saas-cache + 3.6.3 + + diff --git a/cloud-modules/cloud-event/src/main/java/com/muyu/event/basic/EventPublisher.java b/cloud-modules/cloud-event/src/main/java/com/muyu/event/basic/EventPublisher.java index 624f7fa..f724836 100644 --- a/cloud-modules/cloud-event/src/main/java/com/muyu/event/basic/EventPublisher.java +++ b/cloud-modules/cloud-event/src/main/java/com/muyu/event/basic/EventPublisher.java @@ -28,6 +28,4 @@ public class EventPublisher implements ApplicationEventPublisherAware { publisher.publishEvent(event); } - - } diff --git a/cloud-modules/cloud-event/src/main/java/com/muyu/event/consumer/MqConsumer.java b/cloud-modules/cloud-event/src/main/java/com/muyu/event/consumer/MqConsumer.java index e8aeb7d..4f10b09 100644 --- a/cloud-modules/cloud-event/src/main/java/com/muyu/event/consumer/MqConsumer.java +++ b/cloud-modules/cloud-event/src/main/java/com/muyu/event/consumer/MqConsumer.java @@ -1,9 +1,24 @@ package com.muyu.event.consumer; +import com.alibaba.fastjson2.JSONObject; +import com.muyu.cache.ElectronicFenceGroupCacheService; +import com.muyu.cache.SysCarCacheService; +import com.muyu.common.domain.database.ElectronicFenceGroup; +import com.muyu.common.domain.resp.SysCarVo; +import com.muyu.common.redis.service.RedisService; import com.muyu.rabbitmq.consumer.RabbitMQConsumerUtil; +import com.rabbitmq.client.Channel; +import lombok.extern.log4j.Log4j2; +import org.springframework.amqp.core.Message; +import org.springframework.amqp.rabbit.annotation.Queue; +import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; +import java.io.IOException; +import java.util.Date; +import java.util.List; + /** * rabbitmq 监听器 * @author 刘武 @@ -13,9 +28,65 @@ import org.springframework.stereotype.Component; */ @Component +@Log4j2 public class MqConsumer { + @Autowired + private RedisService redisService; + @Autowired + private SysCarCacheService sysCarCacheService; + @Autowired + private ElectronicFenceGroupCacheService electronicFenceGroupCacheService; + + @RabbitListener(queuesToDeclare = @Queue(name = "basic")) + public void rabbitMQBasicConsumer(String data , Message message , Channel channel) { + log.info("当前时间:{} :RabbitMQConsumerUtil : {}", new Date(), message); + try { + // 获取到消息 开始消费 + log.info("消息消费者接收到消息,消息内容:{}", JSONObject.toJSONString(data)); + + Long add = redisService.redisTemplate.opsForSet().add(data, message.getMessageProperties().getMessageId()); + + if (add != 1) { + return; + } + + /** + * -----------------------------------以下为异步业务操作---------------------------- + */ + List carList = sysCarCacheService.get("carList"); + ElectronicFenceGroup fenceGroupList = electronicFenceGroupCacheService.get("electronicFenceGroupList"); + + + /** + * ------------------------------------------------------------------------------ + */ + // 消费消息成功之后需要确认 + // long deliveryTag 消息投递序号 自增的数字 在整个队列中唯一 拿到这个序号就相当于拿到这条消息 + // boolean multiple 是否批量确认 true 批量 确认小于等于当前投递序号的消息 false 单个确认 + channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); + log.info("xxx消费者接收到消息,消息内容:{},消费成功...", message); + + } catch (Exception e) { + log.error("xxx消费者接收到消息,消息内容:{},消费消息异常,异常信息:{}", message, e); + // 消息回退 拒绝消费消息 + // long deliveryTag 消息投递序号 自增的数字 在整个队列中唯一 拿到这个序号就相当于拿到这条消息 + // boolean requeue 是否回到原来的队列 + try { + channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); +// channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); + } catch (IOException ex) { + log.error("xxx消费者接收到消息,消息内容:{},回退消息异常,异常信息:{}", message, ex); + } + }finally { + try { + channel.close(); + } catch (Exception e) { + log.error("xxx消费者关闭Channel异常,消息内容:{},异常信息:{}", message, e); + } + } + } } diff --git a/cloud-modules/cloud-event/src/main/java/com/muyu/event/controller/DataController.java b/cloud-modules/cloud-event/src/main/java/com/muyu/event/controller/DataController.java index 3603367..d02d4b7 100644 --- a/cloud-modules/cloud-event/src/main/java/com/muyu/event/controller/DataController.java +++ b/cloud-modules/cloud-event/src/main/java/com/muyu/event/controller/DataController.java @@ -10,6 +10,7 @@ import org.springframework.web.bind.annotation.RestController; * @name:DataController * @date:2024/9/29 20:16 */ + @RestController @RequestMapping("data") public class DataController { diff --git a/cloud-modules/cloud-event/src/main/java/com/muyu/event/controller/IoTDBController.java b/cloud-modules/cloud-event/src/main/java/com/muyu/event/controller/IoTDBController.java index 7bbfed5..d73514c 100644 --- a/cloud-modules/cloud-event/src/main/java/com/muyu/event/controller/IoTDBController.java +++ b/cloud-modules/cloud-event/src/main/java/com/muyu/event/controller/IoTDBController.java @@ -57,18 +57,4 @@ public class IoTDBController { return Result.success("添加成功"); }; - - - - - - - - - - - - - - } diff --git a/cloud-modules/cloud-event/src/main/java/com/muyu/event/listener/AddDatabaseListener.java b/cloud-modules/cloud-event/src/main/java/com/muyu/event/listener/AddDatabaseListener.java index e3fc4a7..bfbfd04 100644 --- a/cloud-modules/cloud-event/src/main/java/com/muyu/event/listener/AddDatabaseListener.java +++ b/cloud-modules/cloud-event/src/main/java/com/muyu/event/listener/AddDatabaseListener.java @@ -28,6 +28,7 @@ public class AddDatabaseListener implements EventListener { keys.add(key); values.add((String) value); }); + } @Override diff --git a/cloud-modules/saas/saas-server/src/main/java/com/muyu/server/controller/TemplateController.java b/cloud-modules/saas/saas-server/src/main/java/com/muyu/server/controller/TemplateController.java index 89dc512..4b4d6c9 100644 --- a/cloud-modules/saas/saas-server/src/main/java/com/muyu/server/controller/TemplateController.java +++ b/cloud-modules/saas/saas-server/src/main/java/com/muyu/server/controller/TemplateController.java @@ -33,7 +33,6 @@ public class TemplateController { @Autowired private TemplateService templateService; - /** * 报文模版列表 * @return @@ -58,7 +57,7 @@ public class TemplateController { } /** - * 报文模版添加 + * 报文模版添加0002222220 * @param template * @return */