From c638327a2f5cc2ece5cb04345dc6de49fa8d27ea Mon Sep 17 00:00:00 2001 From: fst1996 <2411194573@qq.com> Date: Fri, 1 Dec 2023 15:36:11 +0800 Subject: [PATCH] =?UTF-8?q?mq=E9=80=9A=E7=9F=A5=E4=B8=BB=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- car-base-common/pom.xml | 6 ++- .../rabbitmq/ConfirmCallbackConfig.java | 48 +++++++++++++++++++ .../config/rabbitmq/ReturnCallbackConfig.java | 41 ++++++++++++++++ .../base/server/service/TopLineService.java | 1 - .../service/impl/TopLineServiceImpl.java | 13 +++++ 5 files changed, 107 insertions(+), 2 deletions(-) create mode 100644 car-base-common/src/main/java/com/god/base/config/rabbitmq/ConfirmCallbackConfig.java create mode 100644 car-base-common/src/main/java/com/god/base/config/rabbitmq/ReturnCallbackConfig.java diff --git a/car-base-common/pom.xml b/car-base-common/pom.xml index a950cd0..ca9d62d 100644 --- a/car-base-common/pom.xml +++ b/car-base-common/pom.xml @@ -18,7 +18,11 @@ - + + + org.springframework.boot + spring-boot-starter-amqp + diff --git a/car-base-common/src/main/java/com/god/base/config/rabbitmq/ConfirmCallbackConfig.java b/car-base-common/src/main/java/com/god/base/config/rabbitmq/ConfirmCallbackConfig.java new file mode 100644 index 0000000..4dd12fb --- /dev/null +++ b/car-base-common/src/main/java/com/god/base/config/rabbitmq/ConfirmCallbackConfig.java @@ -0,0 +1,48 @@ +package com.god.base.config.rabbitmq; + +import lombok.extern.log4j.Log4j2; +import org.springframework.amqp.rabbit.connection.CorrelationData; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; + +/** + * 消息发送确认配置 + * 消息发送到交换机的回调 + */ +@Component +@Log4j2 +public class ConfirmCallbackConfig implements RabbitTemplate.ConfirmCallback { + + @Autowired + private RabbitTemplate rabbitTemplate; + + /** + * @PostContruct是spring框架的注解,在⽅法上加该注解会在项⽬启动的时候执⾏该⽅法,也可以理解为在spring容器初始化的时候执 + */ + @PostConstruct + public void init() { + rabbitTemplate.setConfirmCallback(this); + } + + /** + * 交换机不管是否收到消息的一个回调方法 + * + * @param correlationData 消息相关数据 + * @param ack 交换机是否收到消息 + * @param cause 失败原因 + */ + @Override + public void confirm(CorrelationData correlationData, boolean ack, String cause) { + if (!ack) { + String exchange = correlationData.getReturned().getExchange(); + String message = correlationData.getReturned().getMessage().getBody().toString(); + // 发送异常 + log.error("消息:{},发送到交换机:{}失败,原因是:{}", message, exchange, cause); + // TODO 可以把异常信息 以及 消息的内容直接添加到 MYSQL + } + } + +} diff --git a/car-base-common/src/main/java/com/god/base/config/rabbitmq/ReturnCallbackConfig.java b/car-base-common/src/main/java/com/god/base/config/rabbitmq/ReturnCallbackConfig.java new file mode 100644 index 0000000..15546e1 --- /dev/null +++ b/car-base-common/src/main/java/com/god/base/config/rabbitmq/ReturnCallbackConfig.java @@ -0,0 +1,41 @@ +package com.god.base.config.rabbitmq; + +import lombok.extern.log4j.Log4j2; +import org.springframework.amqp.core.ReturnedMessage; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; + +/** + * 消息发送到队列的确认 + */ +@Component +@Log4j2 +public class ReturnCallbackConfig implements RabbitTemplate.ReturnsCallback { + + @Autowired + private RabbitTemplate rabbitTemplate; + + /** + * @PostContruct是spring框架的注解,在⽅法上加该注解会在项⽬启动的时候执⾏该⽅法,也可以理解为在spring容器初始化的时候执 + */ + @PostConstruct + public void init() { + rabbitTemplate.setReturnsCallback(this); + } + + /** + * 消息发送失败 则会执行这个方法 + * + * @param returnedMessage the returned message and metadata. + */ + @Override + public void returnedMessage(ReturnedMessage returnedMessage) { + log.error("消息:{},被交换机:{} 回退!退回原因为:{}", + returnedMessage.getMessage().toString(), returnedMessage.getExchange(), returnedMessage.getReplyText()); + // TODO 回退了所有的信息,可做补偿机制 + } + +} diff --git a/car-base-server/src/main/java/com/god/base/server/service/TopLineService.java b/car-base-server/src/main/java/com/god/base/server/service/TopLineService.java index eec6c3f..c2f5c29 100644 --- a/car-base-server/src/main/java/com/god/base/server/service/TopLineService.java +++ b/car-base-server/src/main/java/com/god/base/server/service/TopLineService.java @@ -1,7 +1,6 @@ package com.god.base.server.service; import com.baomidou.mybatisplus.extension.service.IService; -import com.god.base.domain.Car; import com.god.base.domain.TopicCar; import com.god.base.domain.request.GetTopicReq; diff --git a/car-base-server/src/main/java/com/god/base/server/service/impl/TopLineServiceImpl.java b/car-base-server/src/main/java/com/god/base/server/service/impl/TopLineServiceImpl.java index e4db0d8..0dde999 100644 --- a/car-base-server/src/main/java/com/god/base/server/service/impl/TopLineServiceImpl.java +++ b/car-base-server/src/main/java/com/god/base/server/service/impl/TopLineServiceImpl.java @@ -10,7 +10,9 @@ import com.god.base.server.mapper.TopicCarMapper; import com.god.base.server.service.TopLineService; import com.god.common.core.exception.ServiceException; import com.god.common.core.utils.StringUtils; +import com.god.common.core.utils.uuid.UUID; import lombok.extern.log4j.Log4j2; +import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @@ -28,6 +30,9 @@ public class TopLineServiceImpl extends ServiceImpl i @Autowired private TopicCarMapper carMapper; + @Autowired + private RabbitTemplate rabbitTemplate; + /** * 车辆上线,同时返回主题 * @param getTopicReq @@ -47,6 +52,14 @@ public class TopLineServiceImpl extends ServiceImpl i //把车辆状态修改为上线 1 topicCar.setStatus(1); carMapper.updateById(topicCar); + if (StringUtils.isEmpty(topicCar.getTopic())){ + throw new ServiceException("车辆主题不存在"); + } + //通知中间分流层拉取对应mqtt主题 + rabbitTemplate.convertAndSend("TOPIC_INFORM",topicCar.getTopic(),message -> { + message.getMessageProperties().setMessageId(UUID.randomUUID().toString()); + return message; + }); return topicCar.getTopic(); }catch (Exception e){ log.info(e.getMessage());