fix(): 事件基础
parent
87d104cf9b
commit
9499d9738e
|
@ -35,6 +35,7 @@
|
|||
<artifactId>cloud-common-redis</artifactId>
|
||||
</dependency>
|
||||
|
||||
|
||||
</dependencies>
|
||||
|
||||
</project>
|
||||
|
|
|
@ -2,7 +2,7 @@ package com.muyu.rabbitmq.consumer;
|
|||
|
||||
import com.alibaba.fastjson2.JSONObject;
|
||||
import com.muyu.common.redis.service.RedisService;
|
||||
import com.muyu.rabbitmq.util.CacheUtil;
|
||||
//import com.muyu.rabbitmq.util.CacheUtil;
|
||||
import com.rabbitmq.client.Channel;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.extern.log4j.Log4j2;
|
||||
|
@ -26,8 +26,8 @@ public class RabbitMQConsumerUtil {
|
|||
|
||||
private final RedisService redisService;
|
||||
|
||||
@Autowired
|
||||
private CacheUtil cacheUtil;
|
||||
// @Autowired
|
||||
// private CacheUtil cacheUtil;
|
||||
|
||||
|
||||
/**
|
||||
|
@ -54,7 +54,7 @@ public class RabbitMQConsumerUtil {
|
|||
* -----------------------------------以下为异步业务操作----------------------------
|
||||
*/
|
||||
String carList = (String) redisService.redisTemplate.opsForValue().get("carList");
|
||||
cacheUtil.put("carList",carList);
|
||||
|
||||
|
||||
|
||||
/**
|
||||
|
|
|
@ -47,107 +47,107 @@ public class RabbitMQProducerUtil {
|
|||
|
||||
}
|
||||
|
||||
/**
|
||||
* Work queue 工作模型
|
||||
*
|
||||
* @param obj 传递的消息 (如果是对象需要序列化)
|
||||
* @return 结果集
|
||||
* 多个消费者,你一个我一个分配消费消息,有预取机制,默认公平消费,可配置 能者多劳模式(),谁完成的快,谁多做一点
|
||||
*/
|
||||
public Result<?> workSendMessage(String queueName, Object obj, String msg) {
|
||||
|
||||
log.info("【工作模型mq】 : method: 【 workSendMessage 】 - ages: 【 String : {}, Object : {}, String : {} 】 ---> 【 消息发送中。。。 】", queueName, obj, msg);
|
||||
// 发送简单模型消息
|
||||
// 第一个参数: 绑定规则 相当于 队列名称
|
||||
// 第二个参数:消息内容
|
||||
rabbitTemplate.convertAndSend(queueName, obj, message -> {
|
||||
message.getMessageProperties().setMessageId(UUID.randomUUID().toString());
|
||||
return message;
|
||||
} );
|
||||
|
||||
log.info("【工作模型mq】 : method: 【 workSendMessage 】- queue: 【 {} 】 ---> 【 消息发送成功 】", queueName);
|
||||
|
||||
return Result.success("消息发送成功");
|
||||
}
|
||||
|
||||
/**
|
||||
* Publish/Subscribe 发布订阅者模型
|
||||
* 多个消费者,多个消费者可以同时接收到消息 有交换机 类型 fanout
|
||||
*
|
||||
* @param exchange 交换机名称
|
||||
* @param obj 发送的消息Object
|
||||
* @param msg 响应的内容
|
||||
* @return 结果集
|
||||
*/
|
||||
public Result<?> publishSubscribeSendMessage(String exchange, Object obj, String msg) {
|
||||
|
||||
log.info("【订阅模型mq】 : method: 【 workSendMessage 】 - ages: 【 String : {}, Object : {}, String : {} 】 ---> 【 消息发送中。。。 】", exchange, obj, msg);
|
||||
// 发送简单模型消息
|
||||
// 第一个参数: exchange 交换机的名称
|
||||
// 第二个参数: 绑定规则 发布订阅者模型 不写 默认 "" 只要绑定就行 不需要规则
|
||||
// 第三个参数:消息内容
|
||||
rabbitTemplate.convertAndSend(exchange, "", obj, message -> {
|
||||
message.getMessageProperties().setMessageId(UUID.randomUUID().toString());
|
||||
return message;
|
||||
} );
|
||||
|
||||
log.info("【订阅模型mq】 : method: 【 workSendMessage 】- exchange: 【 {} 】 ---> 【 消息发送成功 】", exchange);
|
||||
|
||||
return Result.success("消息发送成功");
|
||||
}
|
||||
|
||||
/**
|
||||
* Routing路由模型
|
||||
* 使用的是 Direct 类型的交换机,会将接收到的消息根据 规则 路由到指定的Queue(队列),因此称为路由模式
|
||||
*
|
||||
* @param exchange 交换机名称
|
||||
* @param rule 绑定规则 一个字符串即可
|
||||
* @param obj 发送的消息Object
|
||||
* @param msg 响应的内容
|
||||
* @return 结果集
|
||||
*/
|
||||
public Result<?> routingSendMessage(String exchange, String rule, Object obj, String msg) {
|
||||
|
||||
log.info("【路由模型mq】 : method: 【 workSendMessage 】 - ages: 【 String : {}, Object : {}, String : {} 】 ---> 【 消息发送中。。。 】", exchange, obj, msg);
|
||||
// 发送简单模型消息
|
||||
// 第一个参数: 绑定规则 相当于 队列名称
|
||||
// 第二个参数:消息内容
|
||||
rabbitTemplate.convertAndSend(exchange, rule, obj, message -> {
|
||||
message.getMessageProperties().setMessageId(UUID.randomUUID().toString());
|
||||
return message;
|
||||
} );
|
||||
|
||||
log.info("【路由模型mq】 : method: 【 workSendMessage 】- exchange: 【 {} 】 ---> 【 消息发送成功 】", exchange);
|
||||
|
||||
return Result.success("消息发送成功");
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Topic主题模型模型
|
||||
* 使用的是 topic 类型的交换机
|
||||
*
|
||||
* @param exchange 交换机名称
|
||||
* @param rule 绑定规则 可以绑定多个单词以 . 拼接 也可以使用 #(匹配 零个 一个 或 多个 单词) 或 *(匹配 一个 单词) 通配符(例如:name.msg, *.msg, age.# )
|
||||
* @param obj 发送的消息Object
|
||||
* @param msg 响应的内容
|
||||
* @return 结果集
|
||||
*/
|
||||
public Result<?> topicSendMessage(String exchange, String rule, Object obj) {
|
||||
|
||||
log.info("【主题模型mq】 : method: 【 workSendMessage 】 - ages: 【 String : {}, Object : {} 】 ---> 【 消息发送中。。。 】", exchange, obj);
|
||||
// 发送简单模型消息
|
||||
// 第一个参数: 绑定规则 相当于 队列名称
|
||||
// 第二个参数:消息内容
|
||||
rabbitTemplate.convertAndSend(exchange, rule, obj, message -> {
|
||||
message.getMessageProperties().setMessageId(UUID.randomUUID().toString());
|
||||
return message;
|
||||
} );
|
||||
|
||||
log.info("【主题模型mq】 : method: 【 workSendMessage 】- exchange: 【 {} 】 ---> 【 消息发送成功 】", exchange);
|
||||
|
||||
return Result.success(obj,"消息发送成功");
|
||||
}
|
||||
// /**
|
||||
// * Work queue 工作模型
|
||||
// *
|
||||
// * @param obj 传递的消息 (如果是对象需要序列化)
|
||||
// * @return 结果集
|
||||
// * 多个消费者,你一个我一个分配消费消息,有预取机制,默认公平消费,可配置 能者多劳模式(),谁完成的快,谁多做一点
|
||||
// */
|
||||
// public Result<?> workSendMessage(String queueName, Object obj, String msg) {
|
||||
//
|
||||
// log.info("【工作模型mq】 : method: 【 workSendMessage 】 - ages: 【 String : {}, Object : {}, String : {} 】 ---> 【 消息发送中。。。 】", queueName, obj, msg);
|
||||
// // 发送简单模型消息
|
||||
// // 第一个参数: 绑定规则 相当于 队列名称
|
||||
// // 第二个参数:消息内容
|
||||
// rabbitTemplate.convertAndSend(queueName, obj, message -> {
|
||||
// message.getMessageProperties().setMessageId(UUID.randomUUID().toString());
|
||||
// return message;
|
||||
// } );
|
||||
//
|
||||
// log.info("【工作模型mq】 : method: 【 workSendMessage 】- queue: 【 {} 】 ---> 【 消息发送成功 】", queueName);
|
||||
//
|
||||
// return Result.success("消息发送成功");
|
||||
// }
|
||||
//
|
||||
// /**
|
||||
// * Publish/Subscribe 发布订阅者模型
|
||||
// * 多个消费者,多个消费者可以同时接收到消息 有交换机 类型 fanout
|
||||
// *
|
||||
// * @param exchange 交换机名称
|
||||
// * @param obj 发送的消息Object
|
||||
// * @param msg 响应的内容
|
||||
// * @return 结果集
|
||||
// */
|
||||
// public Result<?> publishSubscribeSendMessage(String exchange, Object obj, String msg) {
|
||||
//
|
||||
// log.info("【订阅模型mq】 : method: 【 workSendMessage 】 - ages: 【 String : {}, Object : {}, String : {} 】 ---> 【 消息发送中。。。 】", exchange, obj, msg);
|
||||
// // 发送简单模型消息
|
||||
// // 第一个参数: exchange 交换机的名称
|
||||
// // 第二个参数: 绑定规则 发布订阅者模型 不写 默认 "" 只要绑定就行 不需要规则
|
||||
// // 第三个参数:消息内容
|
||||
// rabbitTemplate.convertAndSend(exchange, "", obj, message -> {
|
||||
// message.getMessageProperties().setMessageId(UUID.randomUUID().toString());
|
||||
// return message;
|
||||
// } );
|
||||
//
|
||||
// log.info("【订阅模型mq】 : method: 【 workSendMessage 】- exchange: 【 {} 】 ---> 【 消息发送成功 】", exchange);
|
||||
//
|
||||
// return Result.success("消息发送成功");
|
||||
// }
|
||||
//
|
||||
// /**
|
||||
// * Routing路由模型
|
||||
// * 使用的是 Direct 类型的交换机,会将接收到的消息根据 规则 路由到指定的Queue(队列),因此称为路由模式
|
||||
// *
|
||||
// * @param exchange 交换机名称
|
||||
// * @param rule 绑定规则 一个字符串即可
|
||||
// * @param obj 发送的消息Object
|
||||
// * @param msg 响应的内容
|
||||
// * @return 结果集
|
||||
// */
|
||||
// public Result<?> routingSendMessage(String exchange, String rule, Object obj, String msg) {
|
||||
//
|
||||
// log.info("【路由模型mq】 : method: 【 workSendMessage 】 - ages: 【 String : {}, Object : {}, String : {} 】 ---> 【 消息发送中。。。 】", exchange, obj, msg);
|
||||
// // 发送简单模型消息
|
||||
// // 第一个参数: 绑定规则 相当于 队列名称
|
||||
// // 第二个参数:消息内容
|
||||
// rabbitTemplate.convertAndSend(exchange, rule, obj, message -> {
|
||||
// message.getMessageProperties().setMessageId(UUID.randomUUID().toString());
|
||||
// return message;
|
||||
// } );
|
||||
//
|
||||
// log.info("【路由模型mq】 : method: 【 workSendMessage 】- exchange: 【 {} 】 ---> 【 消息发送成功 】", exchange);
|
||||
//
|
||||
// return Result.success("消息发送成功");
|
||||
// }
|
||||
//
|
||||
//
|
||||
// /**
|
||||
// * Topic主题模型模型
|
||||
// * 使用的是 topic 类型的交换机
|
||||
// *
|
||||
// * @param exchange 交换机名称
|
||||
// * @param rule 绑定规则 可以绑定多个单词以 . 拼接 也可以使用 #(匹配 零个 一个 或 多个 单词) 或 *(匹配 一个 单词) 通配符(例如:name.msg, *.msg, age.# )
|
||||
// * @param obj 发送的消息Object
|
||||
// * @param msg 响应的内容
|
||||
// * @return 结果集
|
||||
// */
|
||||
// public Result<?> topicSendMessage(String exchange, String rule, Object obj) {
|
||||
//
|
||||
// log.info("【主题模型mq】 : method: 【 workSendMessage 】 - ages: 【 String : {}, Object : {} 】 ---> 【 消息发送中。。。 】", exchange, obj);
|
||||
// // 发送简单模型消息
|
||||
// // 第一个参数: 绑定规则 相当于 队列名称
|
||||
// // 第二个参数:消息内容
|
||||
// rabbitTemplate.convertAndSend(exchange, rule, obj, message -> {
|
||||
// message.getMessageProperties().setMessageId(UUID.randomUUID().toString());
|
||||
// return message;
|
||||
// } );
|
||||
//
|
||||
// log.info("【主题模型mq】 : method: 【 workSendMessage 】- exchange: 【 {} 】 ---> 【 消息发送成功 】", exchange);
|
||||
//
|
||||
// return Result.success(obj,"消息发送成功");
|
||||
// }
|
||||
|
||||
|
||||
/**
|
||||
|
|
|
@ -1,37 +0,0 @@
|
|||
package com.muyu.rabbitmq.util;
|
||||
|
||||
import com.github.benmanes.caffeine.cache.Cache;
|
||||
import com.github.benmanes.caffeine.cache.Caffeine;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* 缓存工具类
|
||||
*
|
||||
* @program: cloud-server
|
||||
* @author: 刘武
|
||||
* @create: 2024-09-30 10:08
|
||||
**/
|
||||
@Component
|
||||
public class CacheUtil<T> {
|
||||
|
||||
private final Cache<String, T> cache;
|
||||
|
||||
public CacheUtil() {
|
||||
this.cache = Caffeine.newBuilder()
|
||||
.maximumSize(500L)
|
||||
.build();
|
||||
}
|
||||
|
||||
public T get(String key) {
|
||||
return cache.getIfPresent(key);
|
||||
}
|
||||
|
||||
public void put(String key, T value) {
|
||||
cache.put(key, value);
|
||||
}
|
||||
|
||||
public void remove(String key) {
|
||||
cache.invalidate(key);
|
||||
}
|
||||
|
||||
}
|
|
@ -115,6 +115,12 @@
|
|||
<artifactId>cloud-common-rabbit</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>com.muyu</groupId>
|
||||
<artifactId>saas-cache</artifactId>
|
||||
<version>3.6.3</version>
|
||||
</dependency>
|
||||
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
|
|
|
@ -28,6 +28,4 @@ public class EventPublisher implements ApplicationEventPublisherAware {
|
|||
publisher.publishEvent(event);
|
||||
}
|
||||
|
||||
|
||||
|
||||
}
|
||||
|
|
|
@ -1,9 +1,24 @@
|
|||
package com.muyu.event.consumer;
|
||||
|
||||
import com.alibaba.fastjson2.JSONObject;
|
||||
import com.muyu.cache.ElectronicFenceGroupCacheService;
|
||||
import com.muyu.cache.SysCarCacheService;
|
||||
import com.muyu.common.domain.database.ElectronicFenceGroup;
|
||||
import com.muyu.common.domain.resp.SysCarVo;
|
||||
import com.muyu.common.redis.service.RedisService;
|
||||
import com.muyu.rabbitmq.consumer.RabbitMQConsumerUtil;
|
||||
import com.rabbitmq.client.Channel;
|
||||
import lombok.extern.log4j.Log4j2;
|
||||
import org.springframework.amqp.core.Message;
|
||||
import org.springframework.amqp.rabbit.annotation.Queue;
|
||||
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* rabbitmq 监听器
|
||||
* @author 刘武
|
||||
|
@ -13,9 +28,65 @@ import org.springframework.stereotype.Component;
|
|||
*/
|
||||
|
||||
@Component
|
||||
@Log4j2
|
||||
public class MqConsumer {
|
||||
|
||||
@Autowired
|
||||
private RedisService redisService;
|
||||
|
||||
@Autowired
|
||||
private SysCarCacheService sysCarCacheService;
|
||||
@Autowired
|
||||
private ElectronicFenceGroupCacheService electronicFenceGroupCacheService;
|
||||
|
||||
@RabbitListener(queuesToDeclare = @Queue(name = "basic"))
|
||||
public void rabbitMQBasicConsumer(String data , Message message , Channel channel) {
|
||||
log.info("当前时间:{} :RabbitMQConsumerUtil : {}", new Date(), message);
|
||||
try {
|
||||
// 获取到消息 开始消费
|
||||
log.info("消息消费者接收到消息,消息内容:{}", JSONObject.toJSONString(data));
|
||||
|
||||
Long add = redisService.redisTemplate.opsForSet().add(data, message.getMessageProperties().getMessageId());
|
||||
|
||||
if (add != 1) {
|
||||
return;
|
||||
}
|
||||
|
||||
/**
|
||||
* -----------------------------------以下为异步业务操作----------------------------
|
||||
*/
|
||||
List<SysCarVo> carList = sysCarCacheService.get("carList");
|
||||
ElectronicFenceGroup fenceGroupList = electronicFenceGroupCacheService.get("electronicFenceGroupList");
|
||||
|
||||
|
||||
/**
|
||||
* ------------------------------------------------------------------------------
|
||||
*/
|
||||
// 消费消息成功之后需要确认
|
||||
// long deliveryTag 消息投递序号 自增的数字 在整个队列中唯一 拿到这个序号就相当于拿到这条消息
|
||||
// boolean multiple 是否批量确认 true 批量 确认小于等于当前投递序号的消息 false 单个确认
|
||||
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
|
||||
log.info("xxx消费者接收到消息,消息内容:{},消费成功...", message);
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("xxx消费者接收到消息,消息内容:{},消费消息异常,异常信息:{}", message, e);
|
||||
// 消息回退 拒绝消费消息
|
||||
// long deliveryTag 消息投递序号 自增的数字 在整个队列中唯一 拿到这个序号就相当于拿到这条消息
|
||||
// boolean requeue 是否回到原来的队列
|
||||
try {
|
||||
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
|
||||
// channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
|
||||
} catch (IOException ex) {
|
||||
log.error("xxx消费者接收到消息,消息内容:{},回退消息异常,异常信息:{}", message, ex);
|
||||
}
|
||||
}finally {
|
||||
try {
|
||||
channel.close();
|
||||
} catch (Exception e) {
|
||||
log.error("xxx消费者关闭Channel异常,消息内容:{},异常信息:{}", message, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
|
|
@ -10,6 +10,7 @@ import org.springframework.web.bind.annotation.RestController;
|
|||
* @name:DataController
|
||||
* @date:2024/9/29 20:16
|
||||
*/
|
||||
|
||||
@RestController
|
||||
@RequestMapping("data")
|
||||
public class DataController {
|
||||
|
|
|
@ -57,18 +57,4 @@ public class IoTDBController {
|
|||
return Result.success("添加成功");
|
||||
};
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
}
|
||||
|
|
|
@ -28,6 +28,7 @@ public class AddDatabaseListener implements EventListener {
|
|||
keys.add(key);
|
||||
values.add((String) value);
|
||||
});
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -33,7 +33,6 @@ public class TemplateController {
|
|||
@Autowired
|
||||
private TemplateService templateService;
|
||||
|
||||
|
||||
/**
|
||||
* 报文模版列表
|
||||
* @return
|
||||
|
@ -58,7 +57,7 @@ public class TemplateController {
|
|||
}
|
||||
|
||||
/**
|
||||
* 报文模版添加
|
||||
* 报文模版添加0002222220
|
||||
* @param template
|
||||
* @return
|
||||
*/
|
||||
|
|
Loading…
Reference in New Issue