feat:() 优化上下线监听队列名称 和新增发送事件方法
parent
e2e943749e
commit
1e9c92e4d3
|
@ -1,6 +1,6 @@
|
||||||
package com.muyu.processing.basic;
|
package com.muyu.processing.basic;
|
||||||
|
|
||||||
import cn.hutool.json.JSONObject;
|
import com.alibaba.fastjson.JSONObject;
|
||||||
import org.springframework.context.ApplicationEvent;
|
import org.springframework.context.ApplicationEvent;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -1,7 +1,9 @@
|
||||||
package com.muyu.processing.basic;
|
package com.muyu.processing.basic;
|
||||||
|
|
||||||
|
import com.alibaba.fastjson.JSONObject;
|
||||||
import org.springframework.context.ApplicationEventPublisher;
|
import org.springframework.context.ApplicationEventPublisher;
|
||||||
import org.springframework.context.ApplicationEventPublisherAware;
|
import org.springframework.context.ApplicationEventPublisherAware;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 策略发送事件
|
* 策略发送事件
|
||||||
|
@ -11,6 +13,7 @@ import org.springframework.context.ApplicationEventPublisherAware;
|
||||||
* @name:EventPublisher
|
* @name:EventPublisher
|
||||||
* @Date:2024/9/29 22:31
|
* @Date:2024/9/29 22:31
|
||||||
*/
|
*/
|
||||||
|
@Component
|
||||||
public class EventPublisher implements ApplicationEventPublisherAware {
|
public class EventPublisher implements ApplicationEventPublisherAware {
|
||||||
|
|
||||||
private ApplicationEventPublisher publisher;
|
private ApplicationEventPublisher publisher;
|
||||||
|
@ -20,4 +23,13 @@ public class EventPublisher implements ApplicationEventPublisherAware {
|
||||||
this.publisher = applicationEventPublisher;
|
this.publisher = applicationEventPublisher;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 发送事件
|
||||||
|
* @param jsonObject 数据
|
||||||
|
*/
|
||||||
|
public void eventPublish(JSONObject jsonObject){
|
||||||
|
EventCustom eventCustom = new EventCustom(this, jsonObject);
|
||||||
|
publisher.publishEvent(eventCustom);
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,42 +0,0 @@
|
||||||
package com.muyu.processing.config;
|
|
||||||
|
|
||||||
import org.springframework.amqp.rabbit.connection.CorrelationData;
|
|
||||||
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
|
||||||
import org.springframework.stereotype.Component;
|
|
||||||
|
|
||||||
import javax.annotation.PostConstruct;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 确认回调配置
|
|
||||||
*/
|
|
||||||
@Component
|
|
||||||
public class ConfirmCallbackConfig implements RabbitTemplate.ConfirmCallback {
|
|
||||||
|
|
||||||
@Autowired
|
|
||||||
private RabbitTemplate rabbitTemplate;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 当前bean初始化的时候执行
|
|
||||||
*/
|
|
||||||
@PostConstruct
|
|
||||||
public void init() {
|
|
||||||
this.rabbitTemplate.setConfirmCallback(this);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 确认方法
|
|
||||||
* @param correlationData correlation data for the callback.
|
|
||||||
* @param ack true for ack, false for nack
|
|
||||||
* @param cause An optional cause, for nack, when available, otherwise null.
|
|
||||||
*/
|
|
||||||
@Override
|
|
||||||
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
|
|
||||||
if (ack) {
|
|
||||||
System.out.println("消息发送到 broker 成功");
|
|
||||||
} else {
|
|
||||||
System.out.println("消息发送到 broker 失败,失败的原因:" + cause);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
|
@ -1,50 +0,0 @@
|
||||||
package com.muyu.processing.config;
|
|
||||||
|
|
||||||
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
|
|
||||||
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
|
|
||||||
import org.springframework.amqp.rabbit.core.RabbitAdmin;
|
|
||||||
import org.springframework.beans.factory.annotation.Value;
|
|
||||||
import org.springframework.context.annotation.Bean;
|
|
||||||
import org.springframework.context.annotation.Configuration;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 构建 RabbitAdmin
|
|
||||||
*/
|
|
||||||
@Configuration
|
|
||||||
public class RabbitAdminConfig {
|
|
||||||
|
|
||||||
@Value("${spring.rabbitmq.host}")
|
|
||||||
private String host;
|
|
||||||
@Value("${spring.rabbitmq.username}")
|
|
||||||
private String username;
|
|
||||||
@Value("${spring.rabbitmq.password}")
|
|
||||||
private String password;
|
|
||||||
@Value("${spring.rabbitmq.virtual-host}")
|
|
||||||
private String virtualhost;
|
|
||||||
|
|
||||||
@Bean
|
|
||||||
public ConnectionFactory connectionFactory() {
|
|
||||||
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
|
|
||||||
connectionFactory.setAddresses(host);
|
|
||||||
connectionFactory.setUsername(username);
|
|
||||||
connectionFactory.setPassword(password);
|
|
||||||
connectionFactory.setVirtualHost(virtualhost);
|
|
||||||
// 配置发送确认回调时,次配置必须配置,否则即使在RabbitTemplate配置了ConfirmCallback也不会生效
|
|
||||||
connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
|
|
||||||
connectionFactory.setPublisherReturns(true);
|
|
||||||
return connectionFactory;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
|
||||||
* rabbitAdmin
|
|
||||||
* @param connectionFactory
|
|
||||||
* @return
|
|
||||||
*/
|
|
||||||
@Bean
|
|
||||||
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
|
|
||||||
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
|
|
||||||
rabbitAdmin.setAutoStartup(true);
|
|
||||||
return rabbitAdmin;
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,18 +0,0 @@
|
||||||
package com.muyu.processing.config;
|
|
||||||
|
|
||||||
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
|
|
||||||
import org.springframework.amqp.support.converter.MessageConverter;
|
|
||||||
import org.springframework.context.annotation.Bean;
|
|
||||||
import org.springframework.context.annotation.Configuration;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 消息转换配置
|
|
||||||
*/
|
|
||||||
@Configuration
|
|
||||||
public class RabbitmqConfig {
|
|
||||||
// 消息转换配置
|
|
||||||
@Bean
|
|
||||||
public MessageConverter jsonMessageConverter(){
|
|
||||||
return new Jackson2JsonMessageConverter();
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,39 +0,0 @@
|
||||||
package com.muyu.processing.config;
|
|
||||||
|
|
||||||
import org.springframework.amqp.core.ReturnedMessage;
|
|
||||||
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
|
||||||
import org.springframework.stereotype.Component;
|
|
||||||
|
|
||||||
import javax.annotation.PostConstruct;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 消息发送失败返回配置
|
|
||||||
*/
|
|
||||||
@Component
|
|
||||||
public class ReturnsCallbackConfig implements RabbitTemplate.ReturnsCallback {
|
|
||||||
|
|
||||||
@Autowired
|
|
||||||
private RabbitTemplate rabbitTemplate;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 当前bean初始化的时候执行
|
|
||||||
*/
|
|
||||||
@PostConstruct
|
|
||||||
public void init() {
|
|
||||||
this.rabbitTemplate.setReturnsCallback(this);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 消息发送达到 queue 失败执行
|
|
||||||
*
|
|
||||||
* @param returnedMessage the returned message and metadata.
|
|
||||||
*/
|
|
||||||
@Override
|
|
||||||
public void returnedMessage(ReturnedMessage returnedMessage) {
|
|
||||||
System.out.println("消息" + returnedMessage.getMessage().toString() +
|
|
||||||
"被交换机" + returnedMessage.getExchange() + "回退!"
|
|
||||||
+ "退回原因为:" + returnedMessage.getReplyText());
|
|
||||||
// TODO 回退了所有的信息,可做补偿机制
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -9,6 +9,7 @@ import com.muyu.domain.Vehicle;
|
||||||
import com.muyu.domain.WarnRule;
|
import com.muyu.domain.WarnRule;
|
||||||
import com.muyu.domain.WarnStrategy;
|
import com.muyu.domain.WarnStrategy;
|
||||||
import com.muyu.domain.resp.VehicleManageResp;
|
import com.muyu.domain.resp.VehicleManageResp;
|
||||||
|
import com.muyu.processing.basic.EventPublisher;
|
||||||
import com.muyu.processing.utils.CacheUtil;
|
import com.muyu.processing.utils.CacheUtil;
|
||||||
import lombok.extern.log4j.Log4j2;
|
import lombok.extern.log4j.Log4j2;
|
||||||
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
||||||
|
@ -34,24 +35,26 @@ import java.util.Map;
|
||||||
@Component
|
@Component
|
||||||
public class KafkaConsumerService implements InitializingBean {
|
public class KafkaConsumerService implements InitializingBean {
|
||||||
|
|
||||||
|
private static final String TIPSY = "tipsy";
|
||||||
|
|
||||||
@Resource
|
@Resource
|
||||||
private KafkaConsumer kafkaConsumer;
|
private KafkaConsumer kafkaConsumer;
|
||||||
|
|
||||||
@Resource
|
@Resource
|
||||||
private CacheUtil cacheUtil;
|
private CacheUtil cacheUtil;
|
||||||
|
|
||||||
// @Resource
|
@Resource
|
||||||
// private EventInterface eventInterface;
|
private EventPublisher eventPublisher;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void afterPropertiesSet() throws Exception {
|
public void afterPropertiesSet() throws Exception {
|
||||||
|
|
||||||
Thread thread = new Thread(() -> {
|
Thread thread = new Thread(() -> {
|
||||||
log.info("启动线程监听Topic: {}", "zeshi");
|
log.info("启动线程监听Topic: {}", TIPSY);
|
||||||
// 延迟1秒
|
// 延迟1秒
|
||||||
ThreadUtil.sleep(1000);
|
ThreadUtil.sleep(1000);
|
||||||
// 订阅主题
|
// 订阅主题
|
||||||
Collection<String> topics = Lists.newArrayList("zeshi");
|
Collection<String> topics = Lists.newArrayList(TIPSY);
|
||||||
kafkaConsumer.subscribe(topics);
|
kafkaConsumer.subscribe(topics);
|
||||||
while (true) {
|
while (true) {
|
||||||
// 轮询消费消息
|
// 轮询消费消息
|
||||||
|
@ -75,7 +78,7 @@ public class KafkaConsumerService implements InitializingBean {
|
||||||
WarnRule warnRule = (WarnRule) map.get("warnRule");
|
WarnRule warnRule = (WarnRule) map.get("warnRule");
|
||||||
WarnStrategy warnStrategy = (WarnStrategy) map.get("warnStrategy");
|
WarnStrategy warnStrategy = (WarnStrategy) map.get("warnStrategy");
|
||||||
VehicleManageResp vehicleManageResp = (VehicleManageResp) map.get("vehicleManageResp");
|
VehicleManageResp vehicleManageResp = (VehicleManageResp) map.get("vehicleManageResp");
|
||||||
// eventInterface.handle(jsonObject);
|
eventPublisher.eventPublish(jsonObject);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
// 捕获异常
|
// 捕获异常
|
||||||
log.info("这个有问题:{}",e.getMessage());
|
log.info("这个有问题:{}",e.getMessage());
|
||||||
|
|
|
@ -3,9 +3,9 @@ package com.muyu.processing.consumer;
|
||||||
|
|
||||||
import com.muyu.processing.utils.CacheUtil;
|
import com.muyu.processing.utils.CacheUtil;
|
||||||
import lombok.extern.log4j.Log4j2;
|
import lombok.extern.log4j.Log4j2;
|
||||||
import org.springframework.amqp.core.Message;
|
import org.springframework.amqp.rabbit.annotation.Exchange;
|
||||||
import com.rabbitmq.client.Channel;
|
|
||||||
import org.springframework.amqp.rabbit.annotation.Queue;
|
import org.springframework.amqp.rabbit.annotation.Queue;
|
||||||
|
import org.springframework.amqp.rabbit.annotation.QueueBinding;
|
||||||
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
@ -23,6 +23,15 @@ import javax.annotation.Resource;
|
||||||
@Component
|
@Component
|
||||||
public class OfflineMonitoringConsumer {
|
public class OfflineMonitoringConsumer {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 下线监听队列名称
|
||||||
|
*/
|
||||||
|
private static final String OFFLINE_MONITORING = "MQ_OFFLINE_MONITORING";
|
||||||
|
/**
|
||||||
|
* 下线监听交换机名称
|
||||||
|
*/
|
||||||
|
private static final String OFFLINE_EXCHANGE = "OFFLINE_EXCHANGE";
|
||||||
|
|
||||||
@Resource
|
@Resource
|
||||||
private CacheUtil cacheUtil;
|
private CacheUtil cacheUtil;
|
||||||
|
|
||||||
|
@ -30,22 +39,14 @@ public class OfflineMonitoringConsumer {
|
||||||
* 接收消息
|
* 接收消息
|
||||||
* @param vin 车辆vin
|
* @param vin 车辆vin
|
||||||
*/
|
*/
|
||||||
@RabbitListener(queuesToDeclare = @Queue("offline_monitoring"))
|
@RabbitListener(bindings = @QueueBinding(
|
||||||
public void receive(String vin, Message message, Channel channel){
|
value = @Queue(value = OFFLINE_MONITORING, declare = "true"),
|
||||||
try {
|
exchange = @Exchange(value = OFFLINE_EXCHANGE, type = "fanout")))
|
||||||
|
public void online(String vin){
|
||||||
log.info("清除缓存中的数据,车辆vin: {}", vin);
|
log.info("清除缓存中的数据,车辆vin: {}", vin);
|
||||||
// 清除缓存
|
// 清除缓存
|
||||||
cacheUtil.remove(vin);
|
cacheUtil.remove(vin);
|
||||||
log.info("vin码为: {}, 的本地缓存清除成功",vin);
|
log.info("vin码为: {}de本地缓存清除成功",vin);
|
||||||
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
|
|
||||||
} catch (Exception e) {
|
|
||||||
try {
|
|
||||||
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
|
|
||||||
} catch (Exception ex) {
|
|
||||||
log.info("清除本地缓存异常为: {}",e.getMessage());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -7,10 +7,10 @@ import com.muyu.domain.WarnStrategy;
|
||||||
import com.muyu.domain.resp.VehicleManageResp;
|
import com.muyu.domain.resp.VehicleManageResp;
|
||||||
import com.muyu.enterprise.cache.*;
|
import com.muyu.enterprise.cache.*;
|
||||||
import com.muyu.processing.utils.CacheUtil;
|
import com.muyu.processing.utils.CacheUtil;
|
||||||
import com.rabbitmq.client.Channel;
|
|
||||||
import lombok.extern.log4j.Log4j2;
|
import lombok.extern.log4j.Log4j2;
|
||||||
import org.springframework.amqp.core.Message;
|
import org.springframework.amqp.rabbit.annotation.Exchange;
|
||||||
import org.springframework.amqp.rabbit.annotation.Queue;
|
import org.springframework.amqp.rabbit.annotation.Queue;
|
||||||
|
import org.springframework.amqp.rabbit.annotation.QueueBinding;
|
||||||
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
@ -29,6 +29,15 @@ import java.util.HashMap;
|
||||||
@Component
|
@Component
|
||||||
public class OnLineMonitoringConsumer {
|
public class OnLineMonitoringConsumer {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 上线监听队列名称
|
||||||
|
*/
|
||||||
|
private static final String ON_LINE_MONITORING = "MQ_ON_LINE_MONITORING";
|
||||||
|
/**
|
||||||
|
* 上线监听交换机名称
|
||||||
|
*/
|
||||||
|
private static final String ONLINE_EXCHANGE = "ONLINE_EXCHANGE";
|
||||||
|
|
||||||
@Resource
|
@Resource
|
||||||
private CacheUtil cacheUtil;
|
private CacheUtil cacheUtil;
|
||||||
|
|
||||||
|
@ -53,10 +62,10 @@ public class OnLineMonitoringConsumer {
|
||||||
/**
|
/**
|
||||||
* 上线监听车辆网关中车辆上线时
|
* 上线监听车辆网关中车辆上线时
|
||||||
*/
|
*/
|
||||||
@RabbitListener(queuesToDeclare = @Queue("long_time_no_see"))
|
@RabbitListener(bindings = @QueueBinding(
|
||||||
public void receive(String vin, Message message, Channel channel){
|
value = @Queue(value = ON_LINE_MONITORING, declare = "true"),
|
||||||
|
exchange = @Exchange(value = ONLINE_EXCHANGE, type = "fanout")))
|
||||||
try {
|
public void online(String vin){
|
||||||
log.info("添加本地缓存,车辆vin: {}", vin);
|
log.info("添加本地缓存,车辆vin: {}", vin);
|
||||||
// 获取redis中的数据
|
// 获取redis中的数据
|
||||||
Fence fence = fenceCahceService.get(vin);
|
Fence fence = fenceCahceService.get(vin);
|
||||||
|
@ -76,17 +85,7 @@ public class OnLineMonitoringConsumer {
|
||||||
// 添加到本地缓存中
|
// 添加到本地缓存中
|
||||||
cacheUtil.put(vin,map);
|
cacheUtil.put(vin,map);
|
||||||
log.info("vin码为: {}, 数据为: {}, 已完成本地缓存",vin,map);
|
log.info("vin码为: {}, 数据为: {}, 已完成本地缓存",vin,map);
|
||||||
// 手动确认消息发送成功
|
|
||||||
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
|
|
||||||
} catch (Exception e) {
|
|
||||||
try {
|
|
||||||
// 手动拒绝消息
|
|
||||||
channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);
|
|
||||||
} catch (Exception ex) {
|
|
||||||
throw new RuntimeException(ex);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,7 +1,6 @@
|
||||||
package com.muyu.processing.controller;
|
package com.muyu.processing.controller;
|
||||||
|
|
||||||
import cn.hutool.json.JSONObject;
|
import cn.hutool.json.JSONObject;
|
||||||
import com.muyu.common.core.utils.uuid.UUID;
|
|
||||||
import lombok.extern.log4j.Log4j2;
|
import lombok.extern.log4j.Log4j2;
|
||||||
import org.apache.kafka.clients.producer.KafkaProducer;
|
import org.apache.kafka.clients.producer.KafkaProducer;
|
||||||
import org.apache.kafka.clients.producer.ProducerRecord;
|
import org.apache.kafka.clients.producer.ProducerRecord;
|
||||||
|
@ -25,7 +24,17 @@ import javax.annotation.Resource;
|
||||||
@RequestMapping("/kafka")
|
@RequestMapping("/kafka")
|
||||||
public class TestKafka {
|
public class TestKafka {
|
||||||
|
|
||||||
|
private static final String TIPSY = "tipsy";
|
||||||
|
private static final String VIN = "1123wsdfr54323wsd";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 上线监听队列名称
|
||||||
|
*/
|
||||||
|
private static final String ON_LINE_MONITORING = "MQ_ON_LINE_MONITORING";
|
||||||
|
/**
|
||||||
|
* 下线监听队列名称
|
||||||
|
*/
|
||||||
|
private static final String OFFLINE_MONITORING = "MQ_OFFLINE_MONITORING";
|
||||||
@Resource
|
@Resource
|
||||||
private KafkaProducer<String, String> kafkaProducer;
|
private KafkaProducer<String, String> kafkaProducer;
|
||||||
|
|
||||||
|
@ -34,43 +43,35 @@ public class TestKafka {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 发送Kafka消息
|
* 发送Kafka消息
|
||||||
* @return String
|
|
||||||
*/
|
*/
|
||||||
@GetMapping("/send")
|
@GetMapping("/send")
|
||||||
public String sendMsg(){
|
public String sendMsg(){
|
||||||
JSONObject entries = new JSONObject();
|
JSONObject entries = new JSONObject();
|
||||||
entries.set("vin","1123wsdfr54323wsd");
|
entries.set("vin",VIN);
|
||||||
entries.set("name","宝马");
|
entries.set("name","宝马");
|
||||||
String entriesString = entries.toString();
|
String entriesString = entries.toString();
|
||||||
ProducerRecord<String, String> producerRecord = new ProducerRecord<>("zeshi", entriesString);
|
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(TIPSY, entriesString);
|
||||||
kafkaProducer.send(producerRecord);
|
kafkaProducer.send(producerRecord);
|
||||||
return "OK";
|
return "OK";
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 发送MQ消息
|
* 上线监听测试
|
||||||
* @return String
|
|
||||||
*/
|
*/
|
||||||
@GetMapping("/sendMQ")
|
@GetMapping("/sendMQ")
|
||||||
public String sendMQ(){
|
public String sendMQ(){
|
||||||
rabbitTemplate.convertAndSend("long_time_no_see","1123wsdfr54323wsd",message -> {
|
// 发送消息
|
||||||
message.getMessageProperties().setMessageId(UUID.randomUUID().toString());
|
rabbitTemplate.convertAndSend(ON_LINE_MONITORING,VIN);
|
||||||
return message;
|
|
||||||
});
|
|
||||||
return "OK";
|
return "OK";
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 发送MQ队列消息
|
* 下线监听测试
|
||||||
* @return String
|
|
||||||
*/
|
*/
|
||||||
@GetMapping("/sendDui")
|
@GetMapping("/sendDui")
|
||||||
public String sedDui() {
|
public String sedDui() {
|
||||||
rabbitTemplate.convertAndSend("offline_monitoring","1123wsdfr54323wsd",message -> {
|
// 发送消息
|
||||||
message.getMessageProperties().setMessageId(UUID.randomUUID().toString());
|
rabbitTemplate.convertAndSend(OFFLINE_MONITORING,VIN);
|
||||||
return message;
|
|
||||||
});
|
|
||||||
// rabbitTemplate.convertAndSend("myExchange","Im.fine","");
|
|
||||||
return "OK";
|
return "OK";
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
package com.muyu.processing.listener;
|
package com.muyu.processing.listener;
|
||||||
|
|
||||||
import cn.hutool.json.JSONObject;
|
import com.alibaba.fastjson.JSONObject;
|
||||||
import com.muyu.processing.basic.EventCustom;
|
import com.muyu.processing.basic.EventCustom;
|
||||||
import com.muyu.processing.basic.EventListener;
|
import com.muyu.processing.basic.EventListener;
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue