rabbitmq解析

vehicle-liyongjie
李永杰 2024-04-10 20:15:25 +08:00
parent 9c86bbf900
commit 59bf3c8abc
3 changed files with 119 additions and 3 deletions

View File

@ -14,11 +14,14 @@ spring:
nacos:
discovery:
# 服务注册地址
server-addr: 10.10.26.1:8848
# 命名空间
namespace: lyj
config:
# 服务注册地址
server-addr: 10.10.26.1:8848
config:
# 配置中心地址
server-addr: 10.10.26.1:8848
# 命名空间
namespace: lyj
# 配置文件格式
file-extension: yml
# 共享配置

View File

@ -0,0 +1,23 @@
package com.muyu.common.core.constant;
import lombok.extern.slf4j.*;
import org.springframework.stereotype.*;
/**
* MQ
* @author LiYongJie
* @date 2024/4/9
*/
@Slf4j
@Component
public class MQQueueConstants {
/**
*
*/
public static final String ALARM_QUEUE = "alarm";
/**
*
*/
public static final String FAULT_QUEUE = "fault";
}

View File

@ -0,0 +1,90 @@
package com.muyu.business.sms;
import com.muyu.business.domain.*;
import com.muyu.business.service.*;
import com.muyu.common.core.constant.*;
import com.rabbitmq.client.*;
import lombok.extern.slf4j.*;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.beans.factory.annotation.*;
import org.springframework.data.redis.core.*;
import org.springframework.stereotype.*;
import java.io.*;
@Slf4j
@Component
public class SendSmsConfig {
/**
* redis
*/
@Autowired
private StringRedisTemplate redisTemplate;
/**
*
*/
@Autowired
private AlarmLogsService alarmLogsService;
/**
*
*/
@Autowired
private FaultLogsService faultLogsService;
@RabbitListener(queuesToDeclare = {@Queue(value = MQQueueConstants.ALARM_QUEUE, declare = "true")})
public void sendSms(AlarmLogs alarmLogsAddReq, Message message, Channel channel) {
log.info("消息队列:【{}】,收到报警日志:【{}】",MQQueueConstants.ALARM_QUEUE,alarmLogsAddReq);
try {
String messageId = message.getMessageProperties().getMessageId();
Long count = redisTemplate.opsForSet().add(MQQueueConstants.ALARM_QUEUE, messageId);
if (count.intValue() > 0){
alarmLogsService.insertAlarmLogs(alarmLogsAddReq);
// 消息确认
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
log.info("消息队列:【{}】,收到报警日志:【{}】,消费成功...",MQQueueConstants.ALARM_QUEUE,alarmLogsAddReq);
}else {
log.error("消息队列:【{}】,收到报警日志:【{}】,消费重复...",MQQueueConstants.ALARM_QUEUE,alarmLogsAddReq);
}
} catch (IOException e) {
e.printStackTrace();
log.error("消息队列:【{}】,收到报警日志:【{}】,消费异常:【{}】",MQQueueConstants.ALARM_QUEUE,alarmLogsAddReq,e.getMessage());
// 消息回退
try {
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
} catch (IOException ex) {
log.error("消息队列:【{}】,收到报警日志:【{}】,消息退回异常:【{}】",MQQueueConstants.ALARM_QUEUE,alarmLogsAddReq,e.getMessage());
}
}
}
@RabbitListener(queuesToDeclare = {@Queue(value = MQQueueConstants.FAULT_QUEUE, declare = "true")})
public void sendSms(FaultLogs faultLogs, Message message, Channel channel) {
log.info("消息队列:【{}】,收到故障日志:【{}】",MQQueueConstants.FAULT_QUEUE,faultLogs);
try {
String messageId = message.getMessageProperties().getMessageId();
Long count = redisTemplate.opsForSet().add(MQQueueConstants.FAULT_QUEUE, messageId);
if (count.intValue() > 0){
faultLogsService.insertFaultLogs(faultLogs);
// 消息确认
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
log.info("消息队列:【{}】,收到故障日志:【{}】,消费成功...",MQQueueConstants.FAULT_QUEUE,faultLogs);
}else {
log.error("消息队列:【{}】,收到故障日志:【{}】,消费重复...",MQQueueConstants.FAULT_QUEUE,faultLogs);
}
} catch (IOException e) {
e.printStackTrace();
log.error("消息队列:【{}】,收到故障日志:【{}】,消费异常:【{}】",MQQueueConstants.FAULT_QUEUE,faultLogs,e.getMessage());
// 消息回退
try {
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
} catch (IOException ex) {
log.error("消息队列:【{}】,收到故障日志:【{}】,消息退回异常:【{}】",MQQueueConstants.FAULT_QUEUE,faultLogs,e.getMessage());
}
}
}
}