diff --git a/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/basic/EventCustom.java b/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/basic/EventCustom.java index 83b1028..3cd5bf6 100644 --- a/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/basic/EventCustom.java +++ b/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/basic/EventCustom.java @@ -1,6 +1,6 @@ package com.muyu.processing.basic; -import cn.hutool.json.JSONObject; +import com.alibaba.fastjson.JSONObject; import org.springframework.context.ApplicationEvent; /** diff --git a/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/basic/EventPublisher.java b/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/basic/EventPublisher.java index 2551004..a542b6e 100644 --- a/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/basic/EventPublisher.java +++ b/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/basic/EventPublisher.java @@ -1,7 +1,9 @@ package com.muyu.processing.basic; +import com.alibaba.fastjson.JSONObject; import org.springframework.context.ApplicationEventPublisher; import org.springframework.context.ApplicationEventPublisherAware; +import org.springframework.stereotype.Component; /** * 策略发送事件 @@ -11,6 +13,7 @@ import org.springframework.context.ApplicationEventPublisherAware; * @name:EventPublisher * @Date:2024/9/29 22:31 */ +@Component public class EventPublisher implements ApplicationEventPublisherAware { private ApplicationEventPublisher publisher; @@ -20,4 +23,13 @@ public class EventPublisher implements ApplicationEventPublisherAware { this.publisher = applicationEventPublisher; } + /** + * 发送事件 + * @param jsonObject 数据 + */ + public void eventPublish(JSONObject jsonObject){ + EventCustom eventCustom = new EventCustom(this, jsonObject); + publisher.publishEvent(eventCustom); + } + } diff --git a/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/config/ConfirmCallbackConfig.java b/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/config/ConfirmCallbackConfig.java deleted file mode 100644 index 4b18ab2..0000000 --- a/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/config/ConfirmCallbackConfig.java +++ /dev/null @@ -1,42 +0,0 @@ -package com.muyu.processing.config; - -import org.springframework.amqp.rabbit.connection.CorrelationData; -import org.springframework.amqp.rabbit.core.RabbitTemplate; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; - -import javax.annotation.PostConstruct; - -/** - * 确认回调配置 - */ -@Component -public class ConfirmCallbackConfig implements RabbitTemplate.ConfirmCallback { - - @Autowired - private RabbitTemplate rabbitTemplate; - - /** - * 当前bean初始化的时候执行 - */ - @PostConstruct - public void init() { - this.rabbitTemplate.setConfirmCallback(this); - } - - /** - * 确认方法 - * @param correlationData correlation data for the callback. - * @param ack true for ack, false for nack - * @param cause An optional cause, for nack, when available, otherwise null. - */ - @Override - public void confirm(CorrelationData correlationData, boolean ack, String cause) { - if (ack) { - System.out.println("消息发送到 broker 成功"); - } else { - System.out.println("消息发送到 broker 失败,失败的原因:" + cause); - } - } - -} diff --git a/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/config/RabbitAdminConfig.java b/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/config/RabbitAdminConfig.java deleted file mode 100644 index 646886b..0000000 --- a/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/config/RabbitAdminConfig.java +++ /dev/null @@ -1,50 +0,0 @@ -package com.muyu.processing.config; - -import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; -import org.springframework.amqp.rabbit.connection.ConnectionFactory; -import org.springframework.amqp.rabbit.core.RabbitAdmin; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; - -/** - * 构建 RabbitAdmin - */ -@Configuration -public class RabbitAdminConfig { - - @Value("${spring.rabbitmq.host}") - private String host; - @Value("${spring.rabbitmq.username}") - private String username; - @Value("${spring.rabbitmq.password}") - private String password; - @Value("${spring.rabbitmq.virtual-host}") - private String virtualhost; - - @Bean - public ConnectionFactory connectionFactory() { - CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); - connectionFactory.setAddresses(host); - connectionFactory.setUsername(username); - connectionFactory.setPassword(password); - connectionFactory.setVirtualHost(virtualhost); - // 配置发送确认回调时,次配置必须配置,否则即使在RabbitTemplate配置了ConfirmCallback也不会生效 - connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED); - connectionFactory.setPublisherReturns(true); - return connectionFactory; - } - - - /** - * rabbitAdmin - * @param connectionFactory - * @return - */ - @Bean - public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) { - RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory); - rabbitAdmin.setAutoStartup(true); - return rabbitAdmin; - } -} diff --git a/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/config/RabbitmqConfig.java b/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/config/RabbitmqConfig.java deleted file mode 100644 index 4ae5a51..0000000 --- a/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/config/RabbitmqConfig.java +++ /dev/null @@ -1,18 +0,0 @@ -package com.muyu.processing.config; - -import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; -import org.springframework.amqp.support.converter.MessageConverter; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; - -/** - * 消息转换配置 - */ -@Configuration -public class RabbitmqConfig { - // 消息转换配置 - @Bean - public MessageConverter jsonMessageConverter(){ - return new Jackson2JsonMessageConverter(); - } -} diff --git a/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/config/ReturnsCallbackConfig.java b/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/config/ReturnsCallbackConfig.java deleted file mode 100644 index 32ddbdd..0000000 --- a/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/config/ReturnsCallbackConfig.java +++ /dev/null @@ -1,39 +0,0 @@ -package com.muyu.processing.config; - -import org.springframework.amqp.core.ReturnedMessage; -import org.springframework.amqp.rabbit.core.RabbitTemplate; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; - -import javax.annotation.PostConstruct; - -/** - * 消息发送失败返回配置 - */ -@Component -public class ReturnsCallbackConfig implements RabbitTemplate.ReturnsCallback { - - @Autowired - private RabbitTemplate rabbitTemplate; - - /** - * 当前bean初始化的时候执行 - */ - @PostConstruct - public void init() { - this.rabbitTemplate.setReturnsCallback(this); - } - - /** - * 消息发送达到 queue 失败执行 - * - * @param returnedMessage the returned message and metadata. - */ - @Override - public void returnedMessage(ReturnedMessage returnedMessage) { - System.out.println("消息" + returnedMessage.getMessage().toString() + - "被交换机" + returnedMessage.getExchange() + "回退!" - + "退回原因为:" + returnedMessage.getReplyText()); - // TODO 回退了所有的信息,可做补偿机制 - } -} diff --git a/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/consumer/KafkaConsumerService.java b/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/consumer/KafkaConsumerService.java index 56a2ec5..cd45c29 100644 --- a/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/consumer/KafkaConsumerService.java +++ b/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/consumer/KafkaConsumerService.java @@ -9,6 +9,7 @@ import com.muyu.domain.Vehicle; import com.muyu.domain.WarnRule; import com.muyu.domain.WarnStrategy; import com.muyu.domain.resp.VehicleManageResp; +import com.muyu.processing.basic.EventPublisher; import com.muyu.processing.utils.CacheUtil; import lombok.extern.log4j.Log4j2; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -34,24 +35,26 @@ import java.util.Map; @Component public class KafkaConsumerService implements InitializingBean { + private static final String TIPSY = "tipsy"; + @Resource private KafkaConsumer kafkaConsumer; @Resource private CacheUtil cacheUtil; -// @Resource -// private EventInterface eventInterface; + @Resource + private EventPublisher eventPublisher; @Override public void afterPropertiesSet() throws Exception { Thread thread = new Thread(() -> { - log.info("启动线程监听Topic: {}", "zeshi"); + log.info("启动线程监听Topic: {}", TIPSY); // 延迟1秒 ThreadUtil.sleep(1000); // 订阅主题 - Collection topics = Lists.newArrayList("zeshi"); + Collection topics = Lists.newArrayList(TIPSY); kafkaConsumer.subscribe(topics); while (true) { // 轮询消费消息 @@ -75,7 +78,7 @@ public class KafkaConsumerService implements InitializingBean { WarnRule warnRule = (WarnRule) map.get("warnRule"); WarnStrategy warnStrategy = (WarnStrategy) map.get("warnStrategy"); VehicleManageResp vehicleManageResp = (VehicleManageResp) map.get("vehicleManageResp"); -// eventInterface.handle(jsonObject); + eventPublisher.eventPublish(jsonObject); } catch (Exception e) { // 捕获异常 log.info("这个有问题:{}",e.getMessage()); diff --git a/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/consumer/OfflineMonitoringConsumer.java b/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/consumer/OfflineMonitoringConsumer.java index c5f6f48..7ed838f 100644 --- a/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/consumer/OfflineMonitoringConsumer.java +++ b/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/consumer/OfflineMonitoringConsumer.java @@ -3,9 +3,9 @@ package com.muyu.processing.consumer; import com.muyu.processing.utils.CacheUtil; import lombok.extern.log4j.Log4j2; -import org.springframework.amqp.core.Message; -import com.rabbitmq.client.Channel; +import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; +import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @@ -23,6 +23,15 @@ import javax.annotation.Resource; @Component public class OfflineMonitoringConsumer { + /** + * 下线监听队列名称 + */ + private static final String OFFLINE_MONITORING = "MQ_OFFLINE_MONITORING"; + /** + * 下线监听交换机名称 + */ + private static final String OFFLINE_EXCHANGE = "OFFLINE_EXCHANGE"; + @Resource private CacheUtil cacheUtil; @@ -30,22 +39,14 @@ public class OfflineMonitoringConsumer { * 接收消息 * @param vin 车辆vin */ - @RabbitListener(queuesToDeclare = @Queue("offline_monitoring")) - public void receive(String vin, Message message, Channel channel){ - try { - log.info("清除缓存中的数据,车辆vin: {}", vin); - // 清除缓存 - cacheUtil.remove(vin); - log.info("vin码为: {}, 的本地缓存清除成功",vin); - channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); - } catch (Exception e) { - try { - channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); - } catch (Exception ex) { - log.info("清除本地缓存异常为: {}",e.getMessage()); - } - } - + @RabbitListener(bindings = @QueueBinding( + value = @Queue(value = OFFLINE_MONITORING, declare = "true"), + exchange = @Exchange(value = OFFLINE_EXCHANGE, type = "fanout"))) + public void online(String vin){ + log.info("清除缓存中的数据,车辆vin: {}", vin); + // 清除缓存 + cacheUtil.remove(vin); + log.info("vin码为: {}de本地缓存清除成功",vin); } } diff --git a/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/consumer/OnLineMonitoringConsumer.java b/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/consumer/OnLineMonitoringConsumer.java index 43230fe..b6ebafb 100644 --- a/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/consumer/OnLineMonitoringConsumer.java +++ b/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/consumer/OnLineMonitoringConsumer.java @@ -7,10 +7,10 @@ import com.muyu.domain.WarnStrategy; import com.muyu.domain.resp.VehicleManageResp; import com.muyu.enterprise.cache.*; import com.muyu.processing.utils.CacheUtil; -import com.rabbitmq.client.Channel; import lombok.extern.log4j.Log4j2; -import org.springframework.amqp.core.Message; +import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; +import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @@ -29,6 +29,15 @@ import java.util.HashMap; @Component public class OnLineMonitoringConsumer { + /** + * 上线监听队列名称 + */ + private static final String ON_LINE_MONITORING = "MQ_ON_LINE_MONITORING"; + /** + * 上线监听交换机名称 + */ + private static final String ONLINE_EXCHANGE = "ONLINE_EXCHANGE"; + @Resource private CacheUtil cacheUtil; @@ -53,40 +62,30 @@ public class OnLineMonitoringConsumer { /** * 上线监听车辆网关中车辆上线时 */ - @RabbitListener(queuesToDeclare = @Queue("long_time_no_see")) - public void receive(String vin, Message message, Channel channel){ - - try { - log.info("添加本地缓存,车辆vin: {}", vin); - // 获取redis中的数据 - Fence fence = fenceCahceService.get(vin); - Object breakdown = faultCacheService.get(vin); - Vehicle vehicle = vehicleCacheService.get(vin); - WarnRule warnRule = warnRuleCacheService.get(vin); - WarnStrategy warnStrategy = warnStrategyCacheService.get(vin); - VehicleManageResp vehicleManageResp = allVehicleCacheService.get(vin); - // 封装从redis中获得的数据 - HashMap map = new HashMap<>(); - map.put("fence",fence); - map.put("breakdown",breakdown); - map.put("vehicle",vehicle); - map.put("warnRule",warnRule); - map.put("warnStrategy",warnStrategy); - map.put("vehicleManageResp",vehicleManageResp); - // 添加到本地缓存中 - cacheUtil.put(vin,map); - log.info("vin码为: {}, 数据为: {}, 已完成本地缓存",vin,map); - // 手动确认消息发送成功 - channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); - } catch (Exception e) { - try { - // 手动拒绝消息 - channel.basicReject(message.getMessageProperties().getDeliveryTag(),true); - } catch (Exception ex) { - throw new RuntimeException(ex); - } - } - + @RabbitListener(bindings = @QueueBinding( + value = @Queue(value = ON_LINE_MONITORING, declare = "true"), + exchange = @Exchange(value = ONLINE_EXCHANGE, type = "fanout"))) + public void online(String vin){ + log.info("添加本地缓存,车辆vin: {}", vin); + // 获取redis中的数据 + Fence fence = fenceCahceService.get(vin); + Object breakdown = faultCacheService.get(vin); + Vehicle vehicle = vehicleCacheService.get(vin); + WarnRule warnRule = warnRuleCacheService.get(vin); + WarnStrategy warnStrategy = warnStrategyCacheService.get(vin); + VehicleManageResp vehicleManageResp = allVehicleCacheService.get(vin); + // 封装从redis中获得的数据 + HashMap map = new HashMap<>(); + map.put("fence",fence); + map.put("breakdown",breakdown); + map.put("vehicle",vehicle); + map.put("warnRule",warnRule); + map.put("warnStrategy",warnStrategy); + map.put("vehicleManageResp",vehicleManageResp); + // 添加到本地缓存中 + cacheUtil.put(vin,map); + log.info("vin码为: {}, 数据为: {}, 已完成本地缓存",vin,map); } + } diff --git a/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/controller/TestKafka.java b/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/controller/TestKafka.java index 32bdd37..7477697 100644 --- a/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/controller/TestKafka.java +++ b/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/controller/TestKafka.java @@ -1,7 +1,6 @@ package com.muyu.processing.controller; import cn.hutool.json.JSONObject; -import com.muyu.common.core.utils.uuid.UUID; import lombok.extern.log4j.Log4j2; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; @@ -25,7 +24,17 @@ import javax.annotation.Resource; @RequestMapping("/kafka") public class TestKafka { + private static final String TIPSY = "tipsy"; + private static final String VIN = "1123wsdfr54323wsd"; + /** + * 上线监听队列名称 + */ + private static final String ON_LINE_MONITORING = "MQ_ON_LINE_MONITORING"; + /** + * 下线监听队列名称 + */ + private static final String OFFLINE_MONITORING = "MQ_OFFLINE_MONITORING"; @Resource private KafkaProducer kafkaProducer; @@ -34,43 +43,35 @@ public class TestKafka { /** * 发送Kafka消息 - * @return String */ @GetMapping("/send") public String sendMsg(){ JSONObject entries = new JSONObject(); - entries.set("vin","1123wsdfr54323wsd"); + entries.set("vin",VIN); entries.set("name","宝马"); String entriesString = entries.toString(); - ProducerRecord producerRecord = new ProducerRecord<>("zeshi", entriesString); + ProducerRecord producerRecord = new ProducerRecord<>(TIPSY, entriesString); kafkaProducer.send(producerRecord); return "OK"; } /** - * 发送MQ消息 - * @return String + * 上线监听测试 */ @GetMapping("/sendMQ") public String sendMQ(){ - rabbitTemplate.convertAndSend("long_time_no_see","1123wsdfr54323wsd",message -> { - message.getMessageProperties().setMessageId(UUID.randomUUID().toString()); - return message; - }); + // 发送消息 + rabbitTemplate.convertAndSend(ON_LINE_MONITORING,VIN); return "OK"; } /** - * 发送MQ队列消息 - * @return String + * 下线监听测试 */ @GetMapping("/sendDui") public String sedDui() { - rabbitTemplate.convertAndSend("offline_monitoring","1123wsdfr54323wsd",message -> { - message.getMessageProperties().setMessageId(UUID.randomUUID().toString()); - return message; - }); -// rabbitTemplate.convertAndSend("myExchange","Im.fine",""); + // 发送消息 + rabbitTemplate.convertAndSend(OFFLINE_MONITORING,VIN); return "OK"; } } diff --git a/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/listener/AddDatabaseListener.java b/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/listener/AddDatabaseListener.java index 006d5f2..2dfe60b 100644 --- a/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/listener/AddDatabaseListener.java +++ b/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/listener/AddDatabaseListener.java @@ -1,6 +1,6 @@ package com.muyu.processing.listener; -import cn.hutool.json.JSONObject; +import com.alibaba.fastjson.JSONObject; import com.muyu.processing.basic.EventCustom; import com.muyu.processing.basic.EventListener;