diff --git a/cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/cargateway/service/Impl/CarOneClickOperationServiceImpl.java b/cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/cargateway/service/Impl/CarOneClickOperationServiceImpl.java index 5792030..f4fd72c 100644 --- a/cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/cargateway/service/Impl/CarOneClickOperationServiceImpl.java +++ b/cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/cargateway/service/Impl/CarOneClickOperationServiceImpl.java @@ -1,5 +1,6 @@ package com.muyu.cargateway.service.Impl; +import com.muyu.cargateway.config.RabbitmqConfig; import com.muyu.cargateway.domain.VehicleConnection; import com.muyu.cargateway.domain.VinIp; import com.muyu.cargateway.domain.model.MqttServerModel; @@ -44,7 +45,7 @@ public class CarOneClickOperationServiceImpl implements CarOneClickOperationServ log.info("车辆连接请求:{}",vehicleConnectionReq.toString()); // 使用交换机发送消息 - rabbitTemplate.convertAndSend("exchange_topics_inform","inform.#.email.#",vehicleConnectionReq.getVehicleVin()); + rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_TOPICS_INFORM,RabbitmqConfig.ROUTINGKEY_EMAIL,vehicleConnectionReq.getVehicleVin()); log.info("发送消息成功:{}",vehicleConnectionReq.getVehicleVin()); diff --git a/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/config/RabbitmqConfig.java b/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/config/RabbitmqConfig.java new file mode 100644 index 0000000..49c39a0 --- /dev/null +++ b/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/config/RabbitmqConfig.java @@ -0,0 +1,100 @@ +package com.muyu.data.processing.config; + +import lombok.extern.log4j.Log4j2; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.amqp.core.*; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * @author Lenovo + * @ Tool:IntelliJ IDEA + * @ Author:CHX + * @ Date:2024-10-04-15:13 + * @ Version:1.0 + * @ Description:rabbitmq配置类 + */ +@Log4j2 +@Configuration +public class RabbitmqConfig { + // 日志 + private static final Logger logger = LoggerFactory.getLogger(RabbitmqConfig.class); + + /** + * 队列 + */ + public static final String QUEUE_INFORM_EMAIL = "queue_inform_email"; + /** + * 队列 + */ + public static final String QUEUE_INFORM_SMS = "queue_inform_sms"; + /** + * 交换机 + */ + public static final String EXCHANGE_TOPICS_INFORM = "exchange_topics_inform"; + /** + * 路由key + */ + public static final String ROUTINGKEY_EMAIL = "inform.#.email.#"; + /** + * 路由key + */ + public static final String ROUTINGKEY_SMS = "inform.#.sms.#"; + + /** + * 声明交换机,做持久化 + */ + @Bean(EXCHANGE_TOPICS_INFORM) + public Exchange exchangeTopicsInform() { + try { + Exchange exchange = ExchangeBuilder.topicExchange(EXCHANGE_TOPICS_INFORM).durable(true).build(); + log.info("创建的交换机为: {}", EXCHANGE_TOPICS_INFORM); + return exchange; + } catch (Exception e) { + log.error("创建该: {} 交换机失败", EXCHANGE_TOPICS_INFORM, e); + throw e; + } + } + + // 声明QUEUE_INFORM_EMAIL队列 + @Bean(QUEUE_INFORM_EMAIL) + public Queue queueInformEmail() { + try { + Queue queue = new Queue(QUEUE_INFORM_EMAIL); + log.info("创建的队列为: {}", QUEUE_INFORM_EMAIL); + return queue; + } catch (Exception e) { + log.error("创建该: {} 队列失败", QUEUE_INFORM_EMAIL, e); + throw e; + } + } + + // 声明QUEUE_INFORM_SMS队列 + @Bean(QUEUE_INFORM_SMS) + public Queue queueInformSms() { + try { + Queue queue = new Queue(QUEUE_INFORM_SMS); + log.info("创建的队列为: {}", QUEUE_INFORM_SMS); + return queue; + } catch (Exception e) { + log.error("创建该: {} 队列失败", QUEUE_INFORM_SMS, e); + throw e; + } + } + + //ROUTINGKEY_EMAIL队列绑定交换机,指定routingKey + @Bean + public Binding bindingQueueInformEmail(@Qualifier(QUEUE_INFORM_EMAIL) Queue queue, + @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange) { + return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_EMAIL).noargs(); + } + + //ROUTINGKEY_SMS队列绑定交换机,指定routingKey + @Bean + public Binding bindingRoutingKeySms(@Qualifier(QUEUE_INFORM_SMS) Queue queue, + @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange) { + return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_SMS).noargs(); + } +} diff --git a/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/controller/DataProcessingController.java b/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/controller/DataProcessingController.java index d44f2ba..b6e8c13 100644 --- a/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/controller/DataProcessingController.java +++ b/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/controller/DataProcessingController.java @@ -1,20 +1,15 @@ package com.muyu.data.processing.controller; -import com.muyu.common.core.domain.Result; -import com.muyu.common.security.utils.SecurityUtils; -import com.muyu.data.processing.domain.BasicData; -import com.muyu.data.processing.domain.IotDbData; +import com.muyu.data.processing.config.RabbitmqConfig; import com.muyu.data.processing.service.DataProcessingService; import javax.annotation.Resource; +import org.springframework.amqp.core.Message; +import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.web.bind.annotation.*; import lombok.extern.slf4j.Slf4j; -import java.util.Date; -import java.util.HashMap; -import java.util.List; - /** * 数据处理控制层 * @@ -31,5 +26,12 @@ import java.util.List; public class DataProcessingController { @Resource private DataProcessingService service; + @Resource + private RabbitTemplate rabbitTemplate; + @GetMapping("/goOnline") + public void goOnline(@RequestParam("vin") String vin) { + rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_TOPICS_INFORM, "inform.email", vin); + log.info("发送消息成功:{}",vin); + } } diff --git a/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/rebbit/GoOnlineRabbitConsumer.java b/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/rebbit/GoOnlineRabbitConsumer.java index e74815e..f709139 100644 --- a/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/rebbit/GoOnlineRabbitConsumer.java +++ b/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/rebbit/GoOnlineRabbitConsumer.java @@ -1,72 +1,72 @@ -//package com.muyu.data.processing.rebbit; -// -// -//import com.muyu.common.caffeine.enums.CacheNameEnums; -//import com.muyu.common.rabbit.constants.RabbitConstants; -//import com.rabbitmq.client.Channel; -//import jakarta.annotation.Resource; -//import lombok.Setter; -//import lombok.extern.slf4j.Slf4j; -//import org.springframework.amqp.core.Message; -//import org.springframework.amqp.rabbit.annotation.Queue; -//import org.springframework.amqp.rabbit.annotation.RabbitListener; -//import org.springframework.cache.CacheManager; -//import org.springframework.data.redis.core.RedisTemplate; -//import org.springframework.stereotype.Component; -// -//import java.io.IOException; -// -///** -// * 上线事件监听 -// * @Author: 胡杨 -// * @Name: GoOnlineRabbitConsumer -// * @Description: 上线事件 -// * @CreatedDate: 2024/9/26 下午7:38 -// * @FilePath: com.muyu.data.processing.rebbit -// */ -//@Slf4j -//@Component -//@Setter -//public class GoOnlineRabbitConsumer { -// @Resource -// private RedisTemplate redisTemplate; -// @Resource -// private CacheManager cacheManager; -// -// @RabbitListener(queuesToDeclare = {@Queue(RabbitConstants.GO_ONLINE_QUEUE)}) -// public void goOnline(String vin, Message message, Channel channel){ -// log.info("车辆 {} 上线, 配置信息准备中。。。",vin); -// try { -// // 重复性校验 -// Long add = redisTemplate.opsForSet().add(RabbitConstants.GO_ONLINE_QUEUE, message.getMessageProperties().getMessageId()); -// if (add>0) { -// addCarCache(vin); -// log.info("车辆 {} 上线, 消息已确认。。。",vin); -// } else { -// log.info("车辆 {} 上线, 消息重复消费,已确认。。。",vin); -// } -// channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); -// log.info("车辆 {} 上线, 配置信息已准备完毕。。。",vin); -// } catch (IOException e) { -// try { -// log.warn("车辆 {} 上线, 配置信息准备失败,返回队列,原因:{}", vin, e.getMessage()); -// channel.basicAck(message.getMessageProperties().getDeliveryTag(), true); -// } catch (IOException ex) { -// log.warn("车辆 {} 上线, 消息返回队列失败,原因:{}", vin, ex.getMessage()); -// } -// } -// } -// -// /** -// * 车辆上线 - 新增缓存 -// */ -// public void addCarCache(String vin) { -// // 从Redis中获取缓存信息 -// for (String name : CacheNameEnums.getCodes()) { -// String value = redisTemplate.opsForValue().get(name+":"+vin); -// cacheManager.getCache(name).put(vin, value); -// log.info("存储缓存, 缓存分区:[{}], 车辆编码:[{}], 存储值:[{}]", name, vin, value); -// } -// log.info("车辆编码:{},本地缓存完成...",vin); -// } -//} +package com.muyu.data.processing.rebbit; + + +import com.muyu.common.caffeine.enums.CacheNameEnums; +import com.muyu.common.rabbit.constants.RabbitConstants; +import com.muyu.data.processing.config.RabbitmqConfig; +import com.rabbitmq.client.Channel; +import jakarta.annotation.Resource; +import lombok.Setter; +import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.core.Message; +import org.springframework.amqp.rabbit.annotation.Queue; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.cache.CacheManager; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.stereotype.Component; + +import java.io.IOException; + +/** + * 上线事件监听 + * @Author: 胡杨 + * @Name: GoOnlineRabbitConsumer + * @Description: 上线事件 + * @CreatedDate: 2024/9/26 下午7:38 + * @FilePath: com.muyu.data.processing.rebbit + */ +@Slf4j +@Component +public class GoOnlineRabbitConsumer { + @Resource + private RedisTemplate redisTemplate; + @Resource + private CacheManager cacheManager; + + @RabbitListener(queues = {RabbitmqConfig.QUEUE_INFORM_EMAIL}) + public void goOnline(String vin, Message message, Channel channel){ + log.info("车辆 {} 上线, 配置信息准备中。。。",vin); + try { + // 重复性校验 + Long add = redisTemplate.opsForSet().add(RabbitConstants.GO_ONLINE_QUEUE, message.getMessageProperties().getMessageId()); + if (add>0) { + addCarCache(vin); + log.info("车辆 {} 上线, 消息已确认。。。",vin); + } else { + log.info("车辆 {} 上线, 消息重复消费,已确认。。。",vin); + } + channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); + log.info("车辆 {} 上线, 配置信息已准备完毕。。。",vin); + } catch (IOException e) { + try { + log.warn("车辆 {} 上线, 配置信息准备失败,返回队列,原因:{}", vin, e.getMessage()); + channel.basicAck(message.getMessageProperties().getDeliveryTag(), true); + } catch (IOException ex) { + log.warn("车辆 {} 上线, 消息返回队列失败,原因:{}", vin, ex.getMessage()); + } + } + } + + /** + * 车辆上线 - 新增缓存 + */ + public void addCarCache(String vin) { + // 从Redis中获取缓存信息 + for (String name : CacheNameEnums.getCodes()) { + String value = redisTemplate.opsForValue().get(name+":"+vin); + cacheManager.getCache(name).put(vin, value); + log.info("存储缓存, 缓存分区:[{}], 车辆编码:[{}], 存储值:[{}]", name, vin, value); + } + log.info("车辆编码:{},本地缓存完成...",vin); + } +}