diff --git a/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/config/RabbitmqConfig.java b/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/config/RabbitmqConfig.java index aed60f6..d529366 100644 --- a/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/config/RabbitmqConfig.java +++ b/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/config/RabbitmqConfig.java @@ -30,6 +30,10 @@ public class RabbitmqConfig { * 队列 */ public static final String QUEUE_INFORM_SMS = "queue_inform_sms"; + /** + * 队列 + */ + public static final String QUEUE_INFORM_SEND = "queue_inform_send"; /** * 交换机 */ @@ -42,6 +46,10 @@ public class RabbitmqConfig { * 路由key */ public static final String ROUTINGKEY_SMS = "inform.#.sms.#"; + /** + * 路由key + */ + public static final String ROUTINGKEY_SEND = "inform.#.send.#"; /** * 声明交换机,做持久化 @@ -84,6 +92,20 @@ public class RabbitmqConfig { } } + + // 声明QUEUE_INFORM_SEND队列 + @Bean(QUEUE_INFORM_SEND) + public Queue queueInformSend() { + try { + Queue queue = new Queue(QUEUE_INFORM_SEND); + log.info("创建的队列为: {}", QUEUE_INFORM_SEND); + return queue; + } catch (Exception e) { + log.error("创建该: {} 队列失败", QUEUE_INFORM_SEND, e); + throw e; + } + } + //ROUTINGKEY_EMAIL队列绑定交换机,指定routingKey @Bean public Binding bindingQueueInformEmail(@Qualifier(QUEUE_INFORM_EMAIL) Queue queue, @@ -97,4 +119,11 @@ public class RabbitmqConfig { @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_SMS).noargs(); } + + //ROUTINGKEY_SEND队列绑定交换机,指定routingKey + @Bean + public Binding bindingRoutingKeySend(@Qualifier(QUEUE_INFORM_SEND) Queue queue, + @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange) { + return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_SEND).noargs(); + } } diff --git a/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/controller/DataProcessingController.java b/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/controller/DataProcessingController.java index 29c97c4..b2dd356 100644 --- a/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/controller/DataProcessingController.java +++ b/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/controller/DataProcessingController.java @@ -30,10 +30,24 @@ public class DataProcessingController { @Resource private RabbitTemplate rabbitTemplate; + /** + * 车辆上线测试 + * @param vin 车辆VIN码 + */ @GetMapping("/goOnline") public void goOnline(@RequestParam("vin") String vin) { rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_TOPICS_INFORM, "inform.email", vin); - log.info("发送消息成功:{}",vin); + log.info("车辆 {} 上线测试消息发送成功...",vin); + } + + /** + * 车辆下线测试 + * @param vin 车辆VIN码 + */ + @GetMapping("/downline") + public void downline(@RequestParam("vin") String vin) { + rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_TOPICS_INFORM, "inform.send", vin); + log.info("车辆 {} 下线测试消息发送成功...",vin); } /** diff --git a/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/rebbit/DownlineRabbitConsumer.java b/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/rebbit/DownlineRabbitConsumer.java index 97a496b..77a101e 100644 --- a/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/rebbit/DownlineRabbitConsumer.java +++ b/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/rebbit/DownlineRabbitConsumer.java @@ -1,72 +1,64 @@ -//package com.muyu.data.processing.rebbit; -// -// -//import com.muyu.common.rabbit.constants.RabbitConstants; -//import com.rabbitmq.client.Channel; -//import jakarta.annotation.Resource; -//import lombok.Setter; -//import lombok.extern.slf4j.Slf4j; -//import org.apache.commons.lang3.ObjectUtils; -//import org.springframework.amqp.core.Message; -//import org.springframework.amqp.rabbit.annotation.Queue; -//import org.springframework.amqp.rabbit.annotation.RabbitListener; -//import org.springframework.cache.Cache; -//import org.springframework.cache.CacheManager; -//import org.springframework.data.redis.core.RedisTemplate; -//import org.springframework.stereotype.Component; -// -//import java.io.IOException; -// -///** -// * 下线事件监听 -// * @Author: 胡杨 -// * @Name: DownlineRabbitConsumer -// * @Description: 车辆下线监听器 -// * @CreatedDate: 2024/9/26 下午8:21 -// * @FilePath: com.muyu.data.processing.rebbit -// */ -//@Slf4j -//@Component -//@Setter -//public class DownlineRabbitConsumer { -// @Resource -// private RedisTemplate redisTemplate; -// @Resource -// private CacheManager cacheManager; -// -// @RabbitListener(queuesToDeclare = {@Queue(RabbitConstants.DOWNLINE_QUEUE)}) -// public void downline(String vin, Message message, Channel channel) { -// log.info("车辆 {} 下线, 配置信息准备中。。。",vin); -// try { -// // 重复性校验 -// Long add = redisTemplate.opsForSet().add(RabbitConstants.DOWNLINE_QUEUE, message.getMessageProperties().getMessageId()); -// if (add>0) { -// deleteCarCache(vin); -// log.info("车辆 {} 下线, 消息已确认。。。",vin); -// } else { -// log.info("车辆 {} 下线, 消息重复消费,已确认。。。",vin); -// } -// channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); -// log.info("车辆 {} 下线, 配置信息已准备完毕。。。",vin); -// } catch (IOException e) { -// try { -// log.warn("车辆 {} 下线, 配置信息准备失败,返回队列,原因:{}", vin, e.getMessage()); -// channel.basicAck(message.getMessageProperties().getDeliveryTag(), true); -// } catch (IOException ex) { -// log.warn("车辆 {} 下线, 消息返回队列失败,原因:{}", vin, ex.getMessage()); -// } -// } -// } -// -// -// /** -// * 车辆下线 - 删除缓存 -// */ -// public void deleteCarCache(String vin) { -// Cache cache = cacheManager.getCache(vin); -// if (ObjectUtils.isNotEmpty(cache)){ -// cache.invalidate(); -// } -// log.info("车辆编码:{},本地缓存删除完成...", vin); -// } -//} +package com.muyu.data.processing.rebbit; + + +import com.muyu.common.caffeine.utils.CacheUtils; +import com.muyu.common.rabbit.config.RabbitmqConfig; +import com.muyu.common.rabbit.constants.RabbitConstants; +import com.rabbitmq.client.Channel; +import jakarta.annotation.Resource; +import lombok.Setter; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.ObjectUtils; +import org.springframework.amqp.core.Message; +import org.springframework.amqp.rabbit.annotation.Queue; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.cache.Cache; +import org.springframework.cache.CacheManager; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.stereotype.Component; + +import java.io.IOException; + +/** + * 下线事件监听 + * @Author: 胡杨 + * @Name: DownlineRabbitConsumer + * @Description: 车辆下线监听器 + * @CreatedDate: 2024/9/26 下午8:21 + * @FilePath: com.muyu.data.processing.rebbit + */ +@Slf4j +@Component +@Setter +public class DownlineRabbitConsumer { + @Resource + private RedisTemplate redisTemplate; + @Resource + private CacheUtils cacheUtils; + + @RabbitListener(queues = {RabbitmqConfig.QUEUE_INFORM_SEND}) + public void downline(String vin, Message message, Channel channel) { + log.info("车辆 {} 下线, 配置信息准备中。。。",vin); + try { + deleteCarCache(vin); + channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); + log.info("车辆 {} 下线, 配置信息已准备完毕。。。",vin); + } catch (IOException e) { + try { + log.warn("车辆 {} 下线, 配置信息准备失败,返回队列,原因:{}", vin, e.getMessage()); + channel.basicAck(message.getMessageProperties().getDeliveryTag(), true); + } catch (IOException ex) { + log.warn("车辆 {} 下线, 消息返回队列失败,原因:{}", vin, ex.getMessage()); + } + } + } + + + /** + * 车辆下线 - 删除缓存 + */ + public void deleteCarCache(String vin) { + cacheUtils.delCacheValueAll(vin); + log.info("车辆编码:{},本地缓存删除完成...", vin); + } +} diff --git a/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/rebbit/GoOnlineRabbitConsumer.java b/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/rebbit/GoOnlineRabbitConsumer.java index 81a841d..490e774 100644 --- a/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/rebbit/GoOnlineRabbitConsumer.java +++ b/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/rebbit/GoOnlineRabbitConsumer.java @@ -36,7 +36,6 @@ public class GoOnlineRabbitConsumer { log.info("车辆 {} 上线, 配置信息准备中。。。",vin); try { addCarCache(vin); - log.info("车辆 {} 上线, 消息已确认。。。",vin); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); log.info("车辆 {} 上线, 配置信息已准备完毕。。。",vin); } catch (IOException e) {