From f46db057b5c30c68399f6d51b3a8c43d8e51e8ba Mon Sep 17 00:00:00 2001
From: Liu Wu <2780205363@qq.com>
Date: Mon, 30 Sep 2024 16:34:37 +0800
Subject: [PATCH] =?UTF-8?q?fix():=20=E4=BA=8B=E4=BB=B6=E5=9F=BA=E7=A1=80?=
=?UTF-8?q?=E4=BF=AE=E6=94=B9,kafka=E6=B3=A8=E5=85=A5?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
cloud-common/cloud-common-rabbit/pom.xml | 8 +-
.../rabbit/RabbitListenerConfigurer.java | 41 -----
.../rabbit/config/DelayedQueueConfig.java | 83 +++++++++
.../rabbit/config/MyConfirmCallback.java | 47 +++++
.../rabbit/config/RabbitAdminConfig.java | 49 +++++
.../common/rabbit/config/RabbitmqConfig.java | 20 ++
.../rabbit/config/ReturnCallbackConfig.java | 37 ++++
.../common/rabbit/config/TopicConfig.java | 75 ++++++++
.../rabbit/constants/RabbitmqConstants.java | 35 ++++
.../rabbit/consumer/RabbitMQConsumerUtil.java | 140 ++++++++++++++
.../rabbit/producer/RabbitMQProducerUtil.java | 174 ++++++++++++++++++
...ot.autoconfigure.AutoConfiguration.imports | 8 +-
cloud-modules/cloud-event/pom.xml | 16 +-
.../muyu/event/consumer/MessageConsumer.java | 14 +-
.../muyu/event/consumer/OnlineConsumer.java | 21 +++
.../muyu/event/controller/TestController.java | 14 +-
.../java/com/muyu/event/util/CacheUtil.java | 37 ++++
.../src/main/resources/bootstrap.yml | 7 +-
18 files changed, 757 insertions(+), 69 deletions(-)
delete mode 100644 cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/RabbitListenerConfigurer.java
create mode 100644 cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/config/DelayedQueueConfig.java
create mode 100644 cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/config/MyConfirmCallback.java
create mode 100644 cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/config/RabbitAdminConfig.java
create mode 100644 cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/config/RabbitmqConfig.java
create mode 100644 cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/config/ReturnCallbackConfig.java
create mode 100644 cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/config/TopicConfig.java
create mode 100644 cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/constants/RabbitmqConstants.java
create mode 100644 cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/consumer/RabbitMQConsumerUtil.java
create mode 100644 cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/producer/RabbitMQProducerUtil.java
create mode 100644 cloud-modules/cloud-event/src/main/java/com/muyu/event/consumer/OnlineConsumer.java
create mode 100644 cloud-modules/cloud-event/src/main/java/com/muyu/event/util/CacheUtil.java
diff --git a/cloud-common/cloud-common-rabbit/pom.xml b/cloud-common/cloud-common-rabbit/pom.xml
index fa6d383..79f5225 100644
--- a/cloud-common/cloud-common-rabbit/pom.xml
+++ b/cloud-common/cloud-common-rabbit/pom.xml
@@ -17,6 +17,10 @@
UTF-8
+
+ cloud-common-rabbit 消息队列服务
+
+
@@ -28,8 +32,8 @@
com.muyu
- cloud-common-core
+ cloud-common-redis
-
\ No newline at end of file
+
diff --git a/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/RabbitListenerConfigurer.java b/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/RabbitListenerConfigurer.java
deleted file mode 100644
index 51cb359..0000000
--- a/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/RabbitListenerConfigurer.java
+++ /dev/null
@@ -1,41 +0,0 @@
-package com.muyu.common.rabbit;
-
-import org.springframework.amqp.rabbit.connection.ConnectionFactory;
-import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistrar;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.messaging.converter.MappingJackson2MessageConverter;
-import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;
-
-@Configuration
-public class RabbitListenerConfigurer implements org.springframework.amqp.rabbit.annotation.RabbitListenerConfigurer {
-
- static {
- System.setProperty("spring.amqp.deserialization.trust.all", "true");
- }
-
- //以下配置RabbitMQ消息服务
- @Autowired
- public ConnectionFactory connectionFactory;
-
-
- /**
- * 处理器方法工厂
- * @return
- */
- @Bean
- public DefaultMessageHandlerMethodFactory handlerMethodFactory() {
- DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
- // 这里的转换器设置实现了 通过 @Payload 注解 自动反序列化message body
- factory.setMessageConverter(new MappingJackson2MessageConverter());
- return factory;
- }
-
- @Override
- public void configureRabbitListeners(RabbitListenerEndpointRegistrar rabbitListenerEndpointRegistrar) {
- rabbitListenerEndpointRegistrar.setMessageHandlerMethodFactory(handlerMethodFactory());
- }
-
-}
-
diff --git a/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/config/DelayedQueueConfig.java b/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/config/DelayedQueueConfig.java
new file mode 100644
index 0000000..0b67d49
--- /dev/null
+++ b/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/config/DelayedQueueConfig.java
@@ -0,0 +1,83 @@
+package com.muyu.common.rabbit.config;
+
+
+import com.muyu.common.rabbit.constants.RabbitmqConstants;
+import org.springframework.amqp.core.*;
+import org.springframework.amqp.rabbit.core.RabbitAdmin;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+import javax.annotation.Resource;
+import java.util.HashMap;
+
+/**
+ * @ClassName: DelayedQueueConfig
+ * @Description: 延迟队列配置类
+ */
+@Configuration
+public class DelayedQueueConfig {
+
+
+ @Resource
+ private RabbitAdmin rabbitAdmin;
+
+ /**
+ * 声明队列
+ * @return 返回队列
+ */
+ @Bean
+ public Queue delayedQueue() {
+ Queue queue = new Queue(RabbitmqConstants.DELAYED_QUEUE_NAME);
+ rabbitAdmin.declareQueue(queue);
+ return queue;
+ }
+
+ /**
+ * 声明交换机
+ * @return 返回交换机
+ */
+ @Bean
+ public Exchange delayedExchange() {
+ HashMap arguments = new HashMap<>(3);
+
+ arguments.put("x-delayed-type", "direct");
+
+ /**
+ * 声明自定义交换机
+ * 第一个参数:交换机的名称
+ * 第二个参数:交换机的类型
+ * 第三个参数:是否需要持久化
+ * 第四个参数:是否自动删除
+ * 第五个参数:其他参数
+ */
+ CustomExchange customExchange = new CustomExchange(
+ RabbitmqConstants.DELAYED_EXCHANGE_NAME,
+ "x-delayed-message",
+ true,
+ false,
+ arguments);
+ rabbitAdmin.declareExchange(customExchange);
+ return customExchange;
+ }
+
+ /**
+ * 绑定交换机
+ * @param delayedQueue 队列对象
+ * @param delayedExchange 交换机对象
+ */
+ @Bean
+ public Binding delayedQueueBindingDelayedExchange(
+ @Qualifier("delayedQueue") Queue delayedQueue,
+ @Qualifier("delayedExchange") Exchange delayedExchange) {
+
+ Binding noargs = BindingBuilder.bind(delayedQueue)
+ .to(delayedExchange)
+ .with(RabbitmqConstants.DELAYED_ROUTING_KEY)
+ .noargs();
+ rabbitAdmin.declareBinding(noargs);
+ return noargs;
+ }
+
+
+}
diff --git a/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/config/MyConfirmCallback.java b/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/config/MyConfirmCallback.java
new file mode 100644
index 0000000..2b40812
--- /dev/null
+++ b/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/config/MyConfirmCallback.java
@@ -0,0 +1,47 @@
+package com.muyu.common.rabbit.config;
+
+import lombok.AllArgsConstructor;
+import org.springframework.amqp.rabbit.connection.CorrelationData;
+import org.springframework.amqp.rabbit.core.RabbitTemplate;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PostConstruct;
+
+/**
+ * @ClassName:
+ * @Description: 消息发送到 交换机的确认 回调方法
+ */
+@Component
+@AllArgsConstructor
+public class MyConfirmCallback implements RabbitTemplate.ConfirmCallback {
+
+
+ private RabbitTemplate rabbitTemplate;
+
+// public MyConfirmCallback(RabbitTemplate rabbitTemplate) {
+// this.rabbitTemplate = rabbitTemplate;
+// // 设置 消息发送到交换机成功 的回调
+// this.rabbitTemplate.setConfirmCallback(this);
+// }
+
+ @PostConstruct
+ public void init() {
+ this.rabbitTemplate.setConfirmCallback(this);
+ }
+
+ /**
+ * 发送消息到交换机的回调方法 消息发送成功或者失败都会执行
+ *
+ * @param correlationData correlation data for the callback. 消息的元数据
+ * @param ack true for ack, false for nack
+ * @param cause An optional cause, for nack, when available, otherwise null.
+ */
+ @Override
+ public void confirm(CorrelationData correlationData, boolean ack, String cause) {
+ if (ack) {
+ System.out.println("消息发送到交换机成功~");
+ } else {
+ System.out.println("消息发送到交换机失败,失败的原因:" + cause);
+ }
+ }
+}
diff --git a/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/config/RabbitAdminConfig.java b/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/config/RabbitAdminConfig.java
new file mode 100644
index 0000000..27b24c5
--- /dev/null
+++ b/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/config/RabbitAdminConfig.java
@@ -0,0 +1,49 @@
+package com.muyu.common.rabbit.config;
+
+
+
+import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
+import org.springframework.amqp.rabbit.connection.ConnectionFactory;
+import org.springframework.amqp.rabbit.core.RabbitAdmin;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+/**
+ * @ClassName: RabbitAdminConfig
+ * @Description: RabbitAdmin配置类
+ */
+@Configuration
+public class RabbitAdminConfig {
+ @Value("${spring.rabbitmq.host}")
+ private String host;
+ @Value("${spring.rabbitmq.username}")
+ private String username;
+ @Value("${spring.rabbitmq.password}")
+ private String password;
+ @Value("${spring.rabbitmq.virtualhost}")
+ private String virtualHost;
+
+ @Bean
+ public ConnectionFactory connectionFactory() {
+ CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
+ cachingConnectionFactory.setHost(host);
+ cachingConnectionFactory.setUsername(username);
+ cachingConnectionFactory.setPassword(password);
+ cachingConnectionFactory.setVirtualHost(virtualHost);
+ return cachingConnectionFactory;
+
+ }
+
+ @Bean
+ public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
+ RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
+ rabbitAdmin.setAutoStartup(true);
+ return rabbitAdmin;
+ }
+
+}
+
+
+
+
diff --git a/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/config/RabbitmqConfig.java b/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/config/RabbitmqConfig.java
new file mode 100644
index 0000000..9814d1b
--- /dev/null
+++ b/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/config/RabbitmqConfig.java
@@ -0,0 +1,20 @@
+package com.muyu.common.rabbit.config;
+
+import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
+import org.springframework.amqp.support.converter.MessageConverter;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+/**
+ * JSON 消息转换器 自动将发送的消息转换成 json 字符串 并且 消费者接收到消息的时候自动反序列化 成需要的对象
+ */
+@Configuration
+public class RabbitmqConfig {
+
+
+ // 消息转换配置
+ @Bean
+ public MessageConverter jsonMessageConverter() {
+ return new Jackson2JsonMessageConverter();
+ }
+}
diff --git a/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/config/ReturnCallbackConfig.java b/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/config/ReturnCallbackConfig.java
new file mode 100644
index 0000000..212e2fd
--- /dev/null
+++ b/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/config/ReturnCallbackConfig.java
@@ -0,0 +1,37 @@
+package com.muyu.common.rabbit.config;
+
+import lombok.AllArgsConstructor;
+import org.springframework.amqp.core.ReturnedMessage;
+import org.springframework.amqp.rabbit.core.RabbitTemplate;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PostConstruct;
+
+/**
+ * 消息发送到 队列的确认
+ */
+@Component
+@AllArgsConstructor
+public class ReturnCallbackConfig implements RabbitTemplate.ReturnsCallback {
+
+
+ private final RabbitTemplate rabbitTemplate;
+
+ @PostConstruct // @PostContruct是spring框架的注解,在⽅法上加该注解会在项⽬启动的时候执⾏该⽅法,也可以理解为在spring容器初始化的时候执
+ public void init() {
+ rabbitTemplate.setReturnsCallback(this);
+ }
+
+ /**
+ * 消息发送到 队列失败的时候执行
+ *
+ * @param returnedMessage the returned message and metadata.
+ */
+ @Override
+ public void returnedMessage(ReturnedMessage returnedMessage) {
+ System.out.println("消息" + returnedMessage.getMessage().toString() +
+ "被交换机" + returnedMessage.getExchange() + "回退!"
+ + "退回原因为:" + returnedMessage.getReplyText());
+ // 回退了所有的信息,可做补偿机制 记录发送的日志
+ }
+}
diff --git a/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/config/TopicConfig.java b/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/config/TopicConfig.java
new file mode 100644
index 0000000..58d717b
--- /dev/null
+++ b/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/config/TopicConfig.java
@@ -0,0 +1,75 @@
+package com.muyu.common.rabbit.config;
+
+import org.springframework.amqp.core.*;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+/**
+ * @Author: WangXin
+ * @Time: 2024/4/22 11:55
+ * @Description: 主题模式配置
+ */
+@Configuration
+public class TopicConfig {
+
+ /**
+ * 主题模式交换机
+ * @return exchange
+ */
+ @Bean(name = "topicExchange")
+ public Exchange getTopicExchange(){
+ return ExchangeBuilder
+ .topicExchange("exchange_topic")
+ .build();
+ }
+
+ /**
+ * 主题队列 01
+ * @return queue
+ */
+ @Bean(name = "topicQueue01")
+ public Queue getTopicQueue01(){
+ return QueueBuilder
+ .durable("queue_topic_01")
+ .build();
+ }
+
+ /**
+ * 主题队列 02
+ * @return queue
+ */
+ @Bean(name = "topicQueue02")
+ public Queue getTopicQueue02(){
+ return QueueBuilder
+ .durable("queue_topic_02")
+ .build();
+ }
+
+ /**
+ * 绑定队列 01
+ * @return binding
+ */
+ @Bean
+ public Binding getTopicBinding01(){
+ return BindingBuilder
+ .bind(getTopicQueue01())
+ .to(getTopicExchange())
+ //路由键 队列1接收debug级别的消息
+ .with("front.#")
+ .noargs();
+ }
+
+ /**
+ * 绑定队列 02
+ * @return binding
+ */
+ @Bean
+ public Binding getTopicBinding02(){
+ return BindingBuilder
+ .bind(getTopicQueue02())
+ .to(getTopicExchange())
+ // 路由键 队列2接收info级别的消息
+ .with("back.order.*")
+ .noargs();
+ }
+}
diff --git a/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/constants/RabbitmqConstants.java b/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/constants/RabbitmqConstants.java
new file mode 100644
index 0000000..45495ab
--- /dev/null
+++ b/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/constants/RabbitmqConstants.java
@@ -0,0 +1,35 @@
+package com.muyu.common.rabbit.constants;
+
+/**
+ *
+ * @author: 刘武
+ * @date: 2024/7/10
+ * @Description: rabbitmq常量
+ * @Version 1.0.0
+ */
+public interface RabbitmqConstants {
+
+ //普通队列
+ String BASIC_QUEUE_NAME = "BASIC_QUEUE_NAME";
+
+ String lOG_QUEUE_NAME = "LOG_QUEUE_NAME";
+ //延迟队列
+ //队列名称
+ String DELAYED_QUEUE_NAME = "delayed_queue";
+ //交换机名称
+ String DELAYED_EXCHANGE_NAME = "DELAYED_EXCHANGE";
+ //交换机
+ String DELAYED_ROUTING_KEY = "delayed";
+ /**
+ * 上下线监听交换机
+ */
+ String TOP_BOTTOM_STITCHING = "top_bottom_stitching";
+ /**
+ * 上线规则
+ */
+ String TOP_RULE = "car.top.data";
+ /**
+ * 车辆下线规则
+ */
+ String BOTTOM_RULE = "car.bottom.data";
+}
diff --git a/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/consumer/RabbitMQConsumerUtil.java b/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/consumer/RabbitMQConsumerUtil.java
new file mode 100644
index 0000000..2181562
--- /dev/null
+++ b/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/consumer/RabbitMQConsumerUtil.java
@@ -0,0 +1,140 @@
+package com.muyu.common.rabbit.consumer;
+
+import com.alibaba.fastjson2.JSONObject;
+import com.muyu.common.redis.service.RedisService;
+import com.rabbitmq.client.Channel;
+import lombok.AllArgsConstructor;
+import lombok.extern.log4j.Log4j2;
+import org.springframework.amqp.core.Message;
+import org.springframework.stereotype.Component;
+
+import java.io.IOException;
+import java.util.Date;
+
+/**
+ * @ClassName: RabbitMQConsumerUtil
+ * @Description: rabbitmq消费者
+ */
+@Component
+@Log4j2
+@AllArgsConstructor
+public class RabbitMQConsumerUtil {
+
+ private final RedisService redisService;
+
+
+ /**
+ * 普通消费者
+ * @param data 数据类型
+ * @param message
+ * @param channel
+ */
+ public void rabbitMQBasicConsumer(Object data ,Message message , Channel channel) {
+ log.info("当前时间:{} :RabbitMQConsumerUtil : {}", new Date(), message);
+ try {
+ // 获取到消息 开始消费
+ log.info("消息消费者接收到消息,消息内容:{}", JSONObject.toJSONString(data));
+
+
+ Long add = redisService.redisTemplate.opsForSet().add(data, message.getMessageProperties().getMessageId());
+
+ if (add != 1) {
+ return;
+ }
+
+
+ /**
+ * -----------------------------------以下为异步业务操作----------------------------
+ */
+
+ /**
+ * ------------------------------------------------------------------------------
+ */
+ // 消费消息成功之后需要确认
+ // long deliveryTag 消息投递序号 自增的数字 在整个队列中唯一 拿到这个序号就相当于拿到这条消息
+ // boolean multiple 是否批量确认 true 批量 确认小于等于当前投递序号的消息 false 单个确认
+ channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
+ log.info("xxx消费者接收到消息,消息内容:{},消费成功...", message);
+
+ } catch (Exception e) {
+ log.error("xxx消费者接收到消息,消息内容:{},消费消息异常,异常信息:{}", message, e);
+ // 消息回退 拒绝消费消息
+ // long deliveryTag 消息投递序号 自增的数字 在整个队列中唯一 拿到这个序号就相当于拿到这条消息
+ // boolean requeue 是否回到原来的队列
+ try {
+ channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
+// channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
+ } catch (IOException ex) {
+ log.error("xxx消费者接收到消息,消息内容:{},回退消息异常,异常信息:{}", message, ex);
+ }
+ }finally {
+ try {
+ channel.close();
+ } catch (Exception e) {
+ log.error("xxx消费者关闭Channel异常,消息内容:{},异常信息:{}", message, e);
+ }
+ }
+ }
+
+
+ /**
+ * 普通消费者
+ * @param data 数据类型
+ * @param message
+ * @param channel
+ */
+ public void carUpConsumer(String data,Message message , Channel channel) {
+ log.info("当前时间:{} :RabbitMQConsumerUtil : {}", new Date(), message);
+ try {
+ // 获取到消息 开始消费
+ log.info("消息消费者接收到消息,消息内容:{}", JSONObject.toJSONString(data));
+
+
+ Long add = redisService.redisTemplate.opsForSet().add(data, message.getMessageProperties().getMessageId());
+
+ if (add != 1) {
+ return;
+ }
+
+
+ /**
+ * -----------------------------------以下为异步业务操作----------------------------
+ */
+ log.info("[ 根据vin拿到缓存 ] vin为 --》 {}",data);
+ log.info("[ 存入本地缓存 ] 数据为 --》 {}",data);
+ log.info("[ 存入本地缓存 ] 数据为 --》 {}",data);
+ /**
+ * ------------------------------------------------------------------------------
+ */
+ // 消费消息成功之后需要确认
+ // long deliveryTag 消息投递序号 自增的数字 在整个队列中唯一 拿到这个序号就相当于拿到这条消息
+ // boolean multiple 是否批量确认 true 批量 确认小于等于当前投递序号的消息 false 单个确认
+ channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
+ log.info("xxx消费者接收到消息,消息内容:{},消费成功...", message);
+
+ } catch (Exception e) {
+ log.error("xxx消费者接收到消息,消息内容:{},消费消息异常,异常信息:{}", message, e);
+ // 消息回退 拒绝消费消息
+ // long deliveryTag 消息投递序号 自增的数字 在整个队列中唯一 拿到这个序号就相当于拿到这条消息
+ // boolean requeue 是否回到原来的队列
+ try {
+ channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
+// channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
+ } catch (IOException ex) {
+ log.error("xxx消费者接收到消息,消息内容:{},回退消息异常,异常信息:{}", message, ex);
+ }
+ }finally {
+ try {
+ channel.close();
+ } catch (Exception e) {
+ log.error("xxx消费者关闭Channel异常,消息内容:{},异常信息:{}", message, e);
+ }
+ }
+ }
+
+
+
+
+
+
+}
diff --git a/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/producer/RabbitMQProducerUtil.java b/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/producer/RabbitMQProducerUtil.java
new file mode 100644
index 0000000..fc7c3b8
--- /dev/null
+++ b/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/producer/RabbitMQProducerUtil.java
@@ -0,0 +1,174 @@
+package com.muyu.common.rabbit.producer;
+
+import com.muyu.common.core.domain.Result;
+import com.muyu.common.rabbit.constants.RabbitmqConstants;
+import lombok.AllArgsConstructor;
+import lombok.extern.log4j.Log4j2;
+import org.springframework.amqp.core.MessageProperties;
+import org.springframework.amqp.rabbit.core.RabbitTemplate;
+import org.springframework.stereotype.Component;
+
+import java.util.UUID;
+
+/**
+ * @ClassName: RabbitMQProducer
+ * @Description: rabbitmq生产者
+ */
+@Component
+@AllArgsConstructor
+@Log4j2
+public class RabbitMQProducerUtil {
+ //redis工具类对象
+
+ //rabbit
+ private final RabbitTemplate rabbitTemplate;
+
+
+ /**
+ * 简单模型
+ *
+ * @param param 传递的消息 (如果是对象需要序列化)
+ * @return 结果集
+ * 一对一消费,只有一个消费者能接收到
+ */
+ public Result> basicSendMessage(String queueName, Object param, String msg) {
+
+ log.info("【简单模型mq】 : method: 【 basicSendMessage 】 - ages: 【 String : {}, Object : {}, String : {} 】 ---> 【 消息发送中。。。 】", RabbitmqConstants.BASIC_QUEUE_NAME, param, msg);
+ // 发送简单模型消息
+ // 第一个参数: 绑定规则 相当于 队列名称
+ // 第二个参数:消息内容
+ rabbitTemplate.convertAndSend(queueName, param, message -> {
+ message.getMessageProperties().setMessageId(UUID.randomUUID().toString());
+ return message;
+ } );
+
+ log.info("【简单模型mq】 : method: 【 basicSendMessage 】- queue: 【 {} 】 ---> 【 消息发送成功 】", RabbitmqConstants.BASIC_QUEUE_NAME);
+
+ return Result.success(msg!=null?msg:"消息发送成功");
+ }
+
+ /**
+ * Work queue 工作模型
+ *
+ * @param obj 传递的消息 (如果是对象需要序列化)
+ * @return 结果集
+ * 多个消费者,你一个我一个分配消费消息,有预取机制,默认公平消费,可配置 能者多劳模式(),谁完成的快,谁多做一点
+ */
+ public Result> workSendMessage(String queueName, Object obj, String msg) {
+
+ log.info("【工作模型mq】 : method: 【 workSendMessage 】 - ages: 【 String : {}, Object : {}, String : {} 】 ---> 【 消息发送中。。。 】", queueName, obj, msg);
+ // 发送简单模型消息
+ // 第一个参数: 绑定规则 相当于 队列名称
+ // 第二个参数:消息内容
+ rabbitTemplate.convertAndSend(queueName, obj, message -> {
+ message.getMessageProperties().setMessageId(UUID.randomUUID().toString());
+ return message;
+ } );
+
+ log.info("【工作模型mq】 : method: 【 workSendMessage 】- queue: 【 {} 】 ---> 【 消息发送成功 】", queueName);
+
+ return Result.success("消息发送成功");
+ }
+
+ /**
+ * Publish/Subscribe 发布订阅者模型
+ * 多个消费者,多个消费者可以同时接收到消息 有交换机 类型 fanout
+ *
+ * @param exchange 交换机名称
+ * @param obj 发送的消息Object
+ * @param msg 响应的内容
+ * @return 结果集
+ */
+ public Result> publishSubscribeSendMessage(String exchange, Object obj, String msg) {
+
+ log.info("【订阅模型mq】 : method: 【 workSendMessage 】 - ages: 【 String : {}, Object : {}, String : {} 】 ---> 【 消息发送中。。。 】", exchange, obj, msg);
+ // 发送简单模型消息
+ // 第一个参数: exchange 交换机的名称
+ // 第二个参数: 绑定规则 发布订阅者模型 不写 默认 "" 只要绑定就行 不需要规则
+ // 第三个参数:消息内容
+ rabbitTemplate.convertAndSend(exchange, "", obj, message -> {
+ message.getMessageProperties().setMessageId(UUID.randomUUID().toString());
+ return message;
+ } );
+
+ log.info("【订阅模型mq】 : method: 【 workSendMessage 】- exchange: 【 {} 】 ---> 【 消息发送成功 】", exchange);
+
+ return Result.success("消息发送成功");
+ }
+
+ /**
+ * Routing路由模型
+ * 使用的是 Direct 类型的交换机,会将接收到的消息根据 规则 路由到指定的Queue(队列),因此称为路由模式
+ *
+ * @param exchange 交换机名称
+ * @param rule 绑定规则 一个字符串即可
+ * @param obj 发送的消息Object
+ * @param msg 响应的内容
+ * @return 结果集
+ */
+ public Result> routingSendMessage(String exchange, String rule, Object obj, String msg) {
+
+ log.info("【路由模型mq】 : method: 【 workSendMessage 】 - ages: 【 String : {}, Object : {}, String : {} 】 ---> 【 消息发送中。。。 】", exchange, obj, msg);
+ // 发送简单模型消息
+ // 第一个参数: 绑定规则 相当于 队列名称
+ // 第二个参数:消息内容
+ rabbitTemplate.convertAndSend(exchange, rule, obj, message -> {
+ message.getMessageProperties().setMessageId(UUID.randomUUID().toString());
+ return message;
+ } );
+
+ log.info("【路由模型mq】 : method: 【 workSendMessage 】- exchange: 【 {} 】 ---> 【 消息发送成功 】", exchange);
+
+ return Result.success("消息发送成功");
+ }
+
+
+ /**
+ * Topic主题模型模型
+ * 使用的是 topic 类型的交换机
+ *
+ * @param exchange 交换机名称
+ * @param rule 绑定规则 可以绑定多个单词以 . 拼接 也可以使用 #(匹配 零个 一个 或 多个 单词) 或 *(匹配 一个 单词) 通配符(例如:name.msg, *.msg, age.# )
+ * @param obj 发送的消息Object
+ * @param msg 响应的内容
+ * @return 结果集
+ */
+ public Result> topicSendMessage(String exchange, String rule, Object obj) {
+
+ log.info("【主题模型mq】 : method: 【 workSendMessage 】 - ages: 【 String : {}, Object : {} 】 ---> 【 消息发送中。。。 】", exchange, obj);
+ // 发送简单模型消息
+ // 第一个参数: 绑定规则 相当于 队列名称
+ // 第二个参数:消息内容
+ rabbitTemplate.convertAndSend(exchange, rule, obj, message -> {
+ message.getMessageProperties().setMessageId(UUID.randomUUID().toString());
+ return message;
+ } );
+
+ log.info("【主题模型mq】 : method: 【 workSendMessage 】- exchange: 【 {} 】 ---> 【 消息发送成功 】", exchange);
+
+ return Result.success(obj,"消息发送成功");
+ }
+
+
+ /**
+ * 延迟队列模型
+ * @param param 传输内容
+ * @param delayTime 延迟时间
+ * @return 结果集
+ */
+ public Result> delayedSendMessage(Long delayTime, Object param) {
+ log.info("【延迟队列模型】 : method: 【 delayedSendMessage 】 消息内容:{}---> 【 消息发送中。。。 】",param);
+
+ rabbitTemplate.convertAndSend(RabbitmqConstants.DELAYED_EXCHANGE_NAME, RabbitmqConstants.DELAYED_ROUTING_KEY,param, message -> {
+ MessageProperties messageProperties = message.getMessageProperties();
+ messageProperties.setMessageId(UUID.randomUUID().toString());
+ messageProperties.setDelayLong(delayTime);
+ return message;
+ });
+ log.info("【延迟队列模型】 : method: 【 delayedSendMessage 】 消息内容:{}---> 【 消息发送成功 】",param);
+
+ return Result.success(param,"消息发送成功");
+
+ }
+
+}
diff --git a/cloud-common/cloud-common-rabbit/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/cloud-common/cloud-common-rabbit/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
index 189ea2c..6cd925a 100644
--- a/cloud-common/cloud-common-rabbit/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
+++ b/cloud-common/cloud-common-rabbit/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
@@ -1 +1,7 @@
-com.muyu.common.rabbit.RabbitListenerConfigurer
\ No newline at end of file
+com.muyu.rabbitmq.producer.RabbitMQProducerUtil
+com.muyu.rabbitmq.consumer.RabbitMQConsumerUtil
+com.muyu.rabbitmq.config.RabbitmqConfig
+com.muyu.rabbitmq.config.MyConfirmCallback
+com.muyu.rabbitmq.config.DelayedQueueConfig
+com.muyu.rabbitmq.config.RabbitAdminConfig
+com.muyu.rabbitmq.config.ReturnCallbackConfig
diff --git a/cloud-modules/cloud-event/pom.xml b/cloud-modules/cloud-event/pom.xml
index 124e715..cae475e 100644
--- a/cloud-modules/cloud-event/pom.xml
+++ b/cloud-modules/cloud-event/pom.xml
@@ -54,13 +54,6 @@
spring-boot-starter-actuator
-
-
- org.springframework.kafka
- spring-kafka
-
-
-
com.mysql
@@ -112,15 +105,14 @@
- org.apache.kafka
- kafka-clients
- 2.8.0
+ com.muyu
+ cloud-common-kafka
+ 3.6.3
com.muyu
- cloud-common-kafka
- 3.6.3
+ cloud-common-rabbit
diff --git a/cloud-modules/cloud-event/src/main/java/com/muyu/event/consumer/MessageConsumer.java b/cloud-modules/cloud-event/src/main/java/com/muyu/event/consumer/MessageConsumer.java
index 4f78908..3cb39c0 100644
--- a/cloud-modules/cloud-event/src/main/java/com/muyu/event/consumer/MessageConsumer.java
+++ b/cloud-modules/cloud-event/src/main/java/com/muyu/event/consumer/MessageConsumer.java
@@ -4,6 +4,7 @@ package com.muyu.event.consumer;
import com.alibaba.fastjson2.JSONObject;
import com.muyu.event.basic.EventPublisher;
+import lombok.extern.log4j.Log4j2;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.beans.factory.annotation.Autowired;
@@ -25,6 +26,7 @@ import static org.bouncycastle.asn1.x500.style.RFC4519Style.l;
* @date:2024/9/28 23:34
*/
@Component
+@Log4j2
public class MessageConsumer implements ApplicationRunner {
@Autowired
@@ -32,7 +34,7 @@ public class MessageConsumer implements ApplicationRunner {
@Autowired
private EventPublisher eventPublisher;
- private final String topic="vehicle";
+ private final String topic="four_car";
@Override
@@ -44,13 +46,9 @@ public class MessageConsumer implements ApplicationRunner {
ConsumerRecords consumerRecords = consumer.poll(Duration.ofMillis(100));
consumerRecords.forEach(record -> {
String value = record.value();
- System.out.println(value);
-
- JSONObject jsonObject = new JSONObject();
- jsonObject.put("123","123");
-
- //事件处理
- eventPublisher.publishEvent(jsonObject);
+ JSONObject jsonObject = JSONObject.parseObject(value);
+ log.info("value:{}",value);
+// eventPublisher.publishEvent(jsonObject);
});
diff --git a/cloud-modules/cloud-event/src/main/java/com/muyu/event/consumer/OnlineConsumer.java b/cloud-modules/cloud-event/src/main/java/com/muyu/event/consumer/OnlineConsumer.java
new file mode 100644
index 0000000..d8871b7
--- /dev/null
+++ b/cloud-modules/cloud-event/src/main/java/com/muyu/event/consumer/OnlineConsumer.java
@@ -0,0 +1,21 @@
+package com.muyu.event.consumer;
+
+import lombok.extern.log4j.Log4j2;
+import org.springframework.stereotype.Component;
+
+/**
+ * 车辆上线监听
+ * @author 刘武
+ * @package:com.muyu.event.consumer
+ * @name:OnlineConsumer
+ * @date:2024/9/30 11:40
+ */
+@Component
+@Log4j2
+public class OnlineConsumer {
+
+
+
+
+
+}
diff --git a/cloud-modules/cloud-event/src/main/java/com/muyu/event/controller/TestController.java b/cloud-modules/cloud-event/src/main/java/com/muyu/event/controller/TestController.java
index 84c9f93..fb46a2e 100644
--- a/cloud-modules/cloud-event/src/main/java/com/muyu/event/controller/TestController.java
+++ b/cloud-modules/cloud-event/src/main/java/com/muyu/event/controller/TestController.java
@@ -5,12 +5,15 @@ import com.alibaba.fastjson2.JSONObject;
import com.muyu.event.service.TestService;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
+import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
+import java.text.SimpleDateFormat;
+import java.util.Date;
/**
* @author 刘武
@@ -25,13 +28,18 @@ public class TestController {
private KafkaProducer kafkaProducer;
- private static final String topic="vehicle";
+
+ private static final String topic="four_car";
@GetMapping("send")
public String sendKafka(){
String message="发送一条信息";
- ProducerRecord producerRecord = new ProducerRecord(topic,message);
+ JSONObject jsonObject = new JSONObject();
+ jsonObject.put("cj","sb");
+
+
+ ProducerRecord producerRecord = new ProducerRecord(topic,jsonObject.toJSONString());
kafkaProducer.send(producerRecord);
return "success";
@@ -50,4 +58,6 @@ public class TestController {
+
+
}
diff --git a/cloud-modules/cloud-event/src/main/java/com/muyu/event/util/CacheUtil.java b/cloud-modules/cloud-event/src/main/java/com/muyu/event/util/CacheUtil.java
new file mode 100644
index 0000000..c2de42f
--- /dev/null
+++ b/cloud-modules/cloud-event/src/main/java/com/muyu/event/util/CacheUtil.java
@@ -0,0 +1,37 @@
+package com.muyu.event.util;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import org.springframework.stereotype.Component;
+
+/**
+ * 缓存工具类
+ *
+ * @program: cloud-server
+ * @author: 刘武
+ * @create: 2024-09-30 10:08
+ **/
+@Component
+public class CacheUtil {
+
+ private final Cache cache;
+
+ public CacheUtil() {
+ this.cache = Caffeine.newBuilder()
+ .maximumSize(500L)
+ .build();
+ }
+
+ public T get(String key) {
+ return cache.getIfPresent(key);
+ }
+
+ public void put(String key, T value) {
+ cache.put(key, value);
+ }
+
+ public void remove(String key) {
+ cache.invalidate(key);
+ }
+
+}
diff --git a/cloud-modules/cloud-event/src/main/resources/bootstrap.yml b/cloud-modules/cloud-event/src/main/resources/bootstrap.yml
index f0ff80f..987a973 100644
--- a/cloud-modules/cloud-event/src/main/resources/bootstrap.yml
+++ b/cloud-modules/cloud-event/src/main/resources/bootstrap.yml
@@ -1,6 +1,7 @@
# Tomcat
server:
port: 10009
+
# nacos线上地址
nacos:
addr: 47.101.53.251:8848
@@ -9,6 +10,7 @@ nacos:
namespace: four
# SPRING_AMQP_DESERIALIZATION_TRUST_ALL=true spring.amqp.deserialization.trust.all
# Spring
+spring:
amqp:
deserialization:
trust:
@@ -51,9 +53,8 @@ nacos:
- application-config-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension}
# xxl-job 配置文件
- application-xxl-config-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension}
- # rabbit 配置文件
- - application-rabbit-config-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension}
+
logging:
level:
- com.muyu.fence.mapper: DEBUG
+ com.muyu.system.mapper: DEBUG