From 35244222f081e6f4f6ade7212b8fb1a5aa00ee15 Mon Sep 17 00:00:00 2001 From: 20300 <643145201@qq.com> Date: Fri, 21 Jun 2024 11:33:38 +0800 Subject: [PATCH] =?UTF-8?q?feat()=E6=95=85=E9=9A=9C=E4=BA=8B=E4=BB=B6,?= =?UTF-8?q?=E7=9B=91=E5=90=ACredsi=E8=BF=87=E6=9C=9Fkey?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/com/hyc/domain/FaultMessage.java | 40 +++++++++++++++++ .../demo/listener/KeyExpiredListener.java | 45 +++++++++++++++++++ .../java/com/hyc/util/EventHandleUtil.java | 45 +++++++++++++++++++ 3 files changed, 130 insertions(+) create mode 100644 src/main/java/com/hyc/domain/FaultMessage.java create mode 100644 src/main/java/com/hyc/kafka/demo/listener/KeyExpiredListener.java create mode 100644 src/main/java/com/hyc/util/EventHandleUtil.java diff --git a/src/main/java/com/hyc/domain/FaultMessage.java b/src/main/java/com/hyc/domain/FaultMessage.java new file mode 100644 index 0000000..1dbf62a --- /dev/null +++ b/src/main/java/com/hyc/domain/FaultMessage.java @@ -0,0 +1,40 @@ +package com.hyc.domain; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.ToString; + +import java.util.Date; + +/** + * 故障消息对象 + * + * @author YouChe·He + * @ClassName: FaultMessage + * @Description: 故障消息对象 + * @CreateTime: 2024/6/20 15:30 + */ +@Data +@ToString +@NoArgsConstructor +@AllArgsConstructor +public class FaultMessage { + /** + * VIN + */ + private String vin; + /** + * 故障码 + */ + private String faultCode; + /** + * 发生时间 + */ + private Date occurrenceTime; + /** + * 动作类型 + */ + private String actionType; + +} diff --git a/src/main/java/com/hyc/kafka/demo/listener/KeyExpiredListener.java b/src/main/java/com/hyc/kafka/demo/listener/KeyExpiredListener.java new file mode 100644 index 0000000..e560199 --- /dev/null +++ b/src/main/java/com/hyc/kafka/demo/listener/KeyExpiredListener.java @@ -0,0 +1,45 @@ +package com.hyc.kafka.demo.listener; + +import com.alibaba.fastjson.JSONObject; +import com.hyc.config.RabbitmqConfig; +import com.hyc.domain.FaultMessage; +import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.connection.Message; +import org.springframework.data.redis.listener.KeyExpirationEventMessageListener; +import org.springframework.data.redis.listener.RedisMessageListenerContainer; +import org.springframework.stereotype.Component; + +import java.util.Date; + +/** + * key过期监听器 + * + * @author YouChe·He + * @ClassName: KeyExpiredListener + * @Description: key过期监听器 + * @CreateTime: 2024/6/20 21:59 + */ +@Slf4j +public class KeyExpiredListener extends KeyExpirationEventMessageListener { + + private RabbitTemplate rabbitTemplate; + + public KeyExpiredListener(RedisMessageListenerContainer listenerContainer,RabbitTemplate rabbitTemplate) { + super(listenerContainer); + this.rabbitTemplate = rabbitTemplate; + } + + @Override + public void onMessage(Message message, byte[] pattern) { + String expireKey = message.toString(); + log.info("过期的key:" + expireKey); + //在这里做过期key的处理 + if (expireKey.startsWith("car_")){ + String[] split = expireKey.split("_"); + FaultMessage faultMessage = new FaultMessage(split[1], split[2], new Date(), "update"); + rabbitTemplate.convertAndSend(RabbitmqConfig.FAULT_EXCHANGE,RabbitmqConfig.FAULT_MESSAGE_ROUTINGKEY, JSONObject.toJSONString(faultMessage)); + } + } +} diff --git a/src/main/java/com/hyc/util/EventHandleUtil.java b/src/main/java/com/hyc/util/EventHandleUtil.java new file mode 100644 index 0000000..6b98bed --- /dev/null +++ b/src/main/java/com/hyc/util/EventHandleUtil.java @@ -0,0 +1,45 @@ +package com.hyc.util; + +import com.alibaba.fastjson.JSONObject; +import com.hyc.config.RabbitmqConfig; +import com.hyc.domain.FaultMessage; +import com.hyc.domain.VehicleData; +import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.cache.Cache; +import org.springframework.cache.CacheManager; +import org.springframework.data.redis.core.RedisTemplate; + +import java.util.Date; +import java.util.concurrent.TimeUnit; + +/** + * 事件处理工具类 + * + * @author YouChe·He + * @ClassName: EventHandleUtil + * @Description: 事件处理工具类 + * @CreateTime: 2024/6/20 15:55 + */ +@Slf4j +public class EventHandleUtil { + public static void insertToCache(CacheManager cacheManager, RedisTemplate redisTemplate, VehicleData vehicleData, String keySuffix, RabbitTemplate rabbitTemplate){ + Cache faultMessageCache = cacheManager.getCache("faultMessage"); + Cache.ValueWrapper valueWrapper = faultMessageCache.get("car_"+vehicleData.getVin() + "_"+keySuffix); + if (valueWrapper == null){ + FaultMessage faultMessage = new FaultMessage("car_"+vehicleData.getVin(), keySuffix, new Date(), "insert"); + //发送故障开始消息 + log.error("发送故障消息!!!"); + rabbitTemplate.convertAndSend(RabbitmqConfig.FAULT_EXCHANGE,RabbitmqConfig.FAULT_MESSAGE_ROUTINGKEY,JSONObject.toJSONString(faultMessage)); + rabbitTemplate.convertAndSend(RabbitmqConfig.FAULT_EXCHANGE,RabbitmqConfig.FAULT_MESSAGE_QUEUE,JSONObject.toJSONString(faultMessage)); + faultMessageCache.put("car_"+vehicleData.getVin() + "_" + keySuffix, JSONObject.toJSONString(vehicleData)); + redisTemplate.opsForValue().set("car_"+vehicleData.getVin()+"_"+keySuffix,JSONObject.toJSONString(vehicleData),10, TimeUnit.SECONDS); + }else { + log.warn("已经存在{}故障",keySuffix); + //已经存在此故障,继续添加 再次添加应该会更新缓存时间 + faultMessageCache.put("car_"+vehicleData.getVin() + "_" + keySuffix, JSONObject.toJSONString(vehicleData)); + //同时添加到redis,设置过期时间为10S + redisTemplate.opsForValue().set("car_"+vehicleData.getVin()+"_"+keySuffix,JSONObject.toJSONString(vehicleData),10, TimeUnit.SECONDS); + } + } +}