feat: 优化Rabbit生产消费

dev
面包骑士 2024-10-09 18:59:59 +08:00
parent 465d39a4bf
commit 4f9c9ecd65
4 changed files with 184 additions and 81 deletions

View File

@ -1,5 +1,6 @@
package com.muyu.cargateway.service.Impl; package com.muyu.cargateway.service.Impl;
import com.muyu.cargateway.config.RabbitmqConfig;
import com.muyu.cargateway.domain.VehicleConnection; import com.muyu.cargateway.domain.VehicleConnection;
import com.muyu.cargateway.domain.VinIp; import com.muyu.cargateway.domain.VinIp;
import com.muyu.cargateway.domain.model.MqttServerModel; import com.muyu.cargateway.domain.model.MqttServerModel;
@ -44,7 +45,7 @@ public class CarOneClickOperationServiceImpl implements CarOneClickOperationServ
log.info("车辆连接请求:{}",vehicleConnectionReq.toString()); log.info("车辆连接请求:{}",vehicleConnectionReq.toString());
// 使用交换机发送消息 // 使用交换机发送消息
rabbitTemplate.convertAndSend("exchange_topics_inform","inform.#.email.#",vehicleConnectionReq.getVehicleVin()); rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_TOPICS_INFORM,RabbitmqConfig.ROUTINGKEY_EMAIL,vehicleConnectionReq.getVehicleVin());
log.info("发送消息成功:{}",vehicleConnectionReq.getVehicleVin()); log.info("发送消息成功:{}",vehicleConnectionReq.getVehicleVin());

View File

@ -0,0 +1,100 @@
package com.muyu.data.processing.config;
import lombok.extern.log4j.Log4j2;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author Lenovo
* @ ToolIntelliJ IDEA
* @ AuthorCHX
* @ Date2024-10-04-15:13
* @ Version1.0
* @ Descriptionrabbitmq
*/
@Log4j2
@Configuration
public class RabbitmqConfig {
// 日志
private static final Logger logger = LoggerFactory.getLogger(RabbitmqConfig.class);
/**
*
*/
public static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
/**
*
*/
public static final String QUEUE_INFORM_SMS = "queue_inform_sms";
/**
*
*/
public static final String EXCHANGE_TOPICS_INFORM = "exchange_topics_inform";
/**
* key
*/
public static final String ROUTINGKEY_EMAIL = "inform.#.email.#";
/**
* key
*/
public static final String ROUTINGKEY_SMS = "inform.#.sms.#";
/**
* ,
*/
@Bean(EXCHANGE_TOPICS_INFORM)
public Exchange exchangeTopicsInform() {
try {
Exchange exchange = ExchangeBuilder.topicExchange(EXCHANGE_TOPICS_INFORM).durable(true).build();
log.info("创建的交换机为: {}", EXCHANGE_TOPICS_INFORM);
return exchange;
} catch (Exception e) {
log.error("创建该: {} 交换机失败", EXCHANGE_TOPICS_INFORM, e);
throw e;
}
}
// 声明QUEUE_INFORM_EMAIL队列
@Bean(QUEUE_INFORM_EMAIL)
public Queue queueInformEmail() {
try {
Queue queue = new Queue(QUEUE_INFORM_EMAIL);
log.info("创建的队列为: {}", QUEUE_INFORM_EMAIL);
return queue;
} catch (Exception e) {
log.error("创建该: {} 队列失败", QUEUE_INFORM_EMAIL, e);
throw e;
}
}
// 声明QUEUE_INFORM_SMS队列
@Bean(QUEUE_INFORM_SMS)
public Queue queueInformSms() {
try {
Queue queue = new Queue(QUEUE_INFORM_SMS);
log.info("创建的队列为: {}", QUEUE_INFORM_SMS);
return queue;
} catch (Exception e) {
log.error("创建该: {} 队列失败", QUEUE_INFORM_SMS, e);
throw e;
}
}
//ROUTINGKEY_EMAIL队列绑定交换机指定routingKey
@Bean
public Binding bindingQueueInformEmail(@Qualifier(QUEUE_INFORM_EMAIL) Queue queue,
@Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_EMAIL).noargs();
}
//ROUTINGKEY_SMS队列绑定交换机指定routingKey
@Bean
public Binding bindingRoutingKeySms(@Qualifier(QUEUE_INFORM_SMS) Queue queue,
@Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_SMS).noargs();
}
}

View File

@ -1,20 +1,15 @@
package com.muyu.data.processing.controller; package com.muyu.data.processing.controller;
import com.muyu.common.core.domain.Result; import com.muyu.data.processing.config.RabbitmqConfig;
import com.muyu.common.security.utils.SecurityUtils;
import com.muyu.data.processing.domain.BasicData;
import com.muyu.data.processing.domain.IotDbData;
import com.muyu.data.processing.service.DataProcessingService; import com.muyu.data.processing.service.DataProcessingService;
import javax.annotation.Resource; import javax.annotation.Resource;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.*; import org.springframework.web.bind.annotation.*;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
/** /**
* *
* *
@ -31,5 +26,12 @@ import java.util.List;
public class DataProcessingController { public class DataProcessingController {
@Resource @Resource
private DataProcessingService service; private DataProcessingService service;
@Resource
private RabbitTemplate rabbitTemplate;
@GetMapping("/goOnline")
public void goOnline(@RequestParam("vin") String vin) {
rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_TOPICS_INFORM, "inform.email", vin);
log.info("发送消息成功:{}",vin);
}
} }

View File

@ -1,72 +1,72 @@
//package com.muyu.data.processing.rebbit; package com.muyu.data.processing.rebbit;
//
//
//import com.muyu.common.caffeine.enums.CacheNameEnums; import com.muyu.common.caffeine.enums.CacheNameEnums;
//import com.muyu.common.rabbit.constants.RabbitConstants; import com.muyu.common.rabbit.constants.RabbitConstants;
//import com.rabbitmq.client.Channel; import com.muyu.data.processing.config.RabbitmqConfig;
//import jakarta.annotation.Resource; import com.rabbitmq.client.Channel;
//import lombok.Setter; import jakarta.annotation.Resource;
//import lombok.extern.slf4j.Slf4j; import lombok.Setter;
//import org.springframework.amqp.core.Message; import lombok.extern.slf4j.Slf4j;
//import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.core.Message;
//import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.rabbit.annotation.Queue;
//import org.springframework.cache.CacheManager; import org.springframework.amqp.rabbit.annotation.RabbitListener;
//import org.springframework.data.redis.core.RedisTemplate; import org.springframework.cache.CacheManager;
//import org.springframework.stereotype.Component; import org.springframework.data.redis.core.RedisTemplate;
// import org.springframework.stereotype.Component;
//import java.io.IOException;
// import java.io.IOException;
///**
// * 上线事件监听 /**
// * @Author: 胡杨 * 线
// * @Name: GoOnlineRabbitConsumer * @Author:
// * @Description: 上线事件 * @Name: GoOnlineRabbitConsumer
// * @CreatedDate: 2024/9/26 下午7:38 * @Description: 线
// * @FilePath: com.muyu.data.processing.rebbit * @CreatedDate: 2024/9/26 7:38
// */ * @FilePath: com.muyu.data.processing.rebbit
//@Slf4j */
//@Component @Slf4j
//@Setter @Component
//public class GoOnlineRabbitConsumer { public class GoOnlineRabbitConsumer {
// @Resource @Resource
// private RedisTemplate<String,String> redisTemplate; private RedisTemplate<String,String> redisTemplate;
// @Resource @Resource
// private CacheManager cacheManager; private CacheManager cacheManager;
//
// @RabbitListener(queuesToDeclare = {@Queue(RabbitConstants.GO_ONLINE_QUEUE)}) @RabbitListener(queues = {RabbitmqConfig.QUEUE_INFORM_EMAIL})
// public void goOnline(String vin, Message message, Channel channel){ public void goOnline(String vin, Message message, Channel channel){
// log.info("车辆 {} 上线, 配置信息准备中。。。",vin); log.info("车辆 {} 上线, 配置信息准备中。。。",vin);
// try { try {
// // 重复性校验 // 重复性校验
// Long add = redisTemplate.opsForSet().add(RabbitConstants.GO_ONLINE_QUEUE, message.getMessageProperties().getMessageId()); Long add = redisTemplate.opsForSet().add(RabbitConstants.GO_ONLINE_QUEUE, message.getMessageProperties().getMessageId());
// if (add>0) { if (add>0) {
// addCarCache(vin); addCarCache(vin);
// log.info("车辆 {} 上线, 消息已确认。。。",vin); log.info("车辆 {} 上线, 消息已确认。。。",vin);
// } else { } else {
// log.info("车辆 {} 上线, 消息重复消费,已确认。。。",vin); log.info("车辆 {} 上线, 消息重复消费,已确认。。。",vin);
// } }
// channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
// log.info("车辆 {} 上线, 配置信息已准备完毕。。。",vin); log.info("车辆 {} 上线, 配置信息已准备完毕。。。",vin);
// } catch (IOException e) { } catch (IOException e) {
// try { try {
// log.warn("车辆 {} 上线, 配置信息准备失败,返回队列,原因:{}", vin, e.getMessage()); log.warn("车辆 {} 上线, 配置信息准备失败,返回队列,原因:{}", vin, e.getMessage());
// channel.basicAck(message.getMessageProperties().getDeliveryTag(), true); channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
// } catch (IOException ex) { } catch (IOException ex) {
// log.warn("车辆 {} 上线, 消息返回队列失败,原因:{}", vin, ex.getMessage()); log.warn("车辆 {} 上线, 消息返回队列失败,原因:{}", vin, ex.getMessage());
// } }
// } }
// } }
//
// /** /**
// * 车辆上线 - 新增缓存 * 线 -
// */ */
// public void addCarCache(String vin) { public void addCarCache(String vin) {
// // 从Redis中获取缓存信息 // 从Redis中获取缓存信息
// for (String name : CacheNameEnums.getCodes()) { for (String name : CacheNameEnums.getCodes()) {
// String value = redisTemplate.opsForValue().get(name+":"+vin); String value = redisTemplate.opsForValue().get(name+":"+vin);
// cacheManager.getCache(name).put(vin, value); cacheManager.getCache(name).put(vin, value);
// log.info("存储缓存, 缓存分区:[{}], 车辆编码:[{}], 存储值:[{}]", name, vin, value); log.info("存储缓存, 缓存分区:[{}], 车辆编码:[{}], 存储值:[{}]", name, vin, value);
// } }
// log.info("车辆编码:{},本地缓存完成...",vin); log.info("车辆编码:{},本地缓存完成...",vin);
// } }
//} }