判断异常
parent
976f35f212
commit
371c80d110
|
@ -17,6 +17,6 @@ import org.springframework.web.bind.annotation.RequestBody;
|
|||
@FeignClient(contextId = "remoteCodeService",value = ServiceNameConstants.BUSINESS_SERVICE, fallbackFactory = RemoteCodeFallbackFactory.class)
|
||||
public interface RemoteCodeService {
|
||||
|
||||
@PostMapping("insertCode")
|
||||
@PostMapping("trouble/insertCode")
|
||||
public Result<Integer> insertCode(@RequestBody CoupletTroubleLog coupletTroubleLog);
|
||||
}
|
||||
|
|
|
@ -48,16 +48,16 @@ public class ModelMessage {
|
|||
}
|
||||
};
|
||||
|
||||
@Value("${mq.queueName}")
|
||||
public String queueName;
|
||||
|
||||
//交换机
|
||||
@Value("${mq.exchangeName}")
|
||||
public String exchangeName;
|
||||
|
||||
//路由键
|
||||
@Value("${mq.routingKey}")
|
||||
public String routingKey;
|
||||
// @Value("${mq.queueName}")
|
||||
// public String queueName;
|
||||
//
|
||||
// //交换机
|
||||
// @Value("${mq.exchangeName}")
|
||||
// public String exchangeName;
|
||||
//
|
||||
// //路由键
|
||||
// @Value("${mq.routingKey}")
|
||||
// public String routingKey;
|
||||
|
||||
@Scheduled(cron = "0/5 * * * * ?")
|
||||
public void startMsg() {
|
||||
|
@ -85,10 +85,10 @@ public class ModelMessage {
|
|||
for (CoupletMsgData msgData : coupletMsgDataList) {
|
||||
log.info("解析到车辆数据:{}", msgData);
|
||||
//发送消息到MQ
|
||||
rabbitTemplate.convertAndSend("send-couplet-code",msgData,message -> {
|
||||
message.getMessageProperties().setMessageId(UUID.randomUUID().toString());
|
||||
return message;
|
||||
});
|
||||
// rabbitTemplate.convertAndSend("send-couplet-code",msgData,message -> {
|
||||
// message.getMessageProperties().setMessageId(UUID.randomUUID().toString());
|
||||
// return message;
|
||||
// });
|
||||
for (String string : strings) {
|
||||
IncidentService incidentService = SpringUtils.getBean(string);
|
||||
incidentService.incident(msgData);
|
||||
|
|
|
@ -15,6 +15,7 @@ spring:
|
|||
discovery:
|
||||
# 服务注册地址
|
||||
server-addr: 121.89.211.230:8848
|
||||
namespace: a439ce3f-2c42-4b4c-9c4d-c8db49933c15
|
||||
config:
|
||||
# 配置中心地址
|
||||
server-addr: 121.89.211.230:8848
|
||||
|
@ -23,6 +24,7 @@ spring:
|
|||
# 共享配置
|
||||
shared-configs:
|
||||
- application-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension}
|
||||
namespace: a439ce3f-2c42-4b4c-9c4d-c8db49933c15
|
||||
main:
|
||||
allow-bean-definition-overriding: true
|
||||
rabbitmq:
|
||||
|
@ -49,8 +51,8 @@ mybatis-plus:
|
|||
configuration:
|
||||
map-underscore-to-camel-case: true
|
||||
|
||||
# RabbitMQ配置
|
||||
mq:
|
||||
queueName: queue
|
||||
exchangeName: exchange
|
||||
routingKey: routingKey
|
||||
## RabbitMQ配置
|
||||
#mq:
|
||||
# queueName: queueName
|
||||
# exchangeName: exchangeName
|
||||
# routingKey: routingKey
|
||||
|
|
|
@ -1,97 +1,97 @@
|
|||
package com.couplet.business.server.consumer;
|
||||
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import com.couplet.analyze.msg.domain.CoupletMsgData;
|
||||
import com.couplet.common.domain.CoupletTroubleLog;
|
||||
import com.couplet.common.system.remote.RemoteCodeService;
|
||||
import com.rabbitmq.client.Channel;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.amqp.core.Message;
|
||||
import org.springframework.amqp.rabbit.annotation.Queue;
|
||||
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.data.redis.core.RedisTemplate;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.Date;
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
|
||||
/**
|
||||
* @author DongXiaoDong
|
||||
* @version 1.0
|
||||
* @date 2024/3/14 22:09
|
||||
* @description
|
||||
*/
|
||||
@Component
|
||||
@Slf4j
|
||||
public class SendCodeQueueConsumer {
|
||||
@Autowired
|
||||
private RedisTemplate<String, String> redisTemplate;
|
||||
|
||||
@Autowired
|
||||
private RemoteCodeService remoteCodeService;
|
||||
|
||||
@RabbitListener(queuesToDeclare = {@Queue("send-couplet-code")})
|
||||
public void sendLogQueueConsumer(Message message, CoupletMsgData msgData, Channel channel) {
|
||||
log.info("日志队列:{},接收到的消息:{},开始消费...","send-couplet-code", JSONObject.toJSONString(msgData));
|
||||
long start = System.currentTimeMillis();
|
||||
|
||||
String messageId = message.getMessageProperties().getMessageId();
|
||||
|
||||
try {
|
||||
Long aLong = redisTemplate.opsForSet().add("send-log-queue", messageId);
|
||||
if (aLong==1) {
|
||||
//异步保存日志
|
||||
CompletableFuture.runAsync(() -> {
|
||||
CoupletTroubleLog coupletTroubleLog = new CoupletTroubleLog();
|
||||
//判断状态是否为异常
|
||||
if (msgData.getVehicleStatus() !=1){
|
||||
String code = generateGTA();
|
||||
coupletTroubleLog.setTroubleLogCode(code);
|
||||
coupletTroubleLog.setTroubleLogStart(new Date());
|
||||
String vin = msgData.getVin();
|
||||
coupletTroubleLog.setTroubleLogVin(vin);
|
||||
// 如果状态为正常1时添加结束时间
|
||||
if (msgData.getVehicleStatus() == 1){
|
||||
coupletTroubleLog.setTroubleLogEnd(new Date());
|
||||
}
|
||||
}
|
||||
remoteCodeService.insertCode(coupletTroubleLog);
|
||||
});
|
||||
log.info("");
|
||||
}
|
||||
long end = System.currentTimeMillis();
|
||||
log.info("日志队列:{},接收到的消息:{},消费完成,耗时:{}毫秒","send-log-queue", JSONObject.toJSONString(msgData), (end-start));
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
/**
|
||||
*
|
||||
* 拼接GTA字符串
|
||||
* @return
|
||||
*/
|
||||
public static String generateGTA() {
|
||||
// 生成以GTA开头的字符串
|
||||
String codefix = "GTA";
|
||||
// 删除4位数随机数字
|
||||
String s = generateRandomNumber(4);
|
||||
//拼接
|
||||
return codefix + s;
|
||||
}
|
||||
|
||||
/**
|
||||
* 随机生成1到10位的数字
|
||||
* @param length
|
||||
* @return
|
||||
*/
|
||||
public static String generateRandomNumber(int length) {
|
||||
Random random = new Random();
|
||||
StringBuilder builder = new StringBuilder();
|
||||
for (int i = 0; i < length; i++) {
|
||||
builder.append(random.nextInt(10));
|
||||
}
|
||||
return builder.toString();
|
||||
}
|
||||
}
|
||||
//package com.couplet.business.server.consumer;
|
||||
//
|
||||
//import com.alibaba.fastjson.JSONObject;
|
||||
//import com.couplet.analyze.msg.domain.CoupletMsgData;
|
||||
//import com.couplet.common.domain.CoupletTroubleLog;
|
||||
//import com.couplet.common.system.remote.RemoteCodeService;
|
||||
//import com.rabbitmq.client.Channel;
|
||||
//import lombok.extern.slf4j.Slf4j;
|
||||
//import org.springframework.amqp.core.Message;
|
||||
//import org.springframework.amqp.rabbit.annotation.Queue;
|
||||
//import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
||||
//import org.springframework.beans.factory.annotation.Autowired;
|
||||
//import org.springframework.data.redis.core.RedisTemplate;
|
||||
//import org.springframework.stereotype.Component;
|
||||
//
|
||||
//import java.util.Date;
|
||||
//import java.util.Random;
|
||||
//import java.util.concurrent.CompletableFuture;
|
||||
//
|
||||
///**
|
||||
// * @author DongXiaoDong
|
||||
// * @version 1.0
|
||||
// * @date 2024/3/14 22:09
|
||||
// * @description
|
||||
// */
|
||||
//@Component
|
||||
//@Slf4j
|
||||
//public class SendCodeQueueConsumer {
|
||||
// @Autowired
|
||||
// private RedisTemplate<String, String> redisTemplate;
|
||||
//
|
||||
// @Autowired
|
||||
// private RemoteCodeService remoteCodeService;
|
||||
//
|
||||
// @RabbitListener(queuesToDeclare = {@Queue("send-couplet-code")})
|
||||
// public void sendLogQueueConsumer(Message message, CoupletMsgData msgData, Channel channel) {
|
||||
// log.info("日志队列:{},接收到的消息:{},开始消费...","send-couplet-code", JSONObject.toJSONString(msgData));
|
||||
// long start = System.currentTimeMillis();
|
||||
//
|
||||
// String messageId = message.getMessageProperties().getMessageId();
|
||||
//
|
||||
// try {
|
||||
// Long aLong = redisTemplate.opsForSet().add("send-log-queue", messageId);
|
||||
// if (aLong==1) {
|
||||
// //异步保存日志
|
||||
// CompletableFuture.runAsync(() -> {
|
||||
// CoupletTroubleLog coupletTroubleLog = new CoupletTroubleLog();
|
||||
// //判断状态是否为异常
|
||||
// if (msgData.getVehicleStatus() !=1){
|
||||
// String code = generateGTA();
|
||||
// coupletTroubleLog.setTroubleLogCode(code);
|
||||
// coupletTroubleLog.setTroubleLogStart(new Date());
|
||||
// String vin = msgData.getVin();
|
||||
// coupletTroubleLog.setTroubleLogVin(vin);
|
||||
// // 如果状态为正常1时添加结束时间
|
||||
// if (msgData.getVehicleStatus() == 1){
|
||||
// coupletTroubleLog.setTroubleLogEnd(new Date());
|
||||
// }
|
||||
// }
|
||||
// remoteCodeService.insertCode(coupletTroubleLog);
|
||||
// });
|
||||
// log.info("记录异常成功");
|
||||
// }
|
||||
// long end = System.currentTimeMillis();
|
||||
// log.info("日志队列:{},接收到的消息:{},消费完成,耗时:{}毫秒","send-log-queue", JSONObject.toJSONString(msgData), (end-start));
|
||||
// } catch (Exception e) {
|
||||
// throw new RuntimeException(e);
|
||||
// }
|
||||
// }
|
||||
// /**
|
||||
// *
|
||||
// * 拼接GTA字符串
|
||||
// * @return
|
||||
// */
|
||||
// public static String generateGTA() {
|
||||
// // 生成以GTA开头的字符串
|
||||
// String codefix = "GTA";
|
||||
// // 删除4位数随机数字
|
||||
// String s = generateRandomNumber(4);
|
||||
// //拼接
|
||||
// return codefix + s;
|
||||
// }
|
||||
//
|
||||
// /**
|
||||
// * 随机生成1到10位的数字
|
||||
// * @param length
|
||||
// * @return
|
||||
// */
|
||||
// public static String generateRandomNumber(int length) {
|
||||
// Random random = new Random();
|
||||
// StringBuilder builder = new StringBuilder();
|
||||
// for (int i = 0; i < length; i++) {
|
||||
// builder.append(random.nextInt(10));
|
||||
// }
|
||||
// return builder.toString();
|
||||
// }
|
||||
//}
|
||||
|
|
|
@ -7,7 +7,6 @@ spring:
|
|||
application:
|
||||
# 应用名称
|
||||
name: couplet-business
|
||||
|
||||
profiles:
|
||||
# 环境配置
|
||||
active: dev
|
||||
|
@ -16,6 +15,7 @@ spring:
|
|||
discovery:
|
||||
# 服务注册地址
|
||||
server-addr: 121.89.211.230:8848
|
||||
namespace: a439ce3f-2c42-4b4c-9c4d-c8db49933c15
|
||||
config:
|
||||
# 配置中心地址
|
||||
server-addr: 121.89.211.230:8848
|
||||
|
@ -24,6 +24,7 @@ spring:
|
|||
# 共享配置
|
||||
shared-configs:
|
||||
- application-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension}
|
||||
namespace: a439ce3f-2c42-4b4c-9c4d-c8db49933c15
|
||||
main:
|
||||
allow-bean-definition-overriding: true
|
||||
logging:
|
||||
|
|
|
@ -20,7 +20,7 @@
|
|||
LEFT JOIN couplet_trouble_type y on t.type_id= y.type_id
|
||||
</sql>
|
||||
<insert id="insertMsgResq">
|
||||
insert into couplet_trouble_log(trouble_log_code,toruble_log_vin,trouble_log_start,trouble_log_end)
|
||||
insert into couplet_trouble_log(trouble_log_code,trouble_log_vin,trouble_log_start,trouble_log_end)
|
||||
values(#{troubleLogCode},#{troubleLogVin},#{troubleLogStart},#{troubleLogEnd})
|
||||
</insert>
|
||||
|
||||
|
|
|
@ -25,16 +25,16 @@ import org.springframework.context.annotation.Primary;
|
|||
public class RabbitMQConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {
|
||||
// 通过注入的方式获取队列名、交换机名和路由键
|
||||
//队列名
|
||||
@Value("${mq.queueName}")
|
||||
public String queueName;
|
||||
// @Value("${mq.queueName}")
|
||||
public static final String queueName = "queueName";
|
||||
|
||||
//交换机
|
||||
@Value("${mq.exchangeName}")
|
||||
public String exchangeName;
|
||||
// @Value("${mq.exchangeName}")
|
||||
public static final String exchangeName = "exchangeName";
|
||||
|
||||
//路由键
|
||||
@Value("${mq.routingKey}")
|
||||
public String routingKey;
|
||||
// @Value("${mq.routingKey}")
|
||||
public static final String routingKey = "routingKey";
|
||||
|
||||
private RabbitTemplate rabbitTemplate;
|
||||
|
||||
|
|
|
@ -1,55 +1,55 @@
|
|||
package com.couplet.mq.controller;
|
||||
|
||||
import com.couplet.common.core.utils.uuid.IdUtils;
|
||||
import com.couplet.mq.domain.User;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.amqp.rabbit.connection.CorrelationData;
|
||||
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.web.bind.annotation.*;
|
||||
|
||||
/**
|
||||
* @ProjectName: five-groups-couplet
|
||||
* @Author: LiuYunHu
|
||||
* @CreateTime: 2024/3/29
|
||||
* @Description: MQController类
|
||||
*/
|
||||
@RestController
|
||||
@RequestMapping("/mq")
|
||||
@Slf4j
|
||||
public class MqController {
|
||||
// 通过注入的方式获取队列名、交换机名和路由键
|
||||
//队列名
|
||||
@Value("${mq.queueName}")
|
||||
public String queueName;
|
||||
|
||||
//交换机
|
||||
@Value("${mq.exchangeName}")
|
||||
public String exchangeName;
|
||||
|
||||
//路由键
|
||||
@Value("${mq.routingKey}")
|
||||
public String routingKey;
|
||||
|
||||
@Autowired
|
||||
private RabbitTemplate rabbitTemplate;
|
||||
|
||||
/*
|
||||
* @Author: LiuYunHu
|
||||
* @Date: 2024/4/1 19:58
|
||||
* @Description: 消息发送者
|
||||
* @Param: [data]
|
||||
* @Return: void
|
||||
**/
|
||||
@PostMapping("/sout")
|
||||
//接收json字符串
|
||||
public void sout(@RequestBody User param) {
|
||||
rabbitTemplate.convertAndSend(exchangeName, routingKey, param, message -> {
|
||||
message.getMessageProperties().setMessageId(IdUtils.randomUUID());
|
||||
return message;
|
||||
}, new CorrelationData(IdUtils.randomUUID())
|
||||
);
|
||||
|
||||
}
|
||||
}
|
||||
//package com.couplet.mq.controller;
|
||||
//
|
||||
//import com.couplet.common.core.utils.uuid.IdUtils;
|
||||
//import com.couplet.mq.domain.User;
|
||||
//import lombok.extern.slf4j.Slf4j;
|
||||
//import org.springframework.amqp.rabbit.connection.CorrelationData;
|
||||
//import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||
//import org.springframework.beans.factory.annotation.Autowired;
|
||||
//import org.springframework.beans.factory.annotation.Value;
|
||||
//import org.springframework.web.bind.annotation.*;
|
||||
//
|
||||
///**
|
||||
// * @ProjectName: five-groups-couplet
|
||||
// * @Author: LiuYunHu
|
||||
// * @CreateTime: 2024/3/29
|
||||
// * @Description: MQController类
|
||||
// */
|
||||
//@RestController
|
||||
//@RequestMapping("/mq")
|
||||
//@Slf4j
|
||||
//public class MqController {
|
||||
// // 通过注入的方式获取队列名、交换机名和路由键
|
||||
// //队列名
|
||||
//// @Value("${mq.queueName}")
|
||||
// public String queueName;
|
||||
//
|
||||
// //交换机
|
||||
//// @Value("${mq.exchangeName}")
|
||||
// public String exchangeName;
|
||||
//
|
||||
// //路由键
|
||||
//// @Value("${mq.routingKey}")
|
||||
// public String routingKey;
|
||||
//
|
||||
// @Autowired
|
||||
// private RabbitTemplate rabbitTemplate;
|
||||
//
|
||||
// /*
|
||||
// * @Author: LiuYunHu
|
||||
// * @Date: 2024/4/1 19:58
|
||||
// * @Description: 消息发送者
|
||||
// * @Param: [data]
|
||||
// * @Return: void
|
||||
// **/
|
||||
// @PostMapping("/sout")
|
||||
// //接收json字符串
|
||||
// public void sout(@RequestBody User param) {
|
||||
// rabbitTemplate.convertAndSend(exchangeName, routingKey, param, message -> {
|
||||
// message.getMessageProperties().setMessageId(IdUtils.randomUUID());
|
||||
// return message;
|
||||
// }, new CorrelationData(IdUtils.randomUUID())
|
||||
// );
|
||||
//
|
||||
// }
|
||||
//}
|
||||
|
|
|
@ -1,164 +1,164 @@
|
|||
package com.couplet.mq.service;
|
||||
|
||||
import com.couplet.mq.domain.User;
|
||||
import com.rabbitmq.client.Channel;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.amqp.core.Message;
|
||||
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
|
||||
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.data.redis.core.StringRedisTemplate;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* @ProjectName: five-groups-couplet
|
||||
* @Author: LiuYunHu
|
||||
* @CreateTime: 2024/3/28
|
||||
* @Description: MQ消费者类
|
||||
*/
|
||||
|
||||
@Component
|
||||
@Slf4j
|
||||
@SuppressWarnings("all")
|
||||
@RabbitListener(queues = "${mq.queueName}")
|
||||
public class Consumer {
|
||||
@Autowired
|
||||
private StringRedisTemplate redis;
|
||||
|
||||
/* 线程池执行
|
||||
|
||||
//创建一个定长线程池
|
||||
private final Executor executor = Executors.newFixedThreadPool(5);
|
||||
|
||||
@Async
|
||||
@RabbitHandler
|
||||
public void process(User param, Channel channel, Message message) {
|
||||
executor.execute(() -> {
|
||||
try {
|
||||
handleMessage(param, channel, message);
|
||||
} catch (IOException e) {
|
||||
log.error("处理消息失败:{}", e);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
//处理信息的方法
|
||||
private void handleMessage(User param, Channel channel, Message message) throws IOException {
|
||||
log.info("消费者收到消息为:{},{}" + param, message.getMessageProperties().getDeliveryTag());
|
||||
|
||||
long deliveryTag = message.getMessageProperties().getDeliveryTag();
|
||||
String messageId = message.getMessageProperties().getMessageId();
|
||||
|
||||
if (!redis.hasKey("value:" + messageId)) {
|
||||
redis.opsForValue().set("value:" + messageId, "" + deliveryTag, 5, TimeUnit.MINUTES);
|
||||
}
|
||||
|
||||
// 1 添加成功新数据 0已有重复值,不允许再添加
|
||||
Long add = redis.opsForSet().add("set:" + messageId, "set:" + messageId);
|
||||
//过期时间
|
||||
redis.expire("set:" + messageId, 5, TimeUnit.MINUTES);
|
||||
|
||||
|
||||
try {
|
||||
if (add == 1) {
|
||||
//第一次 消费
|
||||
System.out.println("*****************************");
|
||||
System.out.println("消费者收到消息:" + param);
|
||||
System.out.println("*****************************");
|
||||
log.info("消费结束");
|
||||
|
||||
channel.basicAck(deliveryTag, false);
|
||||
|
||||
} else {
|
||||
//重复消费
|
||||
log.error("重复消费");
|
||||
channel.basicReject(deliveryTag, false);
|
||||
|
||||
//删除缓存
|
||||
redis.opsForSet().remove("set:" + messageId, "set:" + messageId);
|
||||
}
|
||||
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("消息没有成功消费!");
|
||||
|
||||
String s = redis.opsForValue().get("value:" + messageId);
|
||||
|
||||
long oldTag = Long.parseLong(s);
|
||||
|
||||
if (deliveryTag == (oldTag + 2)) {
|
||||
log.error("确实消费不了,不入队了!");
|
||||
channel.basicNack(deliveryTag, false, false);
|
||||
} else {
|
||||
log.info("消息消费失败,重新入队");
|
||||
channel.basicNack(deliveryTag, false, true);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
**/
|
||||
|
||||
@RabbitHandler
|
||||
public void process(User param, Channel channel, Message message) throws IOException {
|
||||
log.info("消费者收到消息为:{},{}" + param, message.getMessageProperties().getDeliveryTag());
|
||||
|
||||
long deliveryTag = message.getMessageProperties().getDeliveryTag();
|
||||
String messageId = message.getMessageProperties().getMessageId();
|
||||
|
||||
if (!redis.hasKey("value:" + messageId)) {
|
||||
redis.opsForValue().set("value:" + messageId, "" + deliveryTag, 5, TimeUnit.MINUTES);
|
||||
}
|
||||
|
||||
// 1 添加成功新数据 0已有重复值,不允许再添加
|
||||
Long add = redis.opsForSet().add("set:" + messageId, "set:" + messageId);
|
||||
//过期时间
|
||||
redis.expire("set:" + messageId, 5, TimeUnit.MINUTES);
|
||||
|
||||
|
||||
try {
|
||||
if (add == 1) {
|
||||
//第一次 消费
|
||||
System.out.println("*****************************");
|
||||
System.out.println("消费者收到消息:" + param);
|
||||
System.out.println("*****************************");
|
||||
log.info("消费结束");
|
||||
|
||||
//确认消费
|
||||
channel.basicAck(deliveryTag, false);
|
||||
|
||||
} else {
|
||||
//重复消费
|
||||
log.error("重复消费");
|
||||
//拒绝消费
|
||||
channel.basicReject(deliveryTag, false);
|
||||
|
||||
//删除缓存
|
||||
redis.opsForSet().remove("set:" + messageId, "set:" + messageId);
|
||||
}
|
||||
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("消息没有成功消费!");
|
||||
|
||||
String s = redis.opsForValue().get("value:" + messageId);
|
||||
|
||||
long oldTag = Long.parseLong(s);
|
||||
|
||||
if (deliveryTag == (oldTag + 2)) {
|
||||
log.error("确实消费不了,不入队了!");
|
||||
|
||||
|
||||
//拒绝消费
|
||||
channel.basicNack(deliveryTag, false, false);
|
||||
} else {
|
||||
log.info("消息消费失败,重新入队");
|
||||
//重新入队
|
||||
channel.basicNack(deliveryTag, false, true);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
//package com.couplet.mq.service;
|
||||
//
|
||||
//import com.couplet.mq.domain.User;
|
||||
//import com.rabbitmq.client.Channel;
|
||||
//import lombok.extern.slf4j.Slf4j;
|
||||
//import org.springframework.amqp.core.Message;
|
||||
//import org.springframework.amqp.rabbit.annotation.RabbitHandler;
|
||||
//import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
||||
//import org.springframework.beans.factory.annotation.Autowired;
|
||||
//import org.springframework.data.redis.core.StringRedisTemplate;
|
||||
//import org.springframework.stereotype.Component;
|
||||
//
|
||||
//import java.io.IOException;
|
||||
//import java.util.concurrent.TimeUnit;
|
||||
//
|
||||
///**
|
||||
// * @ProjectName: five-groups-couplet
|
||||
// * @Author: LiuYunHu
|
||||
// * @CreateTime: 2024/3/28
|
||||
// * @Description: MQ消费者类
|
||||
// */
|
||||
//
|
||||
//@Component
|
||||
//@Slf4j
|
||||
//@SuppressWarnings("all")
|
||||
//@RabbitListener(queues = "${mq.queueName}")
|
||||
//public class Consumer {
|
||||
// @Autowired
|
||||
// private StringRedisTemplate redis;
|
||||
//
|
||||
// /* 线程池执行
|
||||
//
|
||||
// //创建一个定长线程池
|
||||
// private final Executor executor = Executors.newFixedThreadPool(5);
|
||||
//
|
||||
// @Async
|
||||
// @RabbitHandler
|
||||
// public void process(User param, Channel channel, Message message) {
|
||||
// executor.execute(() -> {
|
||||
// try {
|
||||
// handleMessage(param, channel, message);
|
||||
// } catch (IOException e) {
|
||||
// log.error("处理消息失败:{}", e);
|
||||
// }
|
||||
// });
|
||||
// }
|
||||
//
|
||||
// //处理信息的方法
|
||||
// private void handleMessage(User param, Channel channel, Message message) throws IOException {
|
||||
// log.info("消费者收到消息为:{},{}" + param, message.getMessageProperties().getDeliveryTag());
|
||||
//
|
||||
// long deliveryTag = message.getMessageProperties().getDeliveryTag();
|
||||
// String messageId = message.getMessageProperties().getMessageId();
|
||||
//
|
||||
// if (!redis.hasKey("value:" + messageId)) {
|
||||
// redis.opsForValue().set("value:" + messageId, "" + deliveryTag, 5, TimeUnit.MINUTES);
|
||||
// }
|
||||
//
|
||||
// // 1 添加成功新数据 0已有重复值,不允许再添加
|
||||
// Long add = redis.opsForSet().add("set:" + messageId, "set:" + messageId);
|
||||
// //过期时间
|
||||
// redis.expire("set:" + messageId, 5, TimeUnit.MINUTES);
|
||||
//
|
||||
//
|
||||
// try {
|
||||
// if (add == 1) {
|
||||
// //第一次 消费
|
||||
// System.out.println("*****************************");
|
||||
// System.out.println("消费者收到消息:" + param);
|
||||
// System.out.println("*****************************");
|
||||
// log.info("消费结束");
|
||||
//
|
||||
// channel.basicAck(deliveryTag, false);
|
||||
//
|
||||
// } else {
|
||||
// //重复消费
|
||||
// log.error("重复消费");
|
||||
// channel.basicReject(deliveryTag, false);
|
||||
//
|
||||
// //删除缓存
|
||||
// redis.opsForSet().remove("set:" + messageId, "set:" + messageId);
|
||||
// }
|
||||
//
|
||||
//
|
||||
// } catch (Exception e) {
|
||||
// log.error("消息没有成功消费!");
|
||||
//
|
||||
// String s = redis.opsForValue().get("value:" + messageId);
|
||||
//
|
||||
// long oldTag = Long.parseLong(s);
|
||||
//
|
||||
// if (deliveryTag == (oldTag + 2)) {
|
||||
// log.error("确实消费不了,不入队了!");
|
||||
// channel.basicNack(deliveryTag, false, false);
|
||||
// } else {
|
||||
// log.info("消息消费失败,重新入队");
|
||||
// channel.basicNack(deliveryTag, false, true);
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// }
|
||||
//
|
||||
//**/
|
||||
//
|
||||
// @RabbitHandler
|
||||
// public void process(User param, Channel channel, Message message) throws IOException {
|
||||
// log.info("消费者收到消息为:{},{}" + param, message.getMessageProperties().getDeliveryTag());
|
||||
//
|
||||
// long deliveryTag = message.getMessageProperties().getDeliveryTag();
|
||||
// String messageId = message.getMessageProperties().getMessageId();
|
||||
//
|
||||
// if (!redis.hasKey("value:" + messageId)) {
|
||||
// redis.opsForValue().set("value:" + messageId, "" + deliveryTag, 5, TimeUnit.MINUTES);
|
||||
// }
|
||||
//
|
||||
// // 1 添加成功新数据 0已有重复值,不允许再添加
|
||||
// Long add = redis.opsForSet().add("set:" + messageId, "set:" + messageId);
|
||||
// //过期时间
|
||||
// redis.expire("set:" + messageId, 5, TimeUnit.MINUTES);
|
||||
//
|
||||
//
|
||||
// try {
|
||||
// if (add == 1) {
|
||||
// //第一次 消费
|
||||
// System.out.println("*****************************");
|
||||
// System.out.println("消费者收到消息:" + param);
|
||||
// System.out.println("*****************************");
|
||||
// log.info("消费结束");
|
||||
//
|
||||
// //确认消费
|
||||
// channel.basicAck(deliveryTag, false);
|
||||
//
|
||||
// } else {
|
||||
// //重复消费
|
||||
// log.error("重复消费");
|
||||
// //拒绝消费
|
||||
// channel.basicReject(deliveryTag, false);
|
||||
//
|
||||
// //删除缓存
|
||||
// redis.opsForSet().remove("set:" + messageId, "set:" + messageId);
|
||||
// }
|
||||
//
|
||||
//
|
||||
// } catch (Exception e) {
|
||||
// log.error("消息没有成功消费!");
|
||||
//
|
||||
// String s = redis.opsForValue().get("value:" + messageId);
|
||||
//
|
||||
// long oldTag = Long.parseLong(s);
|
||||
//
|
||||
// if (deliveryTag == (oldTag + 2)) {
|
||||
// log.error("确实消费不了,不入队了!");
|
||||
//
|
||||
//
|
||||
// //拒绝消费
|
||||
// channel.basicNack(deliveryTag, false, false);
|
||||
// } else {
|
||||
// log.info("消息消费失败,重新入队");
|
||||
// //重新入队
|
||||
// channel.basicNack(deliveryTag, false, true);
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
//}
|
||||
|
|
Loading…
Reference in New Issue