feat(): 新增Rabbit队列,重启车辆下线监听功能
parent
c06a41d9e5
commit
f797c7cae8
|
@ -30,6 +30,10 @@ public class RabbitmqConfig {
|
||||||
* 队列
|
* 队列
|
||||||
*/
|
*/
|
||||||
public static final String QUEUE_INFORM_SMS = "queue_inform_sms";
|
public static final String QUEUE_INFORM_SMS = "queue_inform_sms";
|
||||||
|
/**
|
||||||
|
* 队列
|
||||||
|
*/
|
||||||
|
public static final String QUEUE_INFORM_SEND = "queue_inform_send";
|
||||||
/**
|
/**
|
||||||
* 交换机
|
* 交换机
|
||||||
*/
|
*/
|
||||||
|
@ -42,6 +46,10 @@ public class RabbitmqConfig {
|
||||||
* 路由key
|
* 路由key
|
||||||
*/
|
*/
|
||||||
public static final String ROUTINGKEY_SMS = "inform.#.sms.#";
|
public static final String ROUTINGKEY_SMS = "inform.#.sms.#";
|
||||||
|
/**
|
||||||
|
* 路由key
|
||||||
|
*/
|
||||||
|
public static final String ROUTINGKEY_SEND = "inform.#.send.#";
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 声明交换机,做持久化
|
* 声明交换机,做持久化
|
||||||
|
@ -84,6 +92,20 @@ public class RabbitmqConfig {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
// 声明QUEUE_INFORM_SEND队列
|
||||||
|
@Bean(QUEUE_INFORM_SEND)
|
||||||
|
public Queue queueInformSend() {
|
||||||
|
try {
|
||||||
|
Queue queue = new Queue(QUEUE_INFORM_SEND);
|
||||||
|
log.info("创建的队列为: {}", QUEUE_INFORM_SEND);
|
||||||
|
return queue;
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("创建该: {} 队列失败", QUEUE_INFORM_SEND, e);
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
//ROUTINGKEY_EMAIL队列绑定交换机,指定routingKey
|
//ROUTINGKEY_EMAIL队列绑定交换机,指定routingKey
|
||||||
@Bean
|
@Bean
|
||||||
public Binding bindingQueueInformEmail(@Qualifier(QUEUE_INFORM_EMAIL) Queue queue,
|
public Binding bindingQueueInformEmail(@Qualifier(QUEUE_INFORM_EMAIL) Queue queue,
|
||||||
|
@ -97,4 +119,11 @@ public class RabbitmqConfig {
|
||||||
@Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange) {
|
@Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange) {
|
||||||
return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_SMS).noargs();
|
return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_SMS).noargs();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//ROUTINGKEY_SEND队列绑定交换机,指定routingKey
|
||||||
|
@Bean
|
||||||
|
public Binding bindingRoutingKeySend(@Qualifier(QUEUE_INFORM_SEND) Queue queue,
|
||||||
|
@Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange) {
|
||||||
|
return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_SEND).noargs();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -30,10 +30,24 @@ public class DataProcessingController {
|
||||||
@Resource
|
@Resource
|
||||||
private RabbitTemplate rabbitTemplate;
|
private RabbitTemplate rabbitTemplate;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 车辆上线测试
|
||||||
|
* @param vin 车辆VIN码
|
||||||
|
*/
|
||||||
@GetMapping("/goOnline")
|
@GetMapping("/goOnline")
|
||||||
public void goOnline(@RequestParam("vin") String vin) {
|
public void goOnline(@RequestParam("vin") String vin) {
|
||||||
rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_TOPICS_INFORM, "inform.email", vin);
|
rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_TOPICS_INFORM, "inform.email", vin);
|
||||||
log.info("发送消息成功:{}",vin);
|
log.info("车辆 {} 上线测试消息发送成功...",vin);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 车辆下线测试
|
||||||
|
* @param vin 车辆VIN码
|
||||||
|
*/
|
||||||
|
@GetMapping("/downline")
|
||||||
|
public void downline(@RequestParam("vin") String vin) {
|
||||||
|
rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_TOPICS_INFORM, "inform.send", vin);
|
||||||
|
log.info("车辆 {} 下线测试消息发送成功...",vin);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -1,72 +1,64 @@
|
||||||
//package com.muyu.data.processing.rebbit;
|
package com.muyu.data.processing.rebbit;
|
||||||
//
|
|
||||||
//
|
|
||||||
//import com.muyu.common.rabbit.constants.RabbitConstants;
|
import com.muyu.common.caffeine.utils.CacheUtils;
|
||||||
//import com.rabbitmq.client.Channel;
|
import com.muyu.common.rabbit.config.RabbitmqConfig;
|
||||||
//import jakarta.annotation.Resource;
|
import com.muyu.common.rabbit.constants.RabbitConstants;
|
||||||
//import lombok.Setter;
|
import com.rabbitmq.client.Channel;
|
||||||
//import lombok.extern.slf4j.Slf4j;
|
import jakarta.annotation.Resource;
|
||||||
//import org.apache.commons.lang3.ObjectUtils;
|
import lombok.Setter;
|
||||||
//import org.springframework.amqp.core.Message;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
//import org.springframework.amqp.rabbit.annotation.Queue;
|
import org.apache.commons.lang3.ObjectUtils;
|
||||||
//import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
import org.springframework.amqp.core.Message;
|
||||||
//import org.springframework.cache.Cache;
|
import org.springframework.amqp.rabbit.annotation.Queue;
|
||||||
//import org.springframework.cache.CacheManager;
|
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
||||||
//import org.springframework.data.redis.core.RedisTemplate;
|
import org.springframework.cache.Cache;
|
||||||
//import org.springframework.stereotype.Component;
|
import org.springframework.cache.CacheManager;
|
||||||
//
|
import org.springframework.data.redis.core.RedisTemplate;
|
||||||
//import java.io.IOException;
|
import org.springframework.stereotype.Component;
|
||||||
//
|
|
||||||
///**
|
import java.io.IOException;
|
||||||
// * 下线事件监听
|
|
||||||
// * @Author: 胡杨
|
/**
|
||||||
// * @Name: DownlineRabbitConsumer
|
* 下线事件监听
|
||||||
// * @Description: 车辆下线监听器
|
* @Author: 胡杨
|
||||||
// * @CreatedDate: 2024/9/26 下午8:21
|
* @Name: DownlineRabbitConsumer
|
||||||
// * @FilePath: com.muyu.data.processing.rebbit
|
* @Description: 车辆下线监听器
|
||||||
// */
|
* @CreatedDate: 2024/9/26 下午8:21
|
||||||
//@Slf4j
|
* @FilePath: com.muyu.data.processing.rebbit
|
||||||
//@Component
|
*/
|
||||||
//@Setter
|
@Slf4j
|
||||||
//public class DownlineRabbitConsumer {
|
@Component
|
||||||
// @Resource
|
@Setter
|
||||||
// private RedisTemplate<String,String> redisTemplate;
|
public class DownlineRabbitConsumer {
|
||||||
// @Resource
|
@Resource
|
||||||
// private CacheManager cacheManager;
|
private RedisTemplate<String,String> redisTemplate;
|
||||||
//
|
@Resource
|
||||||
// @RabbitListener(queuesToDeclare = {@Queue(RabbitConstants.DOWNLINE_QUEUE)})
|
private CacheUtils cacheUtils;
|
||||||
// public void downline(String vin, Message message, Channel channel) {
|
|
||||||
// log.info("车辆 {} 下线, 配置信息准备中。。。",vin);
|
@RabbitListener(queues = {RabbitmqConfig.QUEUE_INFORM_SEND})
|
||||||
// try {
|
public void downline(String vin, Message message, Channel channel) {
|
||||||
// // 重复性校验
|
log.info("车辆 {} 下线, 配置信息准备中。。。",vin);
|
||||||
// Long add = redisTemplate.opsForSet().add(RabbitConstants.DOWNLINE_QUEUE, message.getMessageProperties().getMessageId());
|
try {
|
||||||
// if (add>0) {
|
deleteCarCache(vin);
|
||||||
// deleteCarCache(vin);
|
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
|
||||||
// log.info("车辆 {} 下线, 消息已确认。。。",vin);
|
log.info("车辆 {} 下线, 配置信息已准备完毕。。。",vin);
|
||||||
// } else {
|
} catch (IOException e) {
|
||||||
// log.info("车辆 {} 下线, 消息重复消费,已确认。。。",vin);
|
try {
|
||||||
// }
|
log.warn("车辆 {} 下线, 配置信息准备失败,返回队列,原因:{}", vin, e.getMessage());
|
||||||
// channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
|
channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
|
||||||
// log.info("车辆 {} 下线, 配置信息已准备完毕。。。",vin);
|
} catch (IOException ex) {
|
||||||
// } catch (IOException e) {
|
log.warn("车辆 {} 下线, 消息返回队列失败,原因:{}", vin, ex.getMessage());
|
||||||
// try {
|
}
|
||||||
// log.warn("车辆 {} 下线, 配置信息准备失败,返回队列,原因:{}", vin, e.getMessage());
|
}
|
||||||
// channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
|
}
|
||||||
// } catch (IOException ex) {
|
|
||||||
// log.warn("车辆 {} 下线, 消息返回队列失败,原因:{}", vin, ex.getMessage());
|
|
||||||
// }
|
/**
|
||||||
// }
|
* 车辆下线 - 删除缓存
|
||||||
// }
|
*/
|
||||||
//
|
public void deleteCarCache(String vin) {
|
||||||
//
|
cacheUtils.delCacheValueAll(vin);
|
||||||
// /**
|
log.info("车辆编码:{},本地缓存删除完成...", vin);
|
||||||
// * 车辆下线 - 删除缓存
|
}
|
||||||
// */
|
}
|
||||||
// public void deleteCarCache(String vin) {
|
|
||||||
// Cache cache = cacheManager.getCache(vin);
|
|
||||||
// if (ObjectUtils.isNotEmpty(cache)){
|
|
||||||
// cache.invalidate();
|
|
||||||
// }
|
|
||||||
// log.info("车辆编码:{},本地缓存删除完成...", vin);
|
|
||||||
// }
|
|
||||||
//}
|
|
||||||
|
|
|
@ -36,7 +36,6 @@ public class GoOnlineRabbitConsumer {
|
||||||
log.info("车辆 {} 上线, 配置信息准备中。。。",vin);
|
log.info("车辆 {} 上线, 配置信息准备中。。。",vin);
|
||||||
try {
|
try {
|
||||||
addCarCache(vin);
|
addCarCache(vin);
|
||||||
log.info("车辆 {} 上线, 消息已确认。。。",vin);
|
|
||||||
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
|
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
|
||||||
log.info("车辆 {} 上线, 配置信息已准备完毕。。。",vin);
|
log.info("车辆 {} 上线, 配置信息已准备完毕。。。",vin);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
|
|
Loading…
Reference in New Issue