feat: 完成Rabbit+Redis测试

dev.data.processing.dataTreating
面包骑士 2024-09-30 16:52:12 +08:00
parent 479b55ec18
commit 58d49f9d8b
8 changed files with 86 additions and 33 deletions

View File

@ -25,11 +25,6 @@
<artifactId>cloud-common-redis</artifactId>
</dependency>
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
<version>2.9.3</version>
</dependency>
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>

View File

@ -1,21 +1,23 @@
package com.muyu.common.caffeine;
package com.muyu.common.caffeine.utils;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.muyu.common.caffeine.constents.CaffeineContent;
import com.muyu.common.redis.service.RedisService;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ObjectUtils;
import org.springframework.cache.caffeine.CaffeineCache;
import org.springframework.cache.support.SimpleCacheManager;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.Map;
import java.util.Collection;
/**
* Caffeine
* @Author:
* @Name: CaffeineUtils
* @Description:
@ -29,7 +31,8 @@ public class CaffeineCacheUtils {
private RedisService redisService;
@Resource
private SimpleCacheManager simpleCacheManager;
@Resource
private RedisTemplate<String,Object> redisTemplate;
/**
* 线 -
@ -37,13 +40,14 @@ public class CaffeineCacheUtils {
public void addCarCache(String vin) {
ArrayList<CaffeineCache> caches = new ArrayList<>();
// 从Redis中获取缓存信息
Map<String,Object> cacheMap = redisService.getCacheMap(CaffeineContent.CAR_VIN_KEY +vin);
cacheMap.forEach((key, value) -> {
Collection<String> keys = redisTemplate.keys(CaffeineContent.CAR_VIN_KEY + vin);
keys.forEach(key -> {
Object string = redisTemplate.opsForValue().get(key);
Cache<Object , Object> cache = Caffeine.newBuilder().build();
cache.put(key, value);
cache.put(key, string);
// 全部存储到 CaffeineCache集合
caches.add(new CaffeineCache(vin, cache));
log.info("存储缓存,vin:{}, key:{}, value:{}", vin, key, value);
log.info("存储缓存,vin:{}, key:{}, value:{}", vin, key, string);
});
simpleCacheManager.setCaches(caches);
log.info("车辆编码:{},本地缓存完成...",vin);

View File

@ -1,2 +1,2 @@
com.muyu.common.caffeine.CaffeineCacheUtils
com.muyu.common.caffeine.utils.CaffeineCacheUtils
com.muyu.common.caffeine.bean.CaffeineManagerBean

View File

@ -1 +1,3 @@
#com.muyu.common.rabbit.config.RabbitListenerConfigurer
com.muyu.common.rabbit.config.RabbitListenerConfigurer
com.muyu.common.rabbit.config.ConfirmCallbackConfig
com.muyu.common.rabbit.config.ReturnCallbackConfig

View File

@ -19,6 +19,7 @@ import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
* @FilePath: com.muyu.data.processing
*/
@EnableRabbit
@EnableCustomConfig
@EnableMyFeignClients
@SpringBootApplication
public class MyDataApplication {

View File

@ -1,7 +1,7 @@
package com.muyu.data.processing.controller;
import com.muyu.common.caffeine.CaffeineCacheUtils;
import com.muyu.common.caffeine.utils.CaffeineCacheUtils;
import com.muyu.common.core.utils.uuid.UUID;
import com.muyu.common.iotdb.config.IotDBConfig;
import com.muyu.common.kafka.constants.KafkaConstants;
@ -74,10 +74,7 @@ public class TestController {
@GetMapping("/testRabbit/GoOnline")
public void testRabbitGoOnline(@RequestParam("msg") String msg) {
rabbitTemplate.convertAndSend(RabbitConstants.GO_ONLINE_QUEUE, msg, message -> {
message.getMessageProperties().setMessageId(UUID.randomUUID().toString().replace("-",""));
return message;
});
rabbitTemplate.convertAndSend(RabbitConstants.GO_ONLINE_QUEUE, msg);
}
@GetMapping("/testRabbit/Downline")

View File

@ -1,13 +1,19 @@
package com.muyu.data.processing.rebbit;
import com.muyu.common.caffeine.CaffeineCacheUtils;
import com.muyu.common.caffeine.utils.CaffeineCacheUtils;
import com.muyu.common.rabbit.constants.RabbitConstants;
import com.rabbitmq.client.Channel;
import jakarta.annotation.Resource;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ObjectUtils;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.cache.Cache;
import org.springframework.cache.support.SimpleCacheManager;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import java.io.IOException;
@ -23,18 +29,22 @@ import java.util.HashSet;
*/
@Slf4j
@Component
@Setter
public class DownlineRabbitConsumer {
private CaffeineCacheUtils caffeineCacheUtils = new CaffeineCacheUtils();
private static final HashSet<String> DOWNLINE_SET = new HashSet<>();
private CaffeineCacheUtils caffeineCacheUtils;
@Resource
private RedisTemplate<String,String> redisTemplate;
@Resource
private SimpleCacheManager simpleCacheManager;
@RabbitListener(queuesToDeclare = {@Queue(RabbitConstants.DOWNLINE_QUEUE)})
public void downline(String vin, Message message, Channel channel) {
log.info("车辆 {} 下线, 配置信息准备中。。。",vin);
try {
// 重复性校验
if (DOWNLINE_SET.add(message.getMessageProperties().getMessageId())) {
// caffeineCacheUtils.deleteCarCache(vin);
Long add = redisTemplate.opsForSet().add(RabbitConstants.DOWNLINE_QUEUE, message.getMessageProperties().getMessageId());
if (add>0) {
deleteCarCache(vin);
log.info("车辆 {} 下线, 消息已确认。。。",vin);
} else {
log.info("车辆 {} 下线, 消息重复消费,已确认。。。",vin);
@ -50,4 +60,16 @@ public class DownlineRabbitConsumer {
}
}
}
/**
* 线 -
*/
public void deleteCarCache(String vin) {
Cache cache = simpleCacheManager.getCache(vin);
if (ObjectUtils.isNotEmpty(cache)){
cache.invalidate();
}
log.info("车辆编码:{},本地缓存删除完成...", vin);
}
}

View File

@ -1,17 +1,26 @@
package com.muyu.data.processing.rebbit;
import com.muyu.common.caffeine.CaffeineCacheUtils;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.muyu.common.caffeine.constents.CaffeineContent;
import com.muyu.common.caffeine.utils.CaffeineCacheUtils;
import com.muyu.common.rabbit.constants.RabbitConstants;
import com.rabbitmq.client.Channel;
import jakarta.annotation.Resource;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.cache.caffeine.CaffeineCache;
import org.springframework.cache.support.SimpleCacheManager;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.HashSet;
import java.util.ArrayList;
import java.util.Collection;
/**
* 线
@ -23,19 +32,23 @@ import java.util.HashSet;
*/
@Slf4j
@Component
@Setter
public class GoOnlineRabbitConsumer {
private CaffeineCacheUtils caffeineCacheUtils;
@Resource
private RedisTemplate<String,String> redisTemplate;
@Resource
private SimpleCacheManager simpleCacheManager;
private CaffeineCacheUtils caffeineCacheUtils = new CaffeineCacheUtils();
private static final HashSet<String> DATA_SET = new HashSet<>();
@RabbitListener(queuesToDeclare = {@Queue(RabbitConstants.GO_ONLINE_QUEUE)})
public void goOnline(String vin, Message message, Channel channel){
log.info("车辆 {} 上线, 配置信息准备中。。。",vin);
try {
// 重复性校验
if (DATA_SET.add(message.getMessageProperties().getMessageId())) {
caffeineCacheUtils.addCarCache(vin);
Long add = redisTemplate.opsForSet().add(RabbitConstants.DOWNLINE_QUEUE, message.getMessageProperties().getMessageId());
if (add>0) {
addCarCache(vin);
log.info("车辆 {} 上线, 消息已确认。。。",vin);
} else {
log.info("车辆 {} 上线, 消息重复消费,已确认。。。",vin);
@ -51,4 +64,23 @@ public class GoOnlineRabbitConsumer {
}
}
}
/**
* 线 -
*/
public void addCarCache(String vin) {
ArrayList<CaffeineCache> caches = new ArrayList<>();
// 从Redis中获取缓存信息
Collection<String> keys = redisTemplate.keys(CaffeineContent.CAR_VIN_KEY + vin);
keys.forEach(key -> {
Object string = redisTemplate.opsForValue().get(key);
Cache<Object , Object> cache = Caffeine.newBuilder().build();
cache.put(key, string);
// 全部存储到 CaffeineCache集合
caches.add(new CaffeineCache(vin, cache));
log.info("存储缓存,vin:{}, key:{}, value:{}", vin, key, string);
});
simpleCacheManager.setCaches(caches);
log.info("车辆编码:{},本地缓存完成...",vin);
}
}