From 2430d10401036f5d89aa7986f6f6de4da0558399 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=99=A8=E5=93=80?= <2076029107@qq.com> Date: Mon, 7 Oct 2024 10:10:26 +0800 Subject: [PATCH] =?UTF-8?q?feat:()=20=E6=96=B0=E5=A2=9E=E7=BC=93=E5=AD=98?= =?UTF-8?q?=E5=92=8C=E4=BF=AE=E6=94=B9=E4=B8=8A=E7=BA=BF=E4=B8=8B=E7=BA=BF?= =?UTF-8?q?=E7=9B=91=E5=90=AC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../consumer/KafkaConsumerService.java | 19 +++++- .../consumer/OfflineMonitoringConsumer.java | 8 +++ .../consumer/OnLineMonitoringConsumer.java | 34 ++++++++++- .../processing/interfaces/EventInterface.java | 2 + .../com/muyu/processing/utils/CacheUtil.java | 58 +++++++++++++++++++ 5 files changed, 119 insertions(+), 2 deletions(-) create mode 100644 cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/utils/CacheUtil.java diff --git a/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/consumer/KafkaConsumerService.java b/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/consumer/KafkaConsumerService.java index 835f553..1c2d7fd 100644 --- a/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/consumer/KafkaConsumerService.java +++ b/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/consumer/KafkaConsumerService.java @@ -6,7 +6,12 @@ import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.alibaba.nacos.shaded.com.google.common.collect.Lists; import com.muyu.common.core.constant.KafkaConstants; +import com.muyu.domain.Fence; +import com.muyu.domain.Vehicle; +import com.muyu.domain.WarnRule; +import com.muyu.domain.WarnStrategy; import com.muyu.processing.interfaces.EventInterface; +import com.muyu.processing.utils.CacheUtil; import lombok.extern.log4j.Log4j2; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; @@ -17,6 +22,7 @@ import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.time.Duration; import java.util.Collection; +import java.util.Map; /** * kafka消费者 @@ -33,6 +39,9 @@ public class KafkaConsumerService implements InitializingBean { @Resource private KafkaConsumer kafkaConsumer; + @Resource + private CacheUtil cacheUtil; + // @Resource // private EventInterface eventInterface; @@ -54,8 +63,16 @@ public class KafkaConsumerService implements InitializingBean { JSONObject jsonObject = JSON.parseObject(originalMsg); log.info("消费数据转换为JSON对象: " + jsonObject); log.info("消费数据转换为JSON对象: " + jsonObject.toString()); -// eventInterface.handle(jsonObject); + String value = jsonObject.toString(); + String vin = value.substring(0, 11); + Map map = (Map) cacheUtil.get(vin); + WarnRule warnRule = (WarnRule) map.get("warnRule"); + WarnStrategy warnStrategy = (WarnStrategy) map.get("warnStrategy"); + Vehicle vehicle = (Vehicle) map.get("vehicle"); + Object breakdown = map.get("breakdown"); + Fence fence = (Fence) map.get("fence"); +// eventInterface.handle(jsonObject); } } }); diff --git a/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/consumer/OfflineMonitoringConsumer.java b/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/consumer/OfflineMonitoringConsumer.java index 460fbe5..d3f6232 100644 --- a/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/consumer/OfflineMonitoringConsumer.java +++ b/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/consumer/OfflineMonitoringConsumer.java @@ -1,5 +1,9 @@ package com.muyu.processing.consumer; +import com.muyu.enterprise.cache.FaultCacheService; +import com.muyu.enterprise.cache.FenceCahceService; +import com.muyu.enterprise.cache.VehicleCacheService; +import com.muyu.enterprise.cache.WarnRuleCacheService; import com.muyu.processing.utils.CacheUtil; import lombok.extern.log4j.Log4j2; import org.springframework.amqp.rabbit.annotation.Queue; @@ -23,6 +27,10 @@ public class OfflineMonitoringConsumer { @Resource private CacheUtil cacheUtil; + + + + /** * 接收消息 * @param vin 车辆vin diff --git a/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/consumer/OnLineMonitoringConsumer.java b/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/consumer/OnLineMonitoringConsumer.java index 673b466..4127ede 100644 --- a/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/consumer/OnLineMonitoringConsumer.java +++ b/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/consumer/OnLineMonitoringConsumer.java @@ -1,5 +1,10 @@ package com.muyu.processing.consumer; +import com.muyu.domain.Fence; +import com.muyu.domain.Vehicle; +import com.muyu.domain.WarnRule; +import com.muyu.domain.WarnStrategy; +import com.muyu.enterprise.cache.*; import com.muyu.processing.utils.CacheUtil; import com.rabbitmq.client.Channel; import lombok.extern.log4j.Log4j2; @@ -9,6 +14,7 @@ import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import javax.annotation.Resource; +import java.util.HashMap; /** * 上线监听 @@ -25,6 +31,21 @@ public class OnLineMonitoringConsumer { @Resource private CacheUtil cacheUtil; + @Resource + private VehicleCacheService vehicleCacheService; + + @Resource + private FaultCacheService faultCacheService; + + @Resource + private FenceCahceService fenceCahceService; + + @Resource + private WarnRuleCacheService warnRuleCacheService; + + @Resource + private WarnStrategyCacheService warnStrategyCacheService; + /** * 上线监听车辆网关中车辆上线时 */ @@ -33,7 +54,18 @@ public class OnLineMonitoringConsumer { try { log.info("添加本地缓存,车辆vin: {}", vin); - + WarnRule warnRule = warnRuleCacheService.get(vin); + WarnStrategy warnStrategy = warnStrategyCacheService.get(vin); + Vehicle vehicle = vehicleCacheService.get(vin); + Object breakdown = faultCacheService.get(vin); + Fence fence = fenceCahceService.get(vin); + HashMap map = new HashMap<>(); + map.put("warnRule",warnRule); + map.put("warnStrategy",warnStrategy); + map.put("vehicle",vehicle); + map.put("breakdown",breakdown); + map.put("fence",fence); + cacheUtil.put(vin,map); channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); } catch (Exception e) { diff --git a/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/interfaces/EventInterface.java b/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/interfaces/EventInterface.java index 7301387..32b77d6 100644 --- a/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/interfaces/EventInterface.java +++ b/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/interfaces/EventInterface.java @@ -1,6 +1,8 @@ package com.muyu.processing.interfaces; import com.alibaba.fastjson.JSONObject; +import org.checkerframework.checker.units.qual.C; +import org.springframework.stereotype.Component; /** * 事件处理接口 diff --git a/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/utils/CacheUtil.java b/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/utils/CacheUtil.java new file mode 100644 index 0000000..46e5e54 --- /dev/null +++ b/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/utils/CacheUtil.java @@ -0,0 +1,58 @@ +package com.muyu.processing.utils; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import org.springframework.stereotype.Component; + +/** + * 缓存工具类 + * @Author:杨鹏 + * @Package:com.muyu.processing.utils + * @Project:cloud-vehicle + * @name:CacheUtil + * @Date:2024/10/4 15:14 + */ +@Component +public class CacheUtil { + + /** + * 缓存对象 + */ + private final Cache cache; + + /** + * 默认构建函数 + */ + public CacheUtil(){ + this.cache = Caffeine.newBuilder() + .maximumSize(500L) + .build(); + } + + /** + * 获得缓存 + * @param key 键 + * @return 返回的值 + */ + public T get(String key){ + return cache.getIfPresent(key); + } + + /** + * 添加缓存 + * @param key 键 + * @param value 值 + */ + public void put(String key, T value){ + cache.put(key, value); + } + + /** + * 删除缓存 + * @param key 键 + */ + public void remove(String key){ + cache.invalidate(key); + } + +}