diff --git a/couplet-common/couplet-common-system/src/main/java/com/couplet/common/system/remote/RemoteCodeService.java b/couplet-common/couplet-common-system/src/main/java/com/couplet/common/system/remote/RemoteCodeService.java index 4307c75..1475af8 100644 --- a/couplet-common/couplet-common-system/src/main/java/com/couplet/common/system/remote/RemoteCodeService.java +++ b/couplet-common/couplet-common-system/src/main/java/com/couplet/common/system/remote/RemoteCodeService.java @@ -17,6 +17,6 @@ import org.springframework.web.bind.annotation.RequestBody; @FeignClient(contextId = "remoteCodeService",value = ServiceNameConstants.BUSINESS_SERVICE, fallbackFactory = RemoteCodeFallbackFactory.class) public interface RemoteCodeService { - @PostMapping("insertCode") + @PostMapping("trouble/insertCode") public Result insertCode(@RequestBody CoupletTroubleLog coupletTroubleLog); } diff --git a/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/model/ModelMessage.java b/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/model/ModelMessage.java index 4a36271..50ed034 100644 --- a/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/model/ModelMessage.java +++ b/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/model/ModelMessage.java @@ -48,16 +48,16 @@ public class ModelMessage { } }; - @Value("${mq.queueName}") - public String queueName; - - //交换机 - @Value("${mq.exchangeName}") - public String exchangeName; - - //路由键 - @Value("${mq.routingKey}") - public String routingKey; +// @Value("${mq.queueName}") +// public String queueName; +// +// //交换机 +// @Value("${mq.exchangeName}") +// public String exchangeName; +// +// //路由键 +// @Value("${mq.routingKey}") +// public String routingKey; @Scheduled(cron = "0/5 * * * * ?") public void startMsg() { @@ -85,10 +85,10 @@ public class ModelMessage { for (CoupletMsgData msgData : coupletMsgDataList) { log.info("解析到车辆数据:{}", msgData); //发送消息到MQ - rabbitTemplate.convertAndSend("send-couplet-code",msgData,message -> { - message.getMessageProperties().setMessageId(UUID.randomUUID().toString()); - return message; - }); +// rabbitTemplate.convertAndSend("send-couplet-code",msgData,message -> { +// message.getMessageProperties().setMessageId(UUID.randomUUID().toString()); +// return message; +// }); for (String string : strings) { IncidentService incidentService = SpringUtils.getBean(string); incidentService.incident(msgData); diff --git a/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/resources/bootstrap.yml b/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/resources/bootstrap.yml index bed4d26..1476cf8 100644 --- a/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/resources/bootstrap.yml +++ b/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/resources/bootstrap.yml @@ -15,6 +15,7 @@ spring: discovery: # 服务注册地址 server-addr: 121.89.211.230:8848 + namespace: a439ce3f-2c42-4b4c-9c4d-c8db49933c15 config: # 配置中心地址 server-addr: 121.89.211.230:8848 @@ -23,6 +24,7 @@ spring: # 共享配置 shared-configs: - application-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension} + namespace: a439ce3f-2c42-4b4c-9c4d-c8db49933c15 main: allow-bean-definition-overriding: true rabbitmq: @@ -49,8 +51,8 @@ mybatis-plus: configuration: map-underscore-to-camel-case: true -# RabbitMQ配置 -mq: - queueName: queue - exchangeName: exchange - routingKey: routingKey +## RabbitMQ配置 +#mq: +# queueName: queueName +# exchangeName: exchangeName +# routingKey: routingKey diff --git a/couplet-modules/couplet-business/src/main/java/com/couplet/business/server/consumer/SendCodeQueueConsumer.java b/couplet-modules/couplet-business/src/main/java/com/couplet/business/server/consumer/SendCodeQueueConsumer.java index 9603263..cdebce4 100644 --- a/couplet-modules/couplet-business/src/main/java/com/couplet/business/server/consumer/SendCodeQueueConsumer.java +++ b/couplet-modules/couplet-business/src/main/java/com/couplet/business/server/consumer/SendCodeQueueConsumer.java @@ -1,97 +1,97 @@ -package com.couplet.business.server.consumer; - -import com.alibaba.fastjson.JSONObject; -import com.couplet.analyze.msg.domain.CoupletMsgData; -import com.couplet.common.domain.CoupletTroubleLog; -import com.couplet.common.system.remote.RemoteCodeService; -import com.rabbitmq.client.Channel; -import lombok.extern.slf4j.Slf4j; -import org.springframework.amqp.core.Message; -import org.springframework.amqp.rabbit.annotation.Queue; -import org.springframework.amqp.rabbit.annotation.RabbitListener; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.data.redis.core.RedisTemplate; -import org.springframework.stereotype.Component; - -import java.util.Date; -import java.util.Random; -import java.util.concurrent.CompletableFuture; - -/** - * @author DongXiaoDong - * @version 1.0 - * @date 2024/3/14 22:09 - * @description - */ -@Component -@Slf4j -public class SendCodeQueueConsumer { - @Autowired - private RedisTemplate redisTemplate; - - @Autowired - private RemoteCodeService remoteCodeService; - - @RabbitListener(queuesToDeclare = {@Queue("send-couplet-code")}) - public void sendLogQueueConsumer(Message message, CoupletMsgData msgData, Channel channel) { - log.info("日志队列:{},接收到的消息:{},开始消费...","send-couplet-code", JSONObject.toJSONString(msgData)); - long start = System.currentTimeMillis(); - - String messageId = message.getMessageProperties().getMessageId(); - - try { - Long aLong = redisTemplate.opsForSet().add("send-log-queue", messageId); - if (aLong==1) { - //异步保存日志 - CompletableFuture.runAsync(() -> { - CoupletTroubleLog coupletTroubleLog = new CoupletTroubleLog(); - //判断状态是否为异常 - if (msgData.getVehicleStatus() !=1){ - String code = generateGTA(); - coupletTroubleLog.setTroubleLogCode(code); - coupletTroubleLog.setTroubleLogStart(new Date()); - String vin = msgData.getVin(); - coupletTroubleLog.setTroubleLogVin(vin); - // 如果状态为正常1时添加结束时间 - if (msgData.getVehicleStatus() == 1){ - coupletTroubleLog.setTroubleLogEnd(new Date()); - } - } - remoteCodeService.insertCode(coupletTroubleLog); - }); - log.info(""); - } - long end = System.currentTimeMillis(); - log.info("日志队列:{},接收到的消息:{},消费完成,耗时:{}毫秒","send-log-queue", JSONObject.toJSONString(msgData), (end-start)); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - /** - * - * 拼接GTA字符串 - * @return - */ - public static String generateGTA() { - // 生成以GTA开头的字符串 - String codefix = "GTA"; - // 删除4位数随机数字 - String s = generateRandomNumber(4); - //拼接 - return codefix + s; - } - - /** - * 随机生成1到10位的数字 - * @param length - * @return - */ - public static String generateRandomNumber(int length) { - Random random = new Random(); - StringBuilder builder = new StringBuilder(); - for (int i = 0; i < length; i++) { - builder.append(random.nextInt(10)); - } - return builder.toString(); - } -} +//package com.couplet.business.server.consumer; +// +//import com.alibaba.fastjson.JSONObject; +//import com.couplet.analyze.msg.domain.CoupletMsgData; +//import com.couplet.common.domain.CoupletTroubleLog; +//import com.couplet.common.system.remote.RemoteCodeService; +//import com.rabbitmq.client.Channel; +//import lombok.extern.slf4j.Slf4j; +//import org.springframework.amqp.core.Message; +//import org.springframework.amqp.rabbit.annotation.Queue; +//import org.springframework.amqp.rabbit.annotation.RabbitListener; +//import org.springframework.beans.factory.annotation.Autowired; +//import org.springframework.data.redis.core.RedisTemplate; +//import org.springframework.stereotype.Component; +// +//import java.util.Date; +//import java.util.Random; +//import java.util.concurrent.CompletableFuture; +// +///** +// * @author DongXiaoDong +// * @version 1.0 +// * @date 2024/3/14 22:09 +// * @description +// */ +//@Component +//@Slf4j +//public class SendCodeQueueConsumer { +// @Autowired +// private RedisTemplate redisTemplate; +// +// @Autowired +// private RemoteCodeService remoteCodeService; +// +// @RabbitListener(queuesToDeclare = {@Queue("send-couplet-code")}) +// public void sendLogQueueConsumer(Message message, CoupletMsgData msgData, Channel channel) { +// log.info("日志队列:{},接收到的消息:{},开始消费...","send-couplet-code", JSONObject.toJSONString(msgData)); +// long start = System.currentTimeMillis(); +// +// String messageId = message.getMessageProperties().getMessageId(); +// +// try { +// Long aLong = redisTemplate.opsForSet().add("send-log-queue", messageId); +// if (aLong==1) { +// //异步保存日志 +// CompletableFuture.runAsync(() -> { +// CoupletTroubleLog coupletTroubleLog = new CoupletTroubleLog(); +// //判断状态是否为异常 +// if (msgData.getVehicleStatus() !=1){ +// String code = generateGTA(); +// coupletTroubleLog.setTroubleLogCode(code); +// coupletTroubleLog.setTroubleLogStart(new Date()); +// String vin = msgData.getVin(); +// coupletTroubleLog.setTroubleLogVin(vin); +// // 如果状态为正常1时添加结束时间 +// if (msgData.getVehicleStatus() == 1){ +// coupletTroubleLog.setTroubleLogEnd(new Date()); +// } +// } +// remoteCodeService.insertCode(coupletTroubleLog); +// }); +// log.info("记录异常成功"); +// } +// long end = System.currentTimeMillis(); +// log.info("日志队列:{},接收到的消息:{},消费完成,耗时:{}毫秒","send-log-queue", JSONObject.toJSONString(msgData), (end-start)); +// } catch (Exception e) { +// throw new RuntimeException(e); +// } +// } +// /** +// * +// * 拼接GTA字符串 +// * @return +// */ +// public static String generateGTA() { +// // 生成以GTA开头的字符串 +// String codefix = "GTA"; +// // 删除4位数随机数字 +// String s = generateRandomNumber(4); +// //拼接 +// return codefix + s; +// } +// +// /** +// * 随机生成1到10位的数字 +// * @param length +// * @return +// */ +// public static String generateRandomNumber(int length) { +// Random random = new Random(); +// StringBuilder builder = new StringBuilder(); +// for (int i = 0; i < length; i++) { +// builder.append(random.nextInt(10)); +// } +// return builder.toString(); +// } +//} diff --git a/couplet-modules/couplet-business/src/main/resources/bootstrap.yml b/couplet-modules/couplet-business/src/main/resources/bootstrap.yml index cf4e9b6..2a9497d 100644 --- a/couplet-modules/couplet-business/src/main/resources/bootstrap.yml +++ b/couplet-modules/couplet-business/src/main/resources/bootstrap.yml @@ -7,7 +7,6 @@ spring: application: # 应用名称 name: couplet-business - profiles: # 环境配置 active: dev @@ -16,6 +15,7 @@ spring: discovery: # 服务注册地址 server-addr: 121.89.211.230:8848 + namespace: a439ce3f-2c42-4b4c-9c4d-c8db49933c15 config: # 配置中心地址 server-addr: 121.89.211.230:8848 @@ -24,6 +24,7 @@ spring: # 共享配置 shared-configs: - application-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension} + namespace: a439ce3f-2c42-4b4c-9c4d-c8db49933c15 main: allow-bean-definition-overriding: true logging: diff --git a/couplet-modules/couplet-business/src/main/resources/mapper/business/SysTroubleMapper.xml b/couplet-modules/couplet-business/src/main/resources/mapper/business/SysTroubleMapper.xml index 4701557..7048dcf 100644 --- a/couplet-modules/couplet-business/src/main/resources/mapper/business/SysTroubleMapper.xml +++ b/couplet-modules/couplet-business/src/main/resources/mapper/business/SysTroubleMapper.xml @@ -20,7 +20,7 @@ LEFT JOIN couplet_trouble_type y on t.type_id= y.type_id - insert into couplet_trouble_log(trouble_log_code,toruble_log_vin,trouble_log_start,trouble_log_end) + insert into couplet_trouble_log(trouble_log_code,trouble_log_vin,trouble_log_start,trouble_log_end) values(#{troubleLogCode},#{troubleLogVin},#{troubleLogStart},#{troubleLogEnd}) diff --git a/couplet-modules/couplet-modules-mq/src/main/java/com/couplet/mq/config/RabbitMQConfig.java b/couplet-modules/couplet-modules-mq/src/main/java/com/couplet/mq/config/RabbitMQConfig.java index 341d052..853a559 100644 --- a/couplet-modules/couplet-modules-mq/src/main/java/com/couplet/mq/config/RabbitMQConfig.java +++ b/couplet-modules/couplet-modules-mq/src/main/java/com/couplet/mq/config/RabbitMQConfig.java @@ -25,16 +25,16 @@ import org.springframework.context.annotation.Primary; public class RabbitMQConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback { // 通过注入的方式获取队列名、交换机名和路由键 //队列名 - @Value("${mq.queueName}") - public String queueName; +// @Value("${mq.queueName}") + public static final String queueName = "queueName"; //交换机 - @Value("${mq.exchangeName}") - public String exchangeName; +// @Value("${mq.exchangeName}") + public static final String exchangeName = "exchangeName"; //路由键 - @Value("${mq.routingKey}") - public String routingKey; +// @Value("${mq.routingKey}") + public static final String routingKey = "routingKey"; private RabbitTemplate rabbitTemplate; diff --git a/couplet-modules/couplet-modules-mq/src/main/java/com/couplet/mq/controller/MqController.java b/couplet-modules/couplet-modules-mq/src/main/java/com/couplet/mq/controller/MqController.java index 057ac32..87b5529 100644 --- a/couplet-modules/couplet-modules-mq/src/main/java/com/couplet/mq/controller/MqController.java +++ b/couplet-modules/couplet-modules-mq/src/main/java/com/couplet/mq/controller/MqController.java @@ -1,55 +1,55 @@ -package com.couplet.mq.controller; - -import com.couplet.common.core.utils.uuid.IdUtils; -import com.couplet.mq.domain.User; -import lombok.extern.slf4j.Slf4j; -import org.springframework.amqp.rabbit.connection.CorrelationData; -import org.springframework.amqp.rabbit.core.RabbitTemplate; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.web.bind.annotation.*; - -/** - * @ProjectName: five-groups-couplet - * @Author: LiuYunHu - * @CreateTime: 2024/3/29 - * @Description: MQController类 - */ -@RestController -@RequestMapping("/mq") -@Slf4j -public class MqController { - // 通过注入的方式获取队列名、交换机名和路由键 - //队列名 - @Value("${mq.queueName}") - public String queueName; - - //交换机 - @Value("${mq.exchangeName}") - public String exchangeName; - - //路由键 - @Value("${mq.routingKey}") - public String routingKey; - - @Autowired - private RabbitTemplate rabbitTemplate; - - /* - * @Author: LiuYunHu - * @Date: 2024/4/1 19:58 - * @Description: 消息发送者 - * @Param: [data] - * @Return: void - **/ - @PostMapping("/sout") - //接收json字符串 - public void sout(@RequestBody User param) { - rabbitTemplate.convertAndSend(exchangeName, routingKey, param, message -> { - message.getMessageProperties().setMessageId(IdUtils.randomUUID()); - return message; - }, new CorrelationData(IdUtils.randomUUID()) - ); - - } -} +//package com.couplet.mq.controller; +// +//import com.couplet.common.core.utils.uuid.IdUtils; +//import com.couplet.mq.domain.User; +//import lombok.extern.slf4j.Slf4j; +//import org.springframework.amqp.rabbit.connection.CorrelationData; +//import org.springframework.amqp.rabbit.core.RabbitTemplate; +//import org.springframework.beans.factory.annotation.Autowired; +//import org.springframework.beans.factory.annotation.Value; +//import org.springframework.web.bind.annotation.*; +// +///** +// * @ProjectName: five-groups-couplet +// * @Author: LiuYunHu +// * @CreateTime: 2024/3/29 +// * @Description: MQController类 +// */ +//@RestController +//@RequestMapping("/mq") +//@Slf4j +//public class MqController { +// // 通过注入的方式获取队列名、交换机名和路由键 +// //队列名 +//// @Value("${mq.queueName}") +// public String queueName; +// +// //交换机 +//// @Value("${mq.exchangeName}") +// public String exchangeName; +// +// //路由键 +//// @Value("${mq.routingKey}") +// public String routingKey; +// +// @Autowired +// private RabbitTemplate rabbitTemplate; +// +// /* +// * @Author: LiuYunHu +// * @Date: 2024/4/1 19:58 +// * @Description: 消息发送者 +// * @Param: [data] +// * @Return: void +// **/ +// @PostMapping("/sout") +// //接收json字符串 +// public void sout(@RequestBody User param) { +// rabbitTemplate.convertAndSend(exchangeName, routingKey, param, message -> { +// message.getMessageProperties().setMessageId(IdUtils.randomUUID()); +// return message; +// }, new CorrelationData(IdUtils.randomUUID()) +// ); +// +// } +//} diff --git a/couplet-modules/couplet-modules-mq/src/main/java/com/couplet/mq/service/Consumer.java b/couplet-modules/couplet-modules-mq/src/main/java/com/couplet/mq/service/Consumer.java index a01fcac..5c0def9 100644 --- a/couplet-modules/couplet-modules-mq/src/main/java/com/couplet/mq/service/Consumer.java +++ b/couplet-modules/couplet-modules-mq/src/main/java/com/couplet/mq/service/Consumer.java @@ -1,164 +1,164 @@ -package com.couplet.mq.service; - -import com.couplet.mq.domain.User; -import com.rabbitmq.client.Channel; -import lombok.extern.slf4j.Slf4j; -import org.springframework.amqp.core.Message; -import org.springframework.amqp.rabbit.annotation.RabbitHandler; -import org.springframework.amqp.rabbit.annotation.RabbitListener; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.data.redis.core.StringRedisTemplate; -import org.springframework.stereotype.Component; - -import java.io.IOException; -import java.util.concurrent.TimeUnit; - -/** - * @ProjectName: five-groups-couplet - * @Author: LiuYunHu - * @CreateTime: 2024/3/28 - * @Description: MQ消费者类 - */ - -@Component -@Slf4j -@SuppressWarnings("all") -@RabbitListener(queues = "${mq.queueName}") -public class Consumer { - @Autowired - private StringRedisTemplate redis; - - /* 线程池执行 - - //创建一个定长线程池 - private final Executor executor = Executors.newFixedThreadPool(5); - - @Async - @RabbitHandler - public void process(User param, Channel channel, Message message) { - executor.execute(() -> { - try { - handleMessage(param, channel, message); - } catch (IOException e) { - log.error("处理消息失败:{}", e); - } - }); - } - - //处理信息的方法 - private void handleMessage(User param, Channel channel, Message message) throws IOException { - log.info("消费者收到消息为:{},{}" + param, message.getMessageProperties().getDeliveryTag()); - - long deliveryTag = message.getMessageProperties().getDeliveryTag(); - String messageId = message.getMessageProperties().getMessageId(); - - if (!redis.hasKey("value:" + messageId)) { - redis.opsForValue().set("value:" + messageId, "" + deliveryTag, 5, TimeUnit.MINUTES); - } - - // 1 添加成功新数据 0已有重复值,不允许再添加 - Long add = redis.opsForSet().add("set:" + messageId, "set:" + messageId); - //过期时间 - redis.expire("set:" + messageId, 5, TimeUnit.MINUTES); - - - try { - if (add == 1) { - //第一次 消费 - System.out.println("*****************************"); - System.out.println("消费者收到消息:" + param); - System.out.println("*****************************"); - log.info("消费结束"); - - channel.basicAck(deliveryTag, false); - - } else { - //重复消费 - log.error("重复消费"); - channel.basicReject(deliveryTag, false); - - //删除缓存 - redis.opsForSet().remove("set:" + messageId, "set:" + messageId); - } - - - } catch (Exception e) { - log.error("消息没有成功消费!"); - - String s = redis.opsForValue().get("value:" + messageId); - - long oldTag = Long.parseLong(s); - - if (deliveryTag == (oldTag + 2)) { - log.error("确实消费不了,不入队了!"); - channel.basicNack(deliveryTag, false, false); - } else { - log.info("消息消费失败,重新入队"); - channel.basicNack(deliveryTag, false, true); - } - } - - } - -**/ - - @RabbitHandler - public void process(User param, Channel channel, Message message) throws IOException { - log.info("消费者收到消息为:{},{}" + param, message.getMessageProperties().getDeliveryTag()); - - long deliveryTag = message.getMessageProperties().getDeliveryTag(); - String messageId = message.getMessageProperties().getMessageId(); - - if (!redis.hasKey("value:" + messageId)) { - redis.opsForValue().set("value:" + messageId, "" + deliveryTag, 5, TimeUnit.MINUTES); - } - - // 1 添加成功新数据 0已有重复值,不允许再添加 - Long add = redis.opsForSet().add("set:" + messageId, "set:" + messageId); - //过期时间 - redis.expire("set:" + messageId, 5, TimeUnit.MINUTES); - - - try { - if (add == 1) { - //第一次 消费 - System.out.println("*****************************"); - System.out.println("消费者收到消息:" + param); - System.out.println("*****************************"); - log.info("消费结束"); - - //确认消费 - channel.basicAck(deliveryTag, false); - - } else { - //重复消费 - log.error("重复消费"); - //拒绝消费 - channel.basicReject(deliveryTag, false); - - //删除缓存 - redis.opsForSet().remove("set:" + messageId, "set:" + messageId); - } - - - } catch (Exception e) { - log.error("消息没有成功消费!"); - - String s = redis.opsForValue().get("value:" + messageId); - - long oldTag = Long.parseLong(s); - - if (deliveryTag == (oldTag + 2)) { - log.error("确实消费不了,不入队了!"); - - - //拒绝消费 - channel.basicNack(deliveryTag, false, false); - } else { - log.info("消息消费失败,重新入队"); - //重新入队 - channel.basicNack(deliveryTag, false, true); - } - } - } -} +//package com.couplet.mq.service; +// +//import com.couplet.mq.domain.User; +//import com.rabbitmq.client.Channel; +//import lombok.extern.slf4j.Slf4j; +//import org.springframework.amqp.core.Message; +//import org.springframework.amqp.rabbit.annotation.RabbitHandler; +//import org.springframework.amqp.rabbit.annotation.RabbitListener; +//import org.springframework.beans.factory.annotation.Autowired; +//import org.springframework.data.redis.core.StringRedisTemplate; +//import org.springframework.stereotype.Component; +// +//import java.io.IOException; +//import java.util.concurrent.TimeUnit; +// +///** +// * @ProjectName: five-groups-couplet +// * @Author: LiuYunHu +// * @CreateTime: 2024/3/28 +// * @Description: MQ消费者类 +// */ +// +//@Component +//@Slf4j +//@SuppressWarnings("all") +//@RabbitListener(queues = "${mq.queueName}") +//public class Consumer { +// @Autowired +// private StringRedisTemplate redis; +// +// /* 线程池执行 +// +// //创建一个定长线程池 +// private final Executor executor = Executors.newFixedThreadPool(5); +// +// @Async +// @RabbitHandler +// public void process(User param, Channel channel, Message message) { +// executor.execute(() -> { +// try { +// handleMessage(param, channel, message); +// } catch (IOException e) { +// log.error("处理消息失败:{}", e); +// } +// }); +// } +// +// //处理信息的方法 +// private void handleMessage(User param, Channel channel, Message message) throws IOException { +// log.info("消费者收到消息为:{},{}" + param, message.getMessageProperties().getDeliveryTag()); +// +// long deliveryTag = message.getMessageProperties().getDeliveryTag(); +// String messageId = message.getMessageProperties().getMessageId(); +// +// if (!redis.hasKey("value:" + messageId)) { +// redis.opsForValue().set("value:" + messageId, "" + deliveryTag, 5, TimeUnit.MINUTES); +// } +// +// // 1 添加成功新数据 0已有重复值,不允许再添加 +// Long add = redis.opsForSet().add("set:" + messageId, "set:" + messageId); +// //过期时间 +// redis.expire("set:" + messageId, 5, TimeUnit.MINUTES); +// +// +// try { +// if (add == 1) { +// //第一次 消费 +// System.out.println("*****************************"); +// System.out.println("消费者收到消息:" + param); +// System.out.println("*****************************"); +// log.info("消费结束"); +// +// channel.basicAck(deliveryTag, false); +// +// } else { +// //重复消费 +// log.error("重复消费"); +// channel.basicReject(deliveryTag, false); +// +// //删除缓存 +// redis.opsForSet().remove("set:" + messageId, "set:" + messageId); +// } +// +// +// } catch (Exception e) { +// log.error("消息没有成功消费!"); +// +// String s = redis.opsForValue().get("value:" + messageId); +// +// long oldTag = Long.parseLong(s); +// +// if (deliveryTag == (oldTag + 2)) { +// log.error("确实消费不了,不入队了!"); +// channel.basicNack(deliveryTag, false, false); +// } else { +// log.info("消息消费失败,重新入队"); +// channel.basicNack(deliveryTag, false, true); +// } +// } +// +// } +// +//**/ +// +// @RabbitHandler +// public void process(User param, Channel channel, Message message) throws IOException { +// log.info("消费者收到消息为:{},{}" + param, message.getMessageProperties().getDeliveryTag()); +// +// long deliveryTag = message.getMessageProperties().getDeliveryTag(); +// String messageId = message.getMessageProperties().getMessageId(); +// +// if (!redis.hasKey("value:" + messageId)) { +// redis.opsForValue().set("value:" + messageId, "" + deliveryTag, 5, TimeUnit.MINUTES); +// } +// +// // 1 添加成功新数据 0已有重复值,不允许再添加 +// Long add = redis.opsForSet().add("set:" + messageId, "set:" + messageId); +// //过期时间 +// redis.expire("set:" + messageId, 5, TimeUnit.MINUTES); +// +// +// try { +// if (add == 1) { +// //第一次 消费 +// System.out.println("*****************************"); +// System.out.println("消费者收到消息:" + param); +// System.out.println("*****************************"); +// log.info("消费结束"); +// +// //确认消费 +// channel.basicAck(deliveryTag, false); +// +// } else { +// //重复消费 +// log.error("重复消费"); +// //拒绝消费 +// channel.basicReject(deliveryTag, false); +// +// //删除缓存 +// redis.opsForSet().remove("set:" + messageId, "set:" + messageId); +// } +// +// +// } catch (Exception e) { +// log.error("消息没有成功消费!"); +// +// String s = redis.opsForValue().get("value:" + messageId); +// +// long oldTag = Long.parseLong(s); +// +// if (deliveryTag == (oldTag + 2)) { +// log.error("确实消费不了,不入队了!"); +// +// +// //拒绝消费 +// channel.basicNack(deliveryTag, false, false); +// } else { +// log.info("消息消费失败,重新入队"); +// //重新入队 +// channel.basicNack(deliveryTag, false, true); +// } +// } +// } +//}