From d17f6842d74f5e91f6410828a2e9bfd2bbdbdf2b Mon Sep 17 00:00:00 2001 From: 20300 <643145201@qq.com> Date: Mon, 1 Jul 2024 08:40:54 +0800 Subject: [PATCH] =?UTF-8?q?feat()=E6=8C=87=E6=A0=87=E9=A2=84=E8=AD=A6?= =?UTF-8?q?=E5=AE=9E=E7=8E=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/com/hyc/config/HandleConnection.java | 5 +- .../java/com/hyc/config/RabbitmqConfig.java | 9 +++ .../java/com/hyc/config/SyncCacheRunner.java | 10 ++- .../consumer/CreateMqttClientConsumer.java | 2 +- .../java/com/hyc/consumer/UpdateConfig.java | 72 ++++++++++++++++- .../hyc/domain/cache/IndexWarnToRedis.java | 4 +- .../hyc/domain/req/HandleIndexWarnConfig.java | 44 +++++++++++ .../kafka/demo/strategy/EventStrategy.java | 17 ++-- .../listener/IndexWarnListener.java | 79 +++++++++++-------- 9 files changed, 187 insertions(+), 55 deletions(-) create mode 100644 src/main/java/com/hyc/domain/req/HandleIndexWarnConfig.java diff --git a/src/main/java/com/hyc/config/HandleConnection.java b/src/main/java/com/hyc/config/HandleConnection.java index 18d5bcb..2eeafe6 100644 --- a/src/main/java/com/hyc/config/HandleConnection.java +++ b/src/main/java/com/hyc/config/HandleConnection.java @@ -37,10 +37,7 @@ public class HandleConnection { String s = new String(message.getBody()); JSONObject jsonObject = JSONObject.parseObject(s); String carVin = jsonObject.getString("clientId"); -// -// String s1 = redisTemplate.opsForValue().get(carVin + "_Index_Warn"); -// -// indexWarn.put(carVin+"_Index_Warn", s1); + log.error("链接事件得到的VIN:{}",carVin); } diff --git a/src/main/java/com/hyc/config/RabbitmqConfig.java b/src/main/java/com/hyc/config/RabbitmqConfig.java index f1e432a..d791208 100644 --- a/src/main/java/com/hyc/config/RabbitmqConfig.java +++ b/src/main/java/com/hyc/config/RabbitmqConfig.java @@ -28,10 +28,19 @@ public class RabbitmqConfig { * 修改指标预警配置队列 */ public final static String UPDATE_INDEX_WARN = "update_index_warn"; + /** + * 修改报文配置 + */ + public final static String UPDATE_ANALYZE_RULE = "update_analyze_rule"; public static final String CREATE_MQTT_CLIENT = "create_mqtt_client"; + @Bean(UPDATE_ANALYZE_RULE) + public Queue UPDATE_ANALYZE_RULE(){ + return new Queue(UPDATE_ANALYZE_RULE); + } + @Bean(UPDATE_INDEX_WARN) public Queue UPDATE_INDEX_WARN(){ return new Queue(UPDATE_INDEX_WARN); diff --git a/src/main/java/com/hyc/config/SyncCacheRunner.java b/src/main/java/com/hyc/config/SyncCacheRunner.java index 6fac7b7..a2311f2 100644 --- a/src/main/java/com/hyc/config/SyncCacheRunner.java +++ b/src/main/java/com/hyc/config/SyncCacheRunner.java @@ -62,10 +62,14 @@ public class SyncCacheRunner implements ApplicationRunner { analyzeRuleCache.put(cacheCarEvent.getVin()+"_Analyze_Rule", JSON.toJSONString(analyzeRules)); - IndexWarnToRedis cacheIndexWarn = new IndexWarnToRedis(cacheCarEvent.getVin(), "speed,voltage", 20, 10); - redisTemplate.opsForValue().set(cacheCarEvent.getVin()+"_Index_Warn", JSON.toJSONString(cacheIndexWarn)); - System.out.println(cacheIndexWarn.toString()); + //模拟指标配置 + ArrayList indexWarnToRedis = new ArrayList<>(); + indexWarnToRedis.add(new IndexWarnToRedis(cacheCarEvent.getVin(), "speed", 20, 10)); + indexWarnToRedis.add(new IndexWarnToRedis(cacheCarEvent.getVin(), "voltage", 20, 10)); + + redisTemplate.opsForValue().set(cacheCarEvent.getVin()+"_Index_Warn", JSON.toJSONString(indexWarnToRedis)); + diff --git a/src/main/java/com/hyc/consumer/CreateMqttClientConsumer.java b/src/main/java/com/hyc/consumer/CreateMqttClientConsumer.java index e4a7438..0335b5d 100644 --- a/src/main/java/com/hyc/consumer/CreateMqttClientConsumer.java +++ b/src/main/java/com/hyc/consumer/CreateMqttClientConsumer.java @@ -128,7 +128,7 @@ public class CreateMqttClientConsumer { try { String jsonData = objectMapper.writeValueAsString(linkedHashMap); log.error("json格式:{}", jsonData); - String finalVin = vin; + String finalVin = vin;//vin - iFeng transactionTemplate.execute(status -> { try { kafkaTemplate.send("topichyc", finalVin, jsonData); diff --git a/src/main/java/com/hyc/consumer/UpdateConfig.java b/src/main/java/com/hyc/consumer/UpdateConfig.java index 68ba71b..9af413c 100644 --- a/src/main/java/com/hyc/consumer/UpdateConfig.java +++ b/src/main/java/com/hyc/consumer/UpdateConfig.java @@ -2,6 +2,9 @@ package com.hyc.consumer; import com.alibaba.fastjson.JSONObject; import com.hyc.config.RabbitmqConfig; +import com.hyc.domain.cache.CacheIndexWarn; +import com.hyc.domain.cache.IndexWarnToRedis; +import com.hyc.domain.req.HandleIndexWarnConfig; import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; @@ -10,8 +13,12 @@ import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.cache.Cache; import org.springframework.cache.CacheManager; +import org.springframework.context.ApplicationEventPublisher; import org.springframework.stereotype.Component; +import java.util.List; +import java.util.stream.Collectors; + /** * 更新本地缓存配置消费者 * @@ -25,6 +32,8 @@ import org.springframework.stereotype.Component; public class UpdateConfig { @Autowired private RabbitTemplate rabbitTemplate; + @Autowired + private ApplicationEventPublisher applicationEventPublisher; @Autowired private CacheManager cacheManager; @@ -32,9 +41,68 @@ public class UpdateConfig { public void indexWarnEventMessage(String indexWarnToRedis, Channel channel, Message message){ // List indexWarnEventResps = JSONObject.parseArray(indexWarnMessage, IndexWarnEventResp.class); String vin = JSONObject.parseObject(indexWarnToRedis).getString("vin"); + HandleIndexWarnConfig handleIndexWarnConfig = JSONObject.parseObject(indexWarnToRedis, HandleIndexWarnConfig.class); Cache indexWarn = cacheManager.getCache("indexWarn"); - indexWarn.put(vin + "_Index_Warn",indexWarnToRedis); Cache.ValueWrapper valueWrapper = indexWarn.get(vin + "_Index_Warn"); - log.warn("更新后的值是:{}",valueWrapper.get().toString()); + + if ("insert".equals(handleIndexWarnConfig.getHandleType())){ + if (valueWrapper != null){ + + String s = valueWrapper.get().toString(); + List indexWarnToRedisList = JSONObject.parseArray(s, IndexWarnToRedis.class); + + indexWarnToRedisList.add(new IndexWarnToRedis(handleIndexWarnConfig.getVin(), handleIndexWarnConfig.getIndexNames(), handleIndexWarnConfig.getTotalLength(),handleIndexWarnConfig.getSlideLength())); + + indexWarn.put(handleIndexWarnConfig.getVin() + "_Index_Warn",JSONObject.toJSONString(indexWarnToRedisList)); + + applicationEventPublisher.publishEvent(new CacheIndexWarn(this, handleIndexWarnConfig.getVin(), handleIndexWarnConfig.getIndexNames(), handleIndexWarnConfig.getTotalLength(), handleIndexWarnConfig.getSlideLength())); + }else { + + } + } else if ("delete".equals(handleIndexWarnConfig.getHandleType())) { + if (valueWrapper != null){ + String s = valueWrapper.get().toString(); + List indexWarnToRedisList = JSONObject.parseArray(s, IndexWarnToRedis.class); + List collect = indexWarnToRedisList.stream().filter(indexWarnToRedis1 -> { + return indexWarnToRedis1.getIndexName() .equals(handleIndexWarnConfig.getIndexNames()) ? false : true; + }).collect(Collectors.toList()); + + indexWarn.put(handleIndexWarnConfig.getVin() + "_Index_Warn",JSONObject.toJSONString(collect)); + + + } + } else if ("update".equals(handleIndexWarnConfig.getHandleType())) { + if (valueWrapper != null){ + String s = valueWrapper.get().toString(); + List indexWarnToRedisList = JSONObject.parseArray(s, IndexWarnToRedis.class); + List collect = indexWarnToRedisList.stream().map(indexWarnToRedis1 -> { + if (indexWarnToRedis1.getIndexName().equals(handleIndexWarnConfig.getIndexNames())) { + return new IndexWarnToRedis(handleIndexWarnConfig.getVin(), handleIndexWarnConfig.getIndexNames(), handleIndexWarnConfig.getTotalLength(), handleIndexWarnConfig.getSlideLength()); + }else { + return indexWarnToRedis1; + } + + }).collect(Collectors.toList()); + indexWarn.put(handleIndexWarnConfig.getVin() + "_Index_Warn",JSONObject.toJSONString(collect)); + } + } + + if (valueWrapper != null){ + log.warn("更新后的值是:{}",valueWrapper.get().toString()); + } + + + } + + @RabbitListener(queues = RabbitmqConfig.UPDATE_ANALYZE_RULE) + public void analyzeUpdateMessage(String analyzeUpdateMessage, Channel channel, Message message){ + JSONObject jsonObject = JSONObject.parseObject(analyzeUpdateMessage); + String vin = jsonObject.getString("vin"); + log.warn("报文解析获取到的vin是:{}",vin); + String analyzeRules = jsonObject.getString("analyzeRules"); + log.warn("传递的报文信息是:{}",analyzeRules); + Cache analyzeRule = cacheManager.getCache("analyzeRule"); + analyzeRule.put(vin + "_Analyze_Rule",analyzeRules); + log.info("vin为:{}的车辆报文解析配置更新成功!",vin); } } diff --git a/src/main/java/com/hyc/domain/cache/IndexWarnToRedis.java b/src/main/java/com/hyc/domain/cache/IndexWarnToRedis.java index 43034a1..bfe2189 100644 --- a/src/main/java/com/hyc/domain/cache/IndexWarnToRedis.java +++ b/src/main/java/com/hyc/domain/cache/IndexWarnToRedis.java @@ -26,9 +26,9 @@ public class IndexWarnToRedis{ */ private String vin; /** - * 指标项名字,多项以逗号隔开 + * 指标项名字 */ - private String indexNames; + private String indexName; /** * 滑窗长度 单位秒 */ diff --git a/src/main/java/com/hyc/domain/req/HandleIndexWarnConfig.java b/src/main/java/com/hyc/domain/req/HandleIndexWarnConfig.java new file mode 100644 index 0000000..0c62ef4 --- /dev/null +++ b/src/main/java/com/hyc/domain/req/HandleIndexWarnConfig.java @@ -0,0 +1,44 @@ +package com.hyc.domain.req; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.ToString; + +/** + * 缓存指标预警信息参数类 + * + * @author YouChe·He + * @ClassName: CacheIndexWarn + * @Description: 缓存指标预警信息参数类 + * @CreateTime: 2024/6/28 14:32 + */ +@Data +@ToString +@NoArgsConstructor +@AllArgsConstructor +public class HandleIndexWarnConfig { + /** + * 关联小车 + */ + private String vin; + /** + * 指标项名字,多项以逗号隔开 + */ + private String indexNames; + /** + * 滑窗长度 单位秒 + */ + private Integer totalLength; + /** + * 滑动长度 单位秒 + */ + private Integer slideLength; + + /** + * 操作类型 insert delete; + */ + private String handleType; + + +} diff --git a/src/main/java/com/hyc/kafka/demo/strategy/EventStrategy.java b/src/main/java/com/hyc/kafka/demo/strategy/EventStrategy.java index ecc07ac..53480d9 100644 --- a/src/main/java/com/hyc/kafka/demo/strategy/EventStrategy.java +++ b/src/main/java/com/hyc/kafka/demo/strategy/EventStrategy.java @@ -158,24 +158,25 @@ public enum EventStrategy { INDEX_WARNING { @Override public void exe(CacheManager cacheManager, RedisTemplate redisTemplate, JSONObject jsonObject, RabbitTemplate rabbitTemplate, ApplicationEventPublisher applicationEventPublisher, ScheduledExecutorService executorService, IotDbServer iotDbServer) { + String vin = jsonObject.getString("vin"); Cache indexWarn = cacheManager.getCache("indexWarn"); Cache.ValueWrapper valueWrapper = indexWarn.get(vin + "_Index_Warn"); - if (valueWrapper == null){ + if (valueWrapper == null) { String indexWarnJsonString = redisTemplate.opsForValue().get(vin + "_Index_Warn"); - - indexWarn.put(vin+"_Index_Warn", indexWarnJsonString); - Cache indexWarnTwo = cacheManager.getCache("indexWarn"); + indexWarn.put(vin + "_Index_Warn", indexWarnJsonString); Cache.ValueWrapper valueWrapperTwo = indexWarn.get(vin + "_Index_Warn"); String cacheIndexWarnJsonString = valueWrapperTwo.get().toString(); - IndexWarnToRedis cacheIndexWarn = JSONObject.parseObject(cacheIndexWarnJsonString, IndexWarnToRedis.class); + List indexWarnToRedis = JSONObject.parseArray(cacheIndexWarnJsonString, IndexWarnToRedis.class); + for (IndexWarnToRedis indexWarnToR : indexWarnToRedis) { + applicationEventPublisher.publishEvent(new CacheIndexWarn(this, indexWarnToR.getVin(), indexWarnToR.getIndexName(), indexWarnToR.getTotalLength(), indexWarnToR.getSlideLength())); + } log.info("指标预警Boot事件首次发布!"); - applicationEventPublisher.publishEvent(new CacheIndexWarn(this, cacheIndexWarn.getVin(), cacheIndexWarn.getIndexNames(), cacheIndexWarn.getTotalLength(), cacheIndexWarn.getSlideLength())); log.warn("进来了吗?"); - }else { - log.info("车辆:{}对应指标预警事件正在执行!",vin); + } else { + log.info("车辆:{}对应指标预警事件正在执行!", vin); } diff --git a/src/main/java/com/hyc/localevent/listener/IndexWarnListener.java b/src/main/java/com/hyc/localevent/listener/IndexWarnListener.java index 89ed9c6..b7a3ce8 100644 --- a/src/main/java/com/hyc/localevent/listener/IndexWarnListener.java +++ b/src/main/java/com/hyc/localevent/listener/IndexWarnListener.java @@ -50,57 +50,70 @@ public class IndexWarnListener { @EventListener - public void handleIndexWarnEvent(CacheIndexWarn cacheIndexWarn){ + public void handleIndexWarnEvent(CacheIndexWarn cacheIndexWarn) { executorService.schedule(() -> { long time = new Date().getTime(); try { - HashMap stringDoubleHashMap = new HashMap<>(); + //根据VIN从iotDb获取数据 List carDataMapList = (List) iotDbServer.queryDataFromIotDb(new IotDbParam(cacheIndexWarn.getVin(), String.valueOf(time - cacheIndexWarn.getSlideLength() * 1000), String.valueOf(time))); //获取该VIN对应的指标预警配置 - String[] split = cacheIndexWarn.getIndexNames().split(","); + String indexNames = cacheIndexWarn.getIndexNames(); + Double sum = 0.0; for (String s : carDataMapList) { - for (String s1 : split) { - log.warn("s:{},s1:{}",s,s1); - String string = JSONObject.parseObject(s).getString(s1); - stringDoubleHashMap.put(s1,stringDoubleHashMap.get(s1)==null?Double.valueOf(string):Double.valueOf(string)+stringDoubleHashMap.get(s1)); - } - } - for (String s : split) { - stringDoubleHashMap.put(s,stringDoubleHashMap.get(s) / split.length); - } - ArrayList indexWarnEventRespList = new ArrayList<>(); - for (String s : stringDoubleHashMap.keySet()) { - String qs = ""; - if (stringDoubleHashMap.get(s)<600){ - qs = "下降"; - } else if (stringDoubleHashMap.get(s) > 8000) { - qs = "上升"; - }else { - qs = "波动"; - } - indexWarnEventRespList.add(new IndexWarnEventResp(cacheIndexWarn.getVin(),s,stringDoubleHashMap.get(s),qs)); + String string = JSONObject.parseObject(s).getString(indexNames); + sum += Double.valueOf(string); + } + Double avg = sum / carDataMapList.size(); - rabbitTemplate.convertAndSend(RabbitmqConfig.INDEX_WARN_QUEUE, JSONObject.toJSONString(indexWarnEventRespList)); - log.error("计算出来的值是:{}",stringDoubleHashMap); + String qs = ""; + if (avg < 600) { + qs = "下降"; + } else if (avg > 8000) { + qs = "上升"; + } else { + qs = "波动"; + } + IndexWarnEventResp indexWarnEventResp = new IndexWarnEventResp(cacheIndexWarn.getVin(), qs, avg, cacheIndexWarn.getIndexNames()); + + + if (carDataMapList.size() > 0) { + rabbitTemplate.convertAndSend(RabbitmqConfig.INDEX_WARN_QUEUE, JSONObject.toJSONString(indexWarnEventResp)); + } + + log.error("计算出来的值是:{}", indexWarnEventResp); + if (Double.isNaN(indexWarnEventResp.getAverage())) { + log.error("一级警报"); + System.out.println("Average is NaN."); + } + + if (!(indexWarnEventResp.getAverage() instanceof Number || indexWarnEventResp.getAverage() == null)) { + log.error("一级警报:{}", indexWarnEventResp); + System.out.println("他不是一个数字"); + } Cache indexWarn = cacheManager.getCache("indexWarn"); Cache.ValueWrapper valueWrapper = indexWarn.get(cacheIndexWarn.getVin() + "_Index_Warn"); - if (valueWrapper != null){ + if (valueWrapper != null) { String cacheIndexWarnJsonStringTwo = valueWrapper.get().toString(); - IndexWarnToRedis cacheIndexWarnTwo = JSONObject.parseObject(cacheIndexWarnJsonStringTwo, IndexWarnToRedis.class); + List indexWarnToRedis = JSONObject.parseArray(cacheIndexWarnJsonStringTwo, IndexWarnToRedis.class); - applicationEventPublisher.publishEvent(new CacheIndexWarn(this, cacheIndexWarnTwo.getVin(), cacheIndexWarnTwo.getIndexNames(), cacheIndexWarnTwo.getTotalLength(), cacheIndexWarnTwo.getSlideLength())); + for (IndexWarnToRedis indexWarnToRedi : indexWarnToRedis) { + if (indexWarnToRedi.getIndexName().equals(cacheIndexWarn.getIndexNames())) { + applicationEventPublisher.publishEvent(new CacheIndexWarn(this, indexWarnToRedi.getVin(), indexWarnToRedi.getIndexName(), indexWarnToRedi.getTotalLength(), indexWarnToRedi.getSlideLength())); + } + } - }else { - log.info("车辆:{}已下线,指标预警停止",cacheIndexWarn.getVin()); + + } else { + log.info("车辆:{}已下线,指标预警停止", cacheIndexWarn.getVin()); } } catch (Exception e) { @@ -109,12 +122,8 @@ public class IndexWarnListener { }, cacheIndexWarn.getSlideLength(), TimeUnit.SECONDS); - log.error("几个意思,得到的值是啥:{}",cacheIndexWarn); + log.error("几个意思,得到的值是啥:{}", cacheIndexWarn); } - - - - }