From 2308d974be8bc4431a361daddf2b3ff9af88a091 Mon Sep 17 00:00:00 2001 From: Number7 <1845377266@qq.com> Date: Thu, 10 Oct 2024 19:53:34 +0800 Subject: [PATCH] =?UTF-8?q?fix():=E6=95=B0=E6=8D=AE=E9=A2=84=E8=AD=A6?= =?UTF-8?q?=E7=BC=96=E5=86=99=E4=BF=AE=E6=94=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cloud-common/cloud-common-kafka/pom.xml | 4 -- .../muyu/event/consumer/MessageConsumer.java | 19 +++++--- .../event/listener/AddDatabaseListener.java | 7 --- .../muyu/event/service/IncidentService.java | 1 + .../service/impl/IncidentServiceImpl.java | 48 +++++++++++++------ .../muyu/template/config/MqttConfigure.java | 1 - .../java/com/muyu/common/domain/SysCar.java | 3 +- 7 files changed, 47 insertions(+), 36 deletions(-) diff --git a/cloud-common/cloud-common-kafka/pom.xml b/cloud-common/cloud-common-kafka/pom.xml index abe86fc..3b47731 100644 --- a/cloud-common/cloud-common-kafka/pom.xml +++ b/cloud-common/cloud-common-kafka/pom.xml @@ -33,9 +33,5 @@ com.muyu cloud-common-core - - - - diff --git a/cloud-modules/cloud-event/src/main/java/com/muyu/event/consumer/MessageConsumer.java b/cloud-modules/cloud-event/src/main/java/com/muyu/event/consumer/MessageConsumer.java index 5caad0b..7caad4b 100644 --- a/cloud-modules/cloud-event/src/main/java/com/muyu/event/consumer/MessageConsumer.java +++ b/cloud-modules/cloud-event/src/main/java/com/muyu/event/consumer/MessageConsumer.java @@ -1,7 +1,7 @@ package com.muyu.event.consumer; - import com.alibaba.fastjson2.JSONObject; import com.muyu.event.basic.EventPublisher; +import com.muyu.event.service.IncidentService; import lombok.extern.log4j.Log4j2; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; @@ -9,13 +9,10 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.stereotype.Component; - import java.time.Duration; import java.util.Collections; import java.util.List; - import static org.bouncycastle.asn1.x500.style.RFC4519Style.l; - /** * kafka监听 * @author 刘武 @@ -29,16 +26,18 @@ public class MessageConsumer implements ApplicationRunner { @Autowired public KafkaConsumer consumer; - @Autowired + private EventPublisher eventPublisher; - private final String topic="four_car"; + private final String topic="kafka-topic"; + + @Autowired + private IncidentService incidentService; @Override public void run(ApplicationArguments args) throws Exception { List list = Collections.singletonList(topic); consumer.subscribe(list); - while (true){ ConsumerRecords consumerRecords = consumer.poll(Duration.ofMillis(100)); consumerRecords.forEach(record -> { @@ -46,6 +45,12 @@ public class MessageConsumer implements ApplicationRunner { JSONObject jsonObject = JSONObject.parseObject(value); log.info("value:{}",value); eventPublisher.publishEvent(jsonObject); + try { + incidentService.warnEventProcess(jsonObject); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); } } diff --git a/cloud-modules/cloud-event/src/main/java/com/muyu/event/listener/AddDatabaseListener.java b/cloud-modules/cloud-event/src/main/java/com/muyu/event/listener/AddDatabaseListener.java index 95e33bd..4d94fe6 100644 --- a/cloud-modules/cloud-event/src/main/java/com/muyu/event/listener/AddDatabaseListener.java +++ b/cloud-modules/cloud-event/src/main/java/com/muyu/event/listener/AddDatabaseListener.java @@ -1,13 +1,9 @@ package com.muyu.event.listener; - import com.alibaba.fastjson2.JSONObject; import com.muyu.event.basic.EventCustom; import com.muyu.event.basic.EventListener; - - import java.util.ArrayList; import java.util.List; - /** * 添加数据库事件 * @program: cloud-server @@ -29,9 +25,6 @@ public class AddDatabaseListener implements EventListener { values.add((String) value); }); - - - } @Override diff --git a/cloud-modules/cloud-event/src/main/java/com/muyu/event/service/IncidentService.java b/cloud-modules/cloud-event/src/main/java/com/muyu/event/service/IncidentService.java index 4f60079..60c8c0e 100644 --- a/cloud-modules/cloud-event/src/main/java/com/muyu/event/service/IncidentService.java +++ b/cloud-modules/cloud-event/src/main/java/com/muyu/event/service/IncidentService.java @@ -12,5 +12,6 @@ public interface IncidentService { void warnEventProcess(JSONObject jsonObject) throws Exception; + void eventAlarmProcessing(JSONObject jsonObject) throws Exception; } diff --git a/cloud-modules/cloud-event/src/main/java/com/muyu/event/service/impl/IncidentServiceImpl.java b/cloud-modules/cloud-event/src/main/java/com/muyu/event/service/impl/IncidentServiceImpl.java index fb90eb9..b205563 100644 --- a/cloud-modules/cloud-event/src/main/java/com/muyu/event/service/impl/IncidentServiceImpl.java +++ b/cloud-modules/cloud-event/src/main/java/com/muyu/event/service/impl/IncidentServiceImpl.java @@ -1,11 +1,9 @@ package com.muyu.event.service.impl; import com.alibaba.fastjson2.JSONObject; -import com.muyu.cache.MessageTemplateTypeCacheService; -import com.muyu.cache.SysCarCacheService; -import com.muyu.cache.WarnRuleCacheService; -import com.muyu.cache.WarnStrategyCacheService; +import com.muyu.cache.*; import com.muyu.common.domain.MessageTemplateType; import com.muyu.common.domain.SysCar; +import com.muyu.common.domain.SysCarFault; import com.muyu.common.domain.resp.SysCarVo; import com.muyu.common.domain.resp.WarnRuleResp; import com.muyu.common.domain.resp.WarnStrategyResp; @@ -38,6 +36,7 @@ import java.util.stream.Collectors; @Log4j2 @Service public class IncidentServiceImpl implements IncidentService { + private static int DURATION_SECONDS = 5; private static List receivedStrings = new ArrayList<>(); private static ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); @@ -61,27 +60,23 @@ public class IncidentServiceImpl implements IncidentService { //报文模版 @Resource private MessageTemplateTypeCacheService messageTemplateTypeCacheService; - - private final String topic="four_car"; - + //kafka的主题名称 + private final String topic="kafka-topic"; @Autowired - private MessageConsumer messageConsumer; - - @Autowired - public KafkaConsumer consumer; - + private SysCarFaultCacheService sysCarFaultCacheService; @Override public void warnEventProcess(JSONObject jsonObject) throws Exception { - receivedStrings.add(jsonObject); //协议解析:每秒穿过来一个JSONObject jsonObject; 添加进receivedStrings //根据这个车辆VIN查询出他对应的车辆类型 String carVin=null; //报文模版的ID Integer templateId=null; + //取出这辆车的carVin for (JSONObject receivedString : receivedStrings) { carVin = (String) receivedString.get("carVin"); } + //这辆车的信息 SysCar carByVin = null; List carVoList = sysCarCacheService.get(sysCarCacheService.keyPre()); Map carMap = carVoList.stream() @@ -94,6 +89,7 @@ public class IncidentServiceImpl implements IncidentService { List carWithWarnStrategyList=null; List warnStrategyResps = warnStrategyCacheService.get(warnStrategyCacheService.keyPre()); for (WarnStrategyResp warnStrategyResp : warnStrategyResps) { + //策略中有绑定的车辆ID if(warnStrategyResp.getCarTypeId()==carByVin.getCarTypeId()){ carWithWarnStrategyList.add(warnStrategyResp); } @@ -154,7 +150,29 @@ public class IncidentServiceImpl implements IncidentService { } } } + @Override + public void eventAlarmProcessing(JSONObject jsonObject) throws Exception { + String carVin = (String) jsonObject.get("carVin"); + //车辆缓存配置 + List carVoList = sysCarCacheService.get(sysCarCacheService.keyPre()); + //这辆车的信息 + SysCar carByVin = null; + Map carMap = carVoList.stream() + .collect(Collectors.toMap(SysCarVo::getCarVin, Function.identity())); + //获取到了这个车辆的信息 + carByVin = carMap.get(carVin); + Long carTypeId = carByVin.getCarTypeId(); + //车辆绑定的报警规则 + List sysCarFaults = sysCarFaultCacheService.get(sysCarFaultCacheService.keyPre()); + //车辆的所有报警规则都重新存储在sysCarFaultList里面 + List sysCarFaultList=null; + SysCar finalCarByVin = carByVin; + sysCarFaults.forEach(sysCarFault -> { + if(sysCarFault.getCarTypeId().equals(carTypeId)){ + sysCarFaultList.add(sysCarFault); + } + }); + //报文模版 - - + } } diff --git a/cloud-modules/cloud-modules-template/src/main/java/com/muyu/template/config/MqttConfigure.java b/cloud-modules/cloud-modules-template/src/main/java/com/muyu/template/config/MqttConfigure.java index 9eb42bd..1e130a3 100644 --- a/cloud-modules/cloud-modules-template/src/main/java/com/muyu/template/config/MqttConfigure.java +++ b/cloud-modules/cloud-modules-template/src/main/java/com/muyu/template/config/MqttConfigure.java @@ -160,7 +160,6 @@ public class MqttConfigure { log.info("发送kafka成功"); return jsonObject; } - //kafka发送消息 public void sendKafka(JSONObject jsonObject){ ProducerRecord stringStringProducerRecord = new ProducerRecord<>("four_car", jsonObject.toString()); diff --git a/cloud-modules/saas/saas-common/src/main/java/com/muyu/common/domain/SysCar.java b/cloud-modules/saas/saas-common/src/main/java/com/muyu/common/domain/SysCar.java index c40467d..bc2096f 100644 --- a/cloud-modules/saas/saas-common/src/main/java/com/muyu/common/domain/SysCar.java +++ b/cloud-modules/saas/saas-common/src/main/java/com/muyu/common/domain/SysCar.java @@ -1,5 +1,4 @@ package com.muyu.common.domain; - import com.baomidou.mybatisplus.annotation.IdType; import com.baomidou.mybatisplus.annotation.TableId; import com.baomidou.mybatisplus.annotation.TableName; @@ -8,7 +7,6 @@ import lombok.AllArgsConstructor; import lombok.Data; import lombok.EqualsAndHashCode; import lombok.NoArgsConstructor; - /** * 车辆管理表 * @author sx @@ -22,6 +20,7 @@ import lombok.NoArgsConstructor; @EqualsAndHashCode(callSuper = true) @TableName(value = "sys_car",autoResultMap = true) public class SysCar extends BaseEntity { + @TableId(value = "id",type = IdType.AUTO) private Long id; private String carVin;