load_center/src/main/java/com/muyu/loadCenter/mq/Consumer.java

100 lines
3.4 KiB
Java
Raw Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

package com.muyu.loadCenter.mq;
import com.muyu.loadCenter.aliyun.service.AliYunEcsService;
import com.muyu.loadCenter.gateway.model.GatewayNodeInfo;
import com.rabbitmq.client.Channel;
import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* 消费者类用于处理来自RabbitMQ的消息。
* 使用Spring AMQP框架监听指定队列中的消息。
*/
@RabbitListener(queues = "queue")
@Component
@Log4j2
public class Consumer {
// 注入Spring的StringRedisTemplate以操作Redis
@Autowired
private StringRedisTemplate redisTemplate;
@Autowired
private AliYunEcsService aliYunEcsService;
/**
* 处理接收到的消息,具体逻辑包括:
* 1. 判断消息是否已被处理过,未处理则进行处理并标记为已处理;
* 2. 若消息已处理过,则认为是重复消费,拒绝处理。
* 3. 对于异常情况,根据重试次数决定是重试、拒绝还是退回消息。
*
* @param gatewayNodeInfo 消息中包含的网关节点信息。
* @param channel RabbitMQ的通道用于确认消息处理情况。
* @param message 接收到的消息对象。
* @throws IOException 与RabbitMQ通信时可能抛出的异常。
*/
@RabbitHandler
public void Handler(GatewayNodeInfo gatewayNodeInfo, Channel channel, Message message) throws IOException {
// 消息ID和传递标签
String messageId = message.getMessageProperties().getMessageId();
long deliveryTag = message.getMessageProperties().getDeliveryTag();
// 检查消息是否首次处理
if (!redisTemplate.hasKey("mi"+messageId)){
// 首次处理,存储传递标签
redisTemplate.opsForValue().set("mi"+messageId,deliveryTag+"");
}
try {
// 检查是否重复消费
if (!redisTemplate.hasKey("cf"+messageId)){
// 确认消费并在Redis中做标记
log.info("确认消费");
log.info(gatewayNodeInfo);
// aliYunEcsService.releaseECS(gatewayNodeInfo.getNodeId()); // 释放ECS实例
redisTemplate.opsForValue().set("cf"+messageId,messageId);
channel.basicAck(deliveryTag,false);
}else {
// 重复消费,拒绝处理
log.info("重复消费");
channel.basicReject(deliveryTag,false);
}
} catch (Exception exception) {
// 异常处理逻辑,根据重试次数决定消息处理方式
String s = redisTemplate.opsForValue().get("mi" + messageId);
long old = Long.parseLong(s);
if (deliveryTag==(old+2)){
// 重试三次后,仍然失败,退回消息
log.info("已重试三次,实在消费不了");
channel.basicNack(deliveryTag,false,false);
}else {
// 尝试再次重试
log.info("继续重试");
channel.basicNack(deliveryTag,false,true);
}
}
}
}