diff --git a/cloud-common/cloud-common-rabbit/pom.xml b/cloud-common/cloud-common-rabbit/pom.xml
index 5d535ad..f8daff2 100644
--- a/cloud-common/cloud-common-rabbit/pom.xml
+++ b/cloud-common/cloud-common-rabbit/pom.xml
@@ -35,6 +35,7 @@
cloud-common-redis
+
diff --git a/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/rabbitmq/consumer/RabbitMQConsumerUtil.java b/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/rabbitmq/consumer/RabbitMQConsumerUtil.java
index aa41b60..f823509 100644
--- a/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/rabbitmq/consumer/RabbitMQConsumerUtil.java
+++ b/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/rabbitmq/consumer/RabbitMQConsumerUtil.java
@@ -2,7 +2,7 @@ package com.muyu.rabbitmq.consumer;
import com.alibaba.fastjson2.JSONObject;
import com.muyu.common.redis.service.RedisService;
-import com.muyu.rabbitmq.util.CacheUtil;
+//import com.muyu.rabbitmq.util.CacheUtil;
import com.rabbitmq.client.Channel;
import lombok.AllArgsConstructor;
import lombok.extern.log4j.Log4j2;
@@ -26,8 +26,8 @@ public class RabbitMQConsumerUtil {
private final RedisService redisService;
- @Autowired
- private CacheUtil cacheUtil;
+// @Autowired
+// private CacheUtil cacheUtil;
/**
@@ -54,7 +54,7 @@ public class RabbitMQConsumerUtil {
* -----------------------------------以下为异步业务操作----------------------------
*/
String carList = (String) redisService.redisTemplate.opsForValue().get("carList");
- cacheUtil.put("carList",carList);
+
/**
diff --git a/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/rabbitmq/producer/RabbitMQProducerUtil.java b/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/rabbitmq/producer/RabbitMQProducerUtil.java
index 9050496..3d9a148 100644
--- a/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/rabbitmq/producer/RabbitMQProducerUtil.java
+++ b/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/rabbitmq/producer/RabbitMQProducerUtil.java
@@ -47,107 +47,107 @@ public class RabbitMQProducerUtil {
}
- /**
- * Work queue 工作模型
- *
- * @param obj 传递的消息 (如果是对象需要序列化)
- * @return 结果集
- * 多个消费者,你一个我一个分配消费消息,有预取机制,默认公平消费,可配置 能者多劳模式(),谁完成的快,谁多做一点
- */
- public Result> workSendMessage(String queueName, Object obj, String msg) {
-
- log.info("【工作模型mq】 : method: 【 workSendMessage 】 - ages: 【 String : {}, Object : {}, String : {} 】 ---> 【 消息发送中。。。 】", queueName, obj, msg);
- // 发送简单模型消息
- // 第一个参数: 绑定规则 相当于 队列名称
- // 第二个参数:消息内容
- rabbitTemplate.convertAndSend(queueName, obj, message -> {
- message.getMessageProperties().setMessageId(UUID.randomUUID().toString());
- return message;
- } );
-
- log.info("【工作模型mq】 : method: 【 workSendMessage 】- queue: 【 {} 】 ---> 【 消息发送成功 】", queueName);
-
- return Result.success("消息发送成功");
- }
-
- /**
- * Publish/Subscribe 发布订阅者模型
- * 多个消费者,多个消费者可以同时接收到消息 有交换机 类型 fanout
- *
- * @param exchange 交换机名称
- * @param obj 发送的消息Object
- * @param msg 响应的内容
- * @return 结果集
- */
- public Result> publishSubscribeSendMessage(String exchange, Object obj, String msg) {
-
- log.info("【订阅模型mq】 : method: 【 workSendMessage 】 - ages: 【 String : {}, Object : {}, String : {} 】 ---> 【 消息发送中。。。 】", exchange, obj, msg);
- // 发送简单模型消息
- // 第一个参数: exchange 交换机的名称
- // 第二个参数: 绑定规则 发布订阅者模型 不写 默认 "" 只要绑定就行 不需要规则
- // 第三个参数:消息内容
- rabbitTemplate.convertAndSend(exchange, "", obj, message -> {
- message.getMessageProperties().setMessageId(UUID.randomUUID().toString());
- return message;
- } );
-
- log.info("【订阅模型mq】 : method: 【 workSendMessage 】- exchange: 【 {} 】 ---> 【 消息发送成功 】", exchange);
-
- return Result.success("消息发送成功");
- }
-
- /**
- * Routing路由模型
- * 使用的是 Direct 类型的交换机,会将接收到的消息根据 规则 路由到指定的Queue(队列),因此称为路由模式
- *
- * @param exchange 交换机名称
- * @param rule 绑定规则 一个字符串即可
- * @param obj 发送的消息Object
- * @param msg 响应的内容
- * @return 结果集
- */
- public Result> routingSendMessage(String exchange, String rule, Object obj, String msg) {
-
- log.info("【路由模型mq】 : method: 【 workSendMessage 】 - ages: 【 String : {}, Object : {}, String : {} 】 ---> 【 消息发送中。。。 】", exchange, obj, msg);
- // 发送简单模型消息
- // 第一个参数: 绑定规则 相当于 队列名称
- // 第二个参数:消息内容
- rabbitTemplate.convertAndSend(exchange, rule, obj, message -> {
- message.getMessageProperties().setMessageId(UUID.randomUUID().toString());
- return message;
- } );
-
- log.info("【路由模型mq】 : method: 【 workSendMessage 】- exchange: 【 {} 】 ---> 【 消息发送成功 】", exchange);
-
- return Result.success("消息发送成功");
- }
-
-
- /**
- * Topic主题模型模型
- * 使用的是 topic 类型的交换机
- *
- * @param exchange 交换机名称
- * @param rule 绑定规则 可以绑定多个单词以 . 拼接 也可以使用 #(匹配 零个 一个 或 多个 单词) 或 *(匹配 一个 单词) 通配符(例如:name.msg, *.msg, age.# )
- * @param obj 发送的消息Object
- * @param msg 响应的内容
- * @return 结果集
- */
- public Result> topicSendMessage(String exchange, String rule, Object obj) {
-
- log.info("【主题模型mq】 : method: 【 workSendMessage 】 - ages: 【 String : {}, Object : {} 】 ---> 【 消息发送中。。。 】", exchange, obj);
- // 发送简单模型消息
- // 第一个参数: 绑定规则 相当于 队列名称
- // 第二个参数:消息内容
- rabbitTemplate.convertAndSend(exchange, rule, obj, message -> {
- message.getMessageProperties().setMessageId(UUID.randomUUID().toString());
- return message;
- } );
-
- log.info("【主题模型mq】 : method: 【 workSendMessage 】- exchange: 【 {} 】 ---> 【 消息发送成功 】", exchange);
-
- return Result.success(obj,"消息发送成功");
- }
+// /**
+// * Work queue 工作模型
+// *
+// * @param obj 传递的消息 (如果是对象需要序列化)
+// * @return 结果集
+// * 多个消费者,你一个我一个分配消费消息,有预取机制,默认公平消费,可配置 能者多劳模式(),谁完成的快,谁多做一点
+// */
+// public Result> workSendMessage(String queueName, Object obj, String msg) {
+//
+// log.info("【工作模型mq】 : method: 【 workSendMessage 】 - ages: 【 String : {}, Object : {}, String : {} 】 ---> 【 消息发送中。。。 】", queueName, obj, msg);
+// // 发送简单模型消息
+// // 第一个参数: 绑定规则 相当于 队列名称
+// // 第二个参数:消息内容
+// rabbitTemplate.convertAndSend(queueName, obj, message -> {
+// message.getMessageProperties().setMessageId(UUID.randomUUID().toString());
+// return message;
+// } );
+//
+// log.info("【工作模型mq】 : method: 【 workSendMessage 】- queue: 【 {} 】 ---> 【 消息发送成功 】", queueName);
+//
+// return Result.success("消息发送成功");
+// }
+//
+// /**
+// * Publish/Subscribe 发布订阅者模型
+// * 多个消费者,多个消费者可以同时接收到消息 有交换机 类型 fanout
+// *
+// * @param exchange 交换机名称
+// * @param obj 发送的消息Object
+// * @param msg 响应的内容
+// * @return 结果集
+// */
+// public Result> publishSubscribeSendMessage(String exchange, Object obj, String msg) {
+//
+// log.info("【订阅模型mq】 : method: 【 workSendMessage 】 - ages: 【 String : {}, Object : {}, String : {} 】 ---> 【 消息发送中。。。 】", exchange, obj, msg);
+// // 发送简单模型消息
+// // 第一个参数: exchange 交换机的名称
+// // 第二个参数: 绑定规则 发布订阅者模型 不写 默认 "" 只要绑定就行 不需要规则
+// // 第三个参数:消息内容
+// rabbitTemplate.convertAndSend(exchange, "", obj, message -> {
+// message.getMessageProperties().setMessageId(UUID.randomUUID().toString());
+// return message;
+// } );
+//
+// log.info("【订阅模型mq】 : method: 【 workSendMessage 】- exchange: 【 {} 】 ---> 【 消息发送成功 】", exchange);
+//
+// return Result.success("消息发送成功");
+// }
+//
+// /**
+// * Routing路由模型
+// * 使用的是 Direct 类型的交换机,会将接收到的消息根据 规则 路由到指定的Queue(队列),因此称为路由模式
+// *
+// * @param exchange 交换机名称
+// * @param rule 绑定规则 一个字符串即可
+// * @param obj 发送的消息Object
+// * @param msg 响应的内容
+// * @return 结果集
+// */
+// public Result> routingSendMessage(String exchange, String rule, Object obj, String msg) {
+//
+// log.info("【路由模型mq】 : method: 【 workSendMessage 】 - ages: 【 String : {}, Object : {}, String : {} 】 ---> 【 消息发送中。。。 】", exchange, obj, msg);
+// // 发送简单模型消息
+// // 第一个参数: 绑定规则 相当于 队列名称
+// // 第二个参数:消息内容
+// rabbitTemplate.convertAndSend(exchange, rule, obj, message -> {
+// message.getMessageProperties().setMessageId(UUID.randomUUID().toString());
+// return message;
+// } );
+//
+// log.info("【路由模型mq】 : method: 【 workSendMessage 】- exchange: 【 {} 】 ---> 【 消息发送成功 】", exchange);
+//
+// return Result.success("消息发送成功");
+// }
+//
+//
+// /**
+// * Topic主题模型模型
+// * 使用的是 topic 类型的交换机
+// *
+// * @param exchange 交换机名称
+// * @param rule 绑定规则 可以绑定多个单词以 . 拼接 也可以使用 #(匹配 零个 一个 或 多个 单词) 或 *(匹配 一个 单词) 通配符(例如:name.msg, *.msg, age.# )
+// * @param obj 发送的消息Object
+// * @param msg 响应的内容
+// * @return 结果集
+// */
+// public Result> topicSendMessage(String exchange, String rule, Object obj) {
+//
+// log.info("【主题模型mq】 : method: 【 workSendMessage 】 - ages: 【 String : {}, Object : {} 】 ---> 【 消息发送中。。。 】", exchange, obj);
+// // 发送简单模型消息
+// // 第一个参数: 绑定规则 相当于 队列名称
+// // 第二个参数:消息内容
+// rabbitTemplate.convertAndSend(exchange, rule, obj, message -> {
+// message.getMessageProperties().setMessageId(UUID.randomUUID().toString());
+// return message;
+// } );
+//
+// log.info("【主题模型mq】 : method: 【 workSendMessage 】- exchange: 【 {} 】 ---> 【 消息发送成功 】", exchange);
+//
+// return Result.success(obj,"消息发送成功");
+// }
/**
diff --git a/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/rabbitmq/util/CacheUtil.java b/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/rabbitmq/util/CacheUtil.java
deleted file mode 100644
index 16ed054..0000000
--- a/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/rabbitmq/util/CacheUtil.java
+++ /dev/null
@@ -1,37 +0,0 @@
-package com.muyu.rabbitmq.util;
-
-import com.github.benmanes.caffeine.cache.Cache;
-import com.github.benmanes.caffeine.cache.Caffeine;
-import org.springframework.stereotype.Component;
-
-/**
- * 缓存工具类
- *
- * @program: cloud-server
- * @author: 刘武
- * @create: 2024-09-30 10:08
- **/
-@Component
-public class CacheUtil {
-
- private final Cache cache;
-
- public CacheUtil() {
- this.cache = Caffeine.newBuilder()
- .maximumSize(500L)
- .build();
- }
-
- public T get(String key) {
- return cache.getIfPresent(key);
- }
-
- public void put(String key, T value) {
- cache.put(key, value);
- }
-
- public void remove(String key) {
- cache.invalidate(key);
- }
-
-}
diff --git a/cloud-modules/cloud-event/pom.xml b/cloud-modules/cloud-event/pom.xml
index cae475e..d9321b3 100644
--- a/cloud-modules/cloud-event/pom.xml
+++ b/cloud-modules/cloud-event/pom.xml
@@ -115,6 +115,12 @@
cloud-common-rabbit
+
+ com.muyu
+ saas-cache
+ 3.6.3
+
+
diff --git a/cloud-modules/cloud-event/src/main/java/com/muyu/event/basic/EventPublisher.java b/cloud-modules/cloud-event/src/main/java/com/muyu/event/basic/EventPublisher.java
index 624f7fa..f724836 100644
--- a/cloud-modules/cloud-event/src/main/java/com/muyu/event/basic/EventPublisher.java
+++ b/cloud-modules/cloud-event/src/main/java/com/muyu/event/basic/EventPublisher.java
@@ -28,6 +28,4 @@ public class EventPublisher implements ApplicationEventPublisherAware {
publisher.publishEvent(event);
}
-
-
}
diff --git a/cloud-modules/cloud-event/src/main/java/com/muyu/event/consumer/MqConsumer.java b/cloud-modules/cloud-event/src/main/java/com/muyu/event/consumer/MqConsumer.java
index e8aeb7d..4f10b09 100644
--- a/cloud-modules/cloud-event/src/main/java/com/muyu/event/consumer/MqConsumer.java
+++ b/cloud-modules/cloud-event/src/main/java/com/muyu/event/consumer/MqConsumer.java
@@ -1,9 +1,24 @@
package com.muyu.event.consumer;
+import com.alibaba.fastjson2.JSONObject;
+import com.muyu.cache.ElectronicFenceGroupCacheService;
+import com.muyu.cache.SysCarCacheService;
+import com.muyu.common.domain.database.ElectronicFenceGroup;
+import com.muyu.common.domain.resp.SysCarVo;
+import com.muyu.common.redis.service.RedisService;
import com.muyu.rabbitmq.consumer.RabbitMQConsumerUtil;
+import com.rabbitmq.client.Channel;
+import lombok.extern.log4j.Log4j2;
+import org.springframework.amqp.core.Message;
+import org.springframework.amqp.rabbit.annotation.Queue;
+import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
+import java.io.IOException;
+import java.util.Date;
+import java.util.List;
+
/**
* rabbitmq 监听器
* @author 刘武
@@ -13,9 +28,65 @@ import org.springframework.stereotype.Component;
*/
@Component
+@Log4j2
public class MqConsumer {
+ @Autowired
+ private RedisService redisService;
+ @Autowired
+ private SysCarCacheService sysCarCacheService;
+ @Autowired
+ private ElectronicFenceGroupCacheService electronicFenceGroupCacheService;
+
+ @RabbitListener(queuesToDeclare = @Queue(name = "basic"))
+ public void rabbitMQBasicConsumer(String data , Message message , Channel channel) {
+ log.info("当前时间:{} :RabbitMQConsumerUtil : {}", new Date(), message);
+ try {
+ // 获取到消息 开始消费
+ log.info("消息消费者接收到消息,消息内容:{}", JSONObject.toJSONString(data));
+
+ Long add = redisService.redisTemplate.opsForSet().add(data, message.getMessageProperties().getMessageId());
+
+ if (add != 1) {
+ return;
+ }
+
+ /**
+ * -----------------------------------以下为异步业务操作----------------------------
+ */
+ List carList = sysCarCacheService.get("carList");
+ ElectronicFenceGroup fenceGroupList = electronicFenceGroupCacheService.get("electronicFenceGroupList");
+
+
+ /**
+ * ------------------------------------------------------------------------------
+ */
+ // 消费消息成功之后需要确认
+ // long deliveryTag 消息投递序号 自增的数字 在整个队列中唯一 拿到这个序号就相当于拿到这条消息
+ // boolean multiple 是否批量确认 true 批量 确认小于等于当前投递序号的消息 false 单个确认
+ channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
+ log.info("xxx消费者接收到消息,消息内容:{},消费成功...", message);
+
+ } catch (Exception e) {
+ log.error("xxx消费者接收到消息,消息内容:{},消费消息异常,异常信息:{}", message, e);
+ // 消息回退 拒绝消费消息
+ // long deliveryTag 消息投递序号 自增的数字 在整个队列中唯一 拿到这个序号就相当于拿到这条消息
+ // boolean requeue 是否回到原来的队列
+ try {
+ channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
+// channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
+ } catch (IOException ex) {
+ log.error("xxx消费者接收到消息,消息内容:{},回退消息异常,异常信息:{}", message, ex);
+ }
+ }finally {
+ try {
+ channel.close();
+ } catch (Exception e) {
+ log.error("xxx消费者关闭Channel异常,消息内容:{},异常信息:{}", message, e);
+ }
+ }
+ }
}
diff --git a/cloud-modules/cloud-event/src/main/java/com/muyu/event/controller/DataController.java b/cloud-modules/cloud-event/src/main/java/com/muyu/event/controller/DataController.java
index 3603367..d02d4b7 100644
--- a/cloud-modules/cloud-event/src/main/java/com/muyu/event/controller/DataController.java
+++ b/cloud-modules/cloud-event/src/main/java/com/muyu/event/controller/DataController.java
@@ -10,6 +10,7 @@ import org.springframework.web.bind.annotation.RestController;
* @name:DataController
* @date:2024/9/29 20:16
*/
+
@RestController
@RequestMapping("data")
public class DataController {
diff --git a/cloud-modules/cloud-event/src/main/java/com/muyu/event/controller/IoTDBController.java b/cloud-modules/cloud-event/src/main/java/com/muyu/event/controller/IoTDBController.java
index 7bbfed5..d73514c 100644
--- a/cloud-modules/cloud-event/src/main/java/com/muyu/event/controller/IoTDBController.java
+++ b/cloud-modules/cloud-event/src/main/java/com/muyu/event/controller/IoTDBController.java
@@ -57,18 +57,4 @@ public class IoTDBController {
return Result.success("添加成功");
};
-
-
-
-
-
-
-
-
-
-
-
-
-
-
}
diff --git a/cloud-modules/cloud-event/src/main/java/com/muyu/event/listener/AddDatabaseListener.java b/cloud-modules/cloud-event/src/main/java/com/muyu/event/listener/AddDatabaseListener.java
index e3fc4a7..bfbfd04 100644
--- a/cloud-modules/cloud-event/src/main/java/com/muyu/event/listener/AddDatabaseListener.java
+++ b/cloud-modules/cloud-event/src/main/java/com/muyu/event/listener/AddDatabaseListener.java
@@ -28,6 +28,7 @@ public class AddDatabaseListener implements EventListener {
keys.add(key);
values.add((String) value);
});
+
}
@Override
diff --git a/cloud-modules/saas/saas-server/src/main/java/com/muyu/server/controller/TemplateController.java b/cloud-modules/saas/saas-server/src/main/java/com/muyu/server/controller/TemplateController.java
index 89dc512..4b4d6c9 100644
--- a/cloud-modules/saas/saas-server/src/main/java/com/muyu/server/controller/TemplateController.java
+++ b/cloud-modules/saas/saas-server/src/main/java/com/muyu/server/controller/TemplateController.java
@@ -33,7 +33,6 @@ public class TemplateController {
@Autowired
private TemplateService templateService;
-
/**
* 报文模版列表
* @return
@@ -58,7 +57,7 @@ public class TemplateController {
}
/**
- * 报文模版添加
+ * 报文模版添加0002222220
* @param template
* @return
*/