From 59bf3c8abccda462f5de4cdb8efd2c7a9dd49a76 Mon Sep 17 00:00:00 2001 From: liyongjie <1318551549@qq.com> Date: Wed, 10 Apr 2024 20:15:25 +0800 Subject: [PATCH] =?UTF-8?q?rabbitmq=E8=A7=A3=E6=9E=90?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- muyu-auth/src/main/resources/bootstrap.yml | 9 +- .../core/constant/MQQueueConstants.java | 23 +++++ .../com/muyu/business/sms/SendSmsConfig.java | 90 +++++++++++++++++++ 3 files changed, 119 insertions(+), 3 deletions(-) create mode 100644 muyu-common/muyu-common-core/src/main/java/com/muyu/common/core/constant/MQQueueConstants.java create mode 100644 muyu-modules/muyu-business/muyu-business-server/src/main/java/com/muyu/business/sms/SendSmsConfig.java diff --git a/muyu-auth/src/main/resources/bootstrap.yml b/muyu-auth/src/main/resources/bootstrap.yml index 293a999..997276b 100644 --- a/muyu-auth/src/main/resources/bootstrap.yml +++ b/muyu-auth/src/main/resources/bootstrap.yml @@ -14,11 +14,14 @@ spring: nacos: discovery: # 服务注册地址 + server-addr: 10.10.26.1:8848 + # 命名空间 + namespace: lyj + config: # 服务注册地址 server-addr: 10.10.26.1:8848 - config: - # 配置中心地址 - server-addr: 10.10.26.1:8848 + # 命名空间 + namespace: lyj # 配置文件格式 file-extension: yml # 共享配置 diff --git a/muyu-common/muyu-common-core/src/main/java/com/muyu/common/core/constant/MQQueueConstants.java b/muyu-common/muyu-common-core/src/main/java/com/muyu/common/core/constant/MQQueueConstants.java new file mode 100644 index 0000000..873bc79 --- /dev/null +++ b/muyu-common/muyu-common-core/src/main/java/com/muyu/common/core/constant/MQQueueConstants.java @@ -0,0 +1,23 @@ +package com.muyu.common.core.constant; + +import lombok.extern.slf4j.*; +import org.springframework.stereotype.*; + +/** + * MQ常量 + * @author LiYongJie + * @date 2024/4/9 + */ +@Slf4j +@Component +public class MQQueueConstants { + /** + * 报警队列 + */ + public static final String ALARM_QUEUE = "alarm"; + + /** + * 故障队列 + */ + public static final String FAULT_QUEUE = "fault"; +} diff --git a/muyu-modules/muyu-business/muyu-business-server/src/main/java/com/muyu/business/sms/SendSmsConfig.java b/muyu-modules/muyu-business/muyu-business-server/src/main/java/com/muyu/business/sms/SendSmsConfig.java new file mode 100644 index 0000000..f9a89b4 --- /dev/null +++ b/muyu-modules/muyu-business/muyu-business-server/src/main/java/com/muyu/business/sms/SendSmsConfig.java @@ -0,0 +1,90 @@ +package com.muyu.business.sms; + +import com.muyu.business.domain.*; +import com.muyu.business.service.*; +import com.muyu.common.core.constant.*; +import com.rabbitmq.client.*; +import lombok.extern.slf4j.*; +import org.springframework.amqp.core.*; +import org.springframework.amqp.rabbit.annotation.*; +import org.springframework.amqp.rabbit.annotation.Queue; +import org.springframework.beans.factory.annotation.*; +import org.springframework.data.redis.core.*; +import org.springframework.stereotype.*; + +import java.io.*; + +@Slf4j +@Component +public class SendSmsConfig { + + /** + * 注入redis工具类 + */ + @Autowired + private StringRedisTemplate redisTemplate; + + /** + * 注入报警日志服务 + */ + @Autowired + private AlarmLogsService alarmLogsService; + + /** + * 注入故障日志服务 + */ + @Autowired + private FaultLogsService faultLogsService; + + @RabbitListener(queuesToDeclare = {@Queue(value = MQQueueConstants.ALARM_QUEUE, declare = "true")}) + public void sendSms(AlarmLogs alarmLogsAddReq, Message message, Channel channel) { + log.info("消息队列:【{}】,收到报警日志:【{}】",MQQueueConstants.ALARM_QUEUE,alarmLogsAddReq); + try { + String messageId = message.getMessageProperties().getMessageId(); + Long count = redisTemplate.opsForSet().add(MQQueueConstants.ALARM_QUEUE, messageId); + if (count.intValue() > 0){ + alarmLogsService.insertAlarmLogs(alarmLogsAddReq); + // 消息确认 + channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); + log.info("消息队列:【{}】,收到报警日志:【{}】,消费成功...",MQQueueConstants.ALARM_QUEUE,alarmLogsAddReq); + }else { + log.error("消息队列:【{}】,收到报警日志:【{}】,消费重复...",MQQueueConstants.ALARM_QUEUE,alarmLogsAddReq); + } + } catch (IOException e) { + e.printStackTrace(); + log.error("消息队列:【{}】,收到报警日志:【{}】,消费异常:【{}】",MQQueueConstants.ALARM_QUEUE,alarmLogsAddReq,e.getMessage()); + // 消息回退 + try { + channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); + } catch (IOException ex) { + log.error("消息队列:【{}】,收到报警日志:【{}】,消息退回异常:【{}】",MQQueueConstants.ALARM_QUEUE,alarmLogsAddReq,e.getMessage()); + } + } + } + + @RabbitListener(queuesToDeclare = {@Queue(value = MQQueueConstants.FAULT_QUEUE, declare = "true")}) + public void sendSms(FaultLogs faultLogs, Message message, Channel channel) { + log.info("消息队列:【{}】,收到故障日志:【{}】",MQQueueConstants.FAULT_QUEUE,faultLogs); + try { + String messageId = message.getMessageProperties().getMessageId(); + Long count = redisTemplate.opsForSet().add(MQQueueConstants.FAULT_QUEUE, messageId); + if (count.intValue() > 0){ + faultLogsService.insertFaultLogs(faultLogs); + // 消息确认 + channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); + log.info("消息队列:【{}】,收到故障日志:【{}】,消费成功...",MQQueueConstants.FAULT_QUEUE,faultLogs); + }else { + log.error("消息队列:【{}】,收到故障日志:【{}】,消费重复...",MQQueueConstants.FAULT_QUEUE,faultLogs); + } + } catch (IOException e) { + e.printStackTrace(); + log.error("消息队列:【{}】,收到故障日志:【{}】,消费异常:【{}】",MQQueueConstants.FAULT_QUEUE,faultLogs,e.getMessage()); + // 消息回退 + try { + channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); + } catch (IOException ex) { + log.error("消息队列:【{}】,收到故障日志:【{}】,消息退回异常:【{}】",MQQueueConstants.FAULT_QUEUE,faultLogs,e.getMessage()); + } + } + } +}