diff --git a/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/consumer/KafkaConsumerService.java b/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/consumer/KafkaConsumerService.java index 496e254..a104fff 100644 --- a/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/consumer/KafkaConsumerService.java +++ b/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/consumer/KafkaConsumerService.java @@ -37,6 +37,8 @@ public class KafkaConsumerService implements InitializingBean { */ private static final String TIPSY = "tipsy"; + private static final String LYRIC = "lyric"; + @Resource private KafkaConsumer kafkaConsumer; diff --git a/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/consumer/OfflineMonitoringConsumer.java b/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/consumer/OfflineMonitoringConsumer.java index 7ed838f..781af0d 100644 --- a/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/consumer/OfflineMonitoringConsumer.java +++ b/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/consumer/OfflineMonitoringConsumer.java @@ -46,7 +46,7 @@ public class OfflineMonitoringConsumer { log.info("清除缓存中的数据,车辆vin: {}", vin); // 清除缓存 cacheUtil.remove(vin); - log.info("vin码为: {}de本地缓存清除成功",vin); + log.info("vin码为: {}本地缓存清除成功",vin); } } diff --git a/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/consumer/OnLineMonitoringConsumer.java b/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/consumer/OnLineMonitoringConsumer.java index c9d0d83..16021ae 100644 --- a/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/consumer/OnLineMonitoringConsumer.java +++ b/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/consumer/OnLineMonitoringConsumer.java @@ -32,7 +32,7 @@ public class OnLineMonitoringConsumer { /** * 上线监听队列名称 */ - private static final String ON_LINE_MONITORING = "MQ_ON_LINE_MONITORING"; + private static final String ON_LINE_MONITORING = "queue_inform_email"; /** * 上线监听交换机名称 */ diff --git a/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/controller/TestKafka.java b/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/controller/TestKafka.java index 62fb477..cbc60dd 100644 --- a/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/controller/TestKafka.java +++ b/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/controller/TestKafka.java @@ -25,12 +25,14 @@ import javax.annotation.Resource; public class TestKafka { private static final String TIPSY = "tipsy"; + + private static final String LYRIC = "lyric"; private static final String VIN = "63YCZDY6336C8H4CA"; /** * 上线监听队列名称 */ - private static final String ON_LINE_MONITORING = "MQ_ON_LINE_MONITORING"; + private static final String ON_LINE_MONITORING = "queue_inform_email"; /** * 下线监听队列名称 */ @@ -49,6 +51,7 @@ public class TestKafka { JSONObject entries = new JSONObject(); entries.set("VIN码",VIN); entries.set("name","宝马"); + entries.set("Numerical_value",6); String entriesString = entries.toString(); ProducerRecord producerRecord = new ProducerRecord<>(TIPSY, entriesString); kafkaProducer.send(producerRecord); diff --git a/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/listener/AddDatabaseListener.java b/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/listener/AddDatabaseListener.java index 07da065..66dafb2 100644 --- a/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/listener/AddDatabaseListener.java +++ b/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/listener/AddDatabaseListener.java @@ -6,6 +6,7 @@ import com.muyu.processing.basic.EventListener; import com.muyu.processing.utils.CacheUtil; import lombok.extern.log4j.Log4j2; import org.apache.kafka.common.protocol.types.Field; +import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.util.ArrayList; @@ -20,6 +21,7 @@ import java.util.Map; * @Date:2024/9/29 22:25 */ @Log4j2 +@Component public class AddDatabaseListener implements EventListener { @Resource @@ -32,7 +34,6 @@ public class AddDatabaseListener implements EventListener { String vin = (String) jsonObject.get("VIN码"); Map map = (Map) cacheUtil.get(vin); if (map != null){ - log.info("本地缓存数据为: {}",map); ArrayList keys = new ArrayList<>(); ArrayList values = new ArrayList<>(); jsonObject.forEach((key, value) ->{ diff --git a/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/listener/FenceListener.java b/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/listener/FenceListener.java index fba78a1..dd2f3c6 100644 --- a/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/listener/FenceListener.java +++ b/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/listener/FenceListener.java @@ -7,6 +7,7 @@ import com.muyu.processing.basic.EventListener; import com.muyu.processing.utils.CacheUtil; import lombok.extern.log4j.Log4j2; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; import java.awt.*; import java.util.ArrayList; @@ -21,6 +22,7 @@ import java.util.HashMap; * @Date:2024/10/9 11:17 */ @Log4j2 +@Component public class FenceListener implements EventListener { @Autowired diff --git a/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/listener/IdentifyingFailuresListener.java b/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/listener/IdentifyingFailuresListener.java index 15d9d60..1b1791f 100644 --- a/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/listener/IdentifyingFailuresListener.java +++ b/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/listener/IdentifyingFailuresListener.java @@ -7,6 +7,7 @@ import com.muyu.processing.basic.EventListener; import com.muyu.processing.utils.CacheUtil; import lombok.extern.log4j.Log4j2; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; import java.math.BigDecimal; import java.util.HashMap; @@ -20,6 +21,7 @@ import java.util.HashMap; * @Date:2024/10/9 11:20 */ @Log4j2 +@Component public class IdentifyingFailuresListener implements EventListener { @Autowired diff --git a/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/listener/WarnRuleListener.java b/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/listener/WarnRuleListener.java new file mode 100644 index 0000000..26aca84 --- /dev/null +++ b/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/listener/WarnRuleListener.java @@ -0,0 +1,56 @@ +package com.muyu.processing.listener; + +import com.alibaba.fastjson.JSONObject; +import com.muyu.domain.WarnRule; +import com.muyu.processing.basic.EventCustom; +import com.muyu.processing.basic.EventListener; +import com.muyu.processing.utils.CacheUtil; +import lombok.extern.log4j.Log4j2; +import org.springframework.stereotype.Component; + +import javax.annotation.Resource; +import java.util.Date; +import java.util.Map; + +/** + * 预警规则事件 + * @Author:杨鹏 + * @Package:com.muyu.processing.listener + * @Project:cloud-vehicle + * @name:WarnRuleListener + * @Date:2024/10/10 13:57 + */ +@Log4j2 +public class WarnRuleListener implements EventListener { + + @Resource + private CacheUtil cacheUtil; + + @Override + public void onEvent(EventCustom event) { + String vin = null; + try { + log.info("预警规则"); + JSONObject jsonObject = event.getData(); + vin = (String) jsonObject.get("VIN码"); + Map map = (Map) cacheUtil.get(vin); + if (map != null){ + WarnRule warnRule = (WarnRule) map.get("warnRule"); + String ruleName = warnRule.getRuleName(); + Integer minValue = warnRule.getMinValue(); + Integer maxValue = warnRule.getMaxValue(); + Date slideTime = warnRule.getSlideTime(); + Integer slideFrequency = warnRule.getSlideFrequency(); + log.info("滑窗时间: {}",slideTime); + log.info("滑窗频率: {}",slideFrequency); + } + } catch (Exception e) { + log.info(">>报错<<预警规则: VIN码为: {},事件发送错误",vin,e.getMessage()); + } + } + + @Override + public void onApplicationEvent(EventCustom event) { + onEvent(event); + } +} diff --git a/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/listener/WarnRuleRespListener.java b/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/listener/WarnRuleRespListener.java index fe76c43..805103f 100644 --- a/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/listener/WarnRuleRespListener.java +++ b/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/listener/WarnRuleRespListener.java @@ -43,32 +43,38 @@ public class WarnRuleRespListener implements EventListener { */ @Override public void onEvent(EventCustom event) { - // 获得数据 - JSONObject jsonObject = event.getData(); - // 获得VIN码 - String vin = (String) jsonObject.get("VIN码"); - // 获得本地缓存数据 - Map map = (Map) cacheUtil.get(vin); - if (map != null){ - log.info("本地缓存数据为: {}",map); - // 获得警告规矩集合 - WarnRuleResp warnRuleResp = (WarnRuleResp) map.get("warnRuleResp"); - // 获得预警规则 - List warnRuleList = warnRuleResp.getWarnRuleList(); - // 遍历预警规则 - for (WarnRule warnRule : warnRuleList) { - String ruleName = warnRule.getRuleName(); - Integer maxValue = warnRule.getMaxValue(); - Integer minValue = warnRule.getMinValue(); - if (maxValue >= MAX_VALUE || minValue <= MIN_VALUE){ - - } - } - } + String vin = null; + try { + // 获得数据 + JSONObject jsonObject = event.getData(); + // 获得VIN码 + vin = (String) jsonObject.get("VIN码"); + Integer Numerical_value = (Integer) jsonObject.get("Numerical_value"); + // 获得本地缓存数 + log.info("警告事件: VIN码为: {}",vin); + Map map = (Map) cacheUtil.get(vin); + if (map != null){ + // 获得警告规矩集合 + WarnRuleResp warnRuleResp = (WarnRuleResp) map.get("warnRuleResp"); + // 获得预警规则 + List warnRuleList = warnRuleResp.getWarnRuleList(); + // 遍历预警规则 + for (WarnRule warnRule : warnRuleList) { + String ruleName = warnRule.getRuleName(); + Integer maxValue = warnRule.getMaxValue(); + Integer minValue = warnRule.getMinValue(); + if (maxValue <= Numerical_value || minValue >= Numerical_value){ + log.info("数据值的大小有出入, 规则名称: {}, 数据值为: {}",ruleName,Numerical_value); + } + } + } + } catch (Exception e) { + log.info(">>报错<<警告事件: VIN码为: {}, 事件发送错误:",vin,e.getMessage()); + } } @Override public void onApplicationEvent(EventCustom event) { - + onEvent(event); } }