From 58d49f9d8b6878b781f83f2e78d0b258cee359bf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B0=91=E5=B9=B4=E6=A2=A6=E4=B8=8E=E7=A0=96?= <2847127106@qq.com> Date: Mon, 30 Sep 2024 16:52:12 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E5=AE=8C=E6=88=90Rabbit+Redis=E6=B5=8B?= =?UTF-8?q?=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cloud-common/cloud-common-caffeine/pom.xml | 5 -- .../{ => utils}/CaffeineCacheUtils.java | 20 ++++---- ...ot.autoconfigure.AutoConfiguration.imports | 2 +- ...ot.autoconfigure.AutoConfiguration.imports | 4 +- .../data/processing/MyDataApplication.java | 1 + .../processing/controller/TestController.java | 7 +-- .../rebbit/DownlineRabbitConsumer.java | 34 +++++++++++--- .../rebbit/GoOnlineRabbitConsumer.java | 46 ++++++++++++++++--- 8 files changed, 86 insertions(+), 33 deletions(-) rename cloud-common/cloud-common-caffeine/src/main/java/com/muyu/common/caffeine/{ => utils}/CaffeineCacheUtils.java (85%) diff --git a/cloud-common/cloud-common-caffeine/pom.xml b/cloud-common/cloud-common-caffeine/pom.xml index 0201487..d3e34be 100644 --- a/cloud-common/cloud-common-caffeine/pom.xml +++ b/cloud-common/cloud-common-caffeine/pom.xml @@ -25,11 +25,6 @@ cloud-common-redis - - com.github.ben-manes.caffeine - caffeine - 2.9.3 - com.github.ben-manes.caffeine caffeine diff --git a/cloud-common/cloud-common-caffeine/src/main/java/com/muyu/common/caffeine/CaffeineCacheUtils.java b/cloud-common/cloud-common-caffeine/src/main/java/com/muyu/common/caffeine/utils/CaffeineCacheUtils.java similarity index 85% rename from cloud-common/cloud-common-caffeine/src/main/java/com/muyu/common/caffeine/CaffeineCacheUtils.java rename to cloud-common/cloud-common-caffeine/src/main/java/com/muyu/common/caffeine/utils/CaffeineCacheUtils.java index 709cf0b..c6fefc4 100644 --- a/cloud-common/cloud-common-caffeine/src/main/java/com/muyu/common/caffeine/CaffeineCacheUtils.java +++ b/cloud-common/cloud-common-caffeine/src/main/java/com/muyu/common/caffeine/utils/CaffeineCacheUtils.java @@ -1,21 +1,23 @@ -package com.muyu.common.caffeine; +package com.muyu.common.caffeine.utils; import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Caffeine; import com.muyu.common.caffeine.constents.CaffeineContent; import com.muyu.common.redis.service.RedisService; +import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.ObjectUtils; import org.springframework.cache.caffeine.CaffeineCache; import org.springframework.cache.support.SimpleCacheManager; +import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; -import javax.annotation.Resource; import java.util.ArrayList; -import java.util.Map; +import java.util.Collection; /** + * Caffeine缓存工具 * @Author: 胡杨 * @Name: CaffeineUtils * @Description: 缓存工具类 @@ -29,7 +31,8 @@ public class CaffeineCacheUtils { private RedisService redisService; @Resource private SimpleCacheManager simpleCacheManager; - + @Resource + private RedisTemplate redisTemplate; /** * 车辆上线 - 新增缓存 @@ -37,13 +40,14 @@ public class CaffeineCacheUtils { public void addCarCache(String vin) { ArrayList caches = new ArrayList<>(); // 从Redis中获取缓存信息 - Map cacheMap = redisService.getCacheMap(CaffeineContent.CAR_VIN_KEY +vin); - cacheMap.forEach((key, value) -> { + Collection keys = redisTemplate.keys(CaffeineContent.CAR_VIN_KEY + vin); + keys.forEach(key -> { + Object string = redisTemplate.opsForValue().get(key); Cache cache = Caffeine.newBuilder().build(); - cache.put(key, value); + cache.put(key, string); // 全部存储到 CaffeineCache集合 caches.add(new CaffeineCache(vin, cache)); - log.info("存储缓存,vin:{}, key:{}, value:{}", vin, key, value); + log.info("存储缓存,vin:{}, key:{}, value:{}", vin, key, string); }); simpleCacheManager.setCaches(caches); log.info("车辆编码:{},本地缓存完成...",vin); diff --git a/cloud-common/cloud-common-caffeine/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/cloud-common/cloud-common-caffeine/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports index 2452d1c..73851ce 100644 --- a/cloud-common/cloud-common-caffeine/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports +++ b/cloud-common/cloud-common-caffeine/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports @@ -1,2 +1,2 @@ -com.muyu.common.caffeine.CaffeineCacheUtils +com.muyu.common.caffeine.utils.CaffeineCacheUtils com.muyu.common.caffeine.bean.CaffeineManagerBean diff --git a/cloud-common/cloud-common-rabbit/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/cloud-common/cloud-common-rabbit/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports index 3c60088..f276344 100644 --- a/cloud-common/cloud-common-rabbit/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports +++ b/cloud-common/cloud-common-rabbit/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports @@ -1 +1,3 @@ -#com.muyu.common.rabbit.config.RabbitListenerConfigurer +com.muyu.common.rabbit.config.RabbitListenerConfigurer +com.muyu.common.rabbit.config.ConfirmCallbackConfig +com.muyu.common.rabbit.config.ReturnCallbackConfig diff --git a/cloud-data-processing/src/main/java/com/muyu/data/processing/MyDataApplication.java b/cloud-data-processing/src/main/java/com/muyu/data/processing/MyDataApplication.java index bb658b0..64d8b2e 100644 --- a/cloud-data-processing/src/main/java/com/muyu/data/processing/MyDataApplication.java +++ b/cloud-data-processing/src/main/java/com/muyu/data/processing/MyDataApplication.java @@ -19,6 +19,7 @@ import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration; * @FilePath: com.muyu.data.processing */ @EnableRabbit +@EnableCustomConfig @EnableMyFeignClients @SpringBootApplication public class MyDataApplication { diff --git a/cloud-data-processing/src/main/java/com/muyu/data/processing/controller/TestController.java b/cloud-data-processing/src/main/java/com/muyu/data/processing/controller/TestController.java index b8edca8..bdec545 100644 --- a/cloud-data-processing/src/main/java/com/muyu/data/processing/controller/TestController.java +++ b/cloud-data-processing/src/main/java/com/muyu/data/processing/controller/TestController.java @@ -1,7 +1,7 @@ package com.muyu.data.processing.controller; -import com.muyu.common.caffeine.CaffeineCacheUtils; +import com.muyu.common.caffeine.utils.CaffeineCacheUtils; import com.muyu.common.core.utils.uuid.UUID; import com.muyu.common.iotdb.config.IotDBConfig; import com.muyu.common.kafka.constants.KafkaConstants; @@ -74,10 +74,7 @@ public class TestController { @GetMapping("/testRabbit/GoOnline") public void testRabbitGoOnline(@RequestParam("msg") String msg) { - rabbitTemplate.convertAndSend(RabbitConstants.GO_ONLINE_QUEUE, msg, message -> { - message.getMessageProperties().setMessageId(UUID.randomUUID().toString().replace("-","")); - return message; - }); + rabbitTemplate.convertAndSend(RabbitConstants.GO_ONLINE_QUEUE, msg); } @GetMapping("/testRabbit/Downline") diff --git a/cloud-data-processing/src/main/java/com/muyu/data/processing/rebbit/DownlineRabbitConsumer.java b/cloud-data-processing/src/main/java/com/muyu/data/processing/rebbit/DownlineRabbitConsumer.java index 07397ad..05a8f48 100644 --- a/cloud-data-processing/src/main/java/com/muyu/data/processing/rebbit/DownlineRabbitConsumer.java +++ b/cloud-data-processing/src/main/java/com/muyu/data/processing/rebbit/DownlineRabbitConsumer.java @@ -1,13 +1,19 @@ package com.muyu.data.processing.rebbit; -import com.muyu.common.caffeine.CaffeineCacheUtils; +import com.muyu.common.caffeine.utils.CaffeineCacheUtils; import com.muyu.common.rabbit.constants.RabbitConstants; import com.rabbitmq.client.Channel; +import jakarta.annotation.Resource; +import lombok.Setter; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.ObjectUtils; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.cache.Cache; +import org.springframework.cache.support.SimpleCacheManager; +import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; import java.io.IOException; @@ -23,18 +29,22 @@ import java.util.HashSet; */ @Slf4j @Component +@Setter public class DownlineRabbitConsumer { - private CaffeineCacheUtils caffeineCacheUtils = new CaffeineCacheUtils(); - - private static final HashSet DOWNLINE_SET = new HashSet<>(); + private CaffeineCacheUtils caffeineCacheUtils; + @Resource + private RedisTemplate redisTemplate; + @Resource + private SimpleCacheManager simpleCacheManager; @RabbitListener(queuesToDeclare = {@Queue(RabbitConstants.DOWNLINE_QUEUE)}) public void downline(String vin, Message message, Channel channel) { log.info("车辆 {} 下线, 配置信息准备中。。。",vin); try { // 重复性校验 - if (DOWNLINE_SET.add(message.getMessageProperties().getMessageId())) { -// caffeineCacheUtils.deleteCarCache(vin); + Long add = redisTemplate.opsForSet().add(RabbitConstants.DOWNLINE_QUEUE, message.getMessageProperties().getMessageId()); + if (add>0) { + deleteCarCache(vin); log.info("车辆 {} 下线, 消息已确认。。。",vin); } else { log.info("车辆 {} 下线, 消息重复消费,已确认。。。",vin); @@ -50,4 +60,16 @@ public class DownlineRabbitConsumer { } } } + + + /** + * 车辆下线 - 删除缓存 + */ + public void deleteCarCache(String vin) { + Cache cache = simpleCacheManager.getCache(vin); + if (ObjectUtils.isNotEmpty(cache)){ + cache.invalidate(); + } + log.info("车辆编码:{},本地缓存删除完成...", vin); + } } diff --git a/cloud-data-processing/src/main/java/com/muyu/data/processing/rebbit/GoOnlineRabbitConsumer.java b/cloud-data-processing/src/main/java/com/muyu/data/processing/rebbit/GoOnlineRabbitConsumer.java index 015c4b3..e3f55b1 100644 --- a/cloud-data-processing/src/main/java/com/muyu/data/processing/rebbit/GoOnlineRabbitConsumer.java +++ b/cloud-data-processing/src/main/java/com/muyu/data/processing/rebbit/GoOnlineRabbitConsumer.java @@ -1,17 +1,26 @@ package com.muyu.data.processing.rebbit; -import com.muyu.common.caffeine.CaffeineCacheUtils; +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.muyu.common.caffeine.constents.CaffeineContent; +import com.muyu.common.caffeine.utils.CaffeineCacheUtils; import com.muyu.common.rabbit.constants.RabbitConstants; import com.rabbitmq.client.Channel; +import jakarta.annotation.Resource; +import lombok.Setter; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.cache.caffeine.CaffeineCache; +import org.springframework.cache.support.SimpleCacheManager; +import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; import java.io.IOException; -import java.util.HashSet; +import java.util.ArrayList; +import java.util.Collection; /** * 上线事件监听 @@ -23,19 +32,23 @@ import java.util.HashSet; */ @Slf4j @Component +@Setter public class GoOnlineRabbitConsumer { + private CaffeineCacheUtils caffeineCacheUtils; + @Resource + private RedisTemplate redisTemplate; + @Resource + private SimpleCacheManager simpleCacheManager; - private CaffeineCacheUtils caffeineCacheUtils = new CaffeineCacheUtils(); - - private static final HashSet DATA_SET = new HashSet<>(); @RabbitListener(queuesToDeclare = {@Queue(RabbitConstants.GO_ONLINE_QUEUE)}) public void goOnline(String vin, Message message, Channel channel){ log.info("车辆 {} 上线, 配置信息准备中。。。",vin); try { // 重复性校验 - if (DATA_SET.add(message.getMessageProperties().getMessageId())) { - caffeineCacheUtils.addCarCache(vin); + Long add = redisTemplate.opsForSet().add(RabbitConstants.DOWNLINE_QUEUE, message.getMessageProperties().getMessageId()); + if (add>0) { + addCarCache(vin); log.info("车辆 {} 上线, 消息已确认。。。",vin); } else { log.info("车辆 {} 上线, 消息重复消费,已确认。。。",vin); @@ -51,4 +64,23 @@ public class GoOnlineRabbitConsumer { } } } + + /** + * 车辆上线 - 新增缓存 + */ + public void addCarCache(String vin) { + ArrayList caches = new ArrayList<>(); + // 从Redis中获取缓存信息 + Collection keys = redisTemplate.keys(CaffeineContent.CAR_VIN_KEY + vin); + keys.forEach(key -> { + Object string = redisTemplate.opsForValue().get(key); + Cache cache = Caffeine.newBuilder().build(); + cache.put(key, string); + // 全部存储到 CaffeineCache集合 + caches.add(new CaffeineCache(vin, cache)); + log.info("存储缓存,vin:{}, key:{}, value:{}", vin, key, string); + }); + simpleCacheManager.setCaches(caches); + log.info("车辆编码:{},本地缓存完成...",vin); + } }