diff --git a/cloud-common/cloud-common-kafka/pom.xml b/cloud-common/cloud-common-kafka/pom.xml
index abe86fc..3b47731 100644
--- a/cloud-common/cloud-common-kafka/pom.xml
+++ b/cloud-common/cloud-common-kafka/pom.xml
@@ -33,9 +33,5 @@
com.muyu
cloud-common-core
-
-
-
-
diff --git a/cloud-modules/cloud-event/src/main/java/com/muyu/event/consumer/MessageConsumer.java b/cloud-modules/cloud-event/src/main/java/com/muyu/event/consumer/MessageConsumer.java
index 5caad0b..7caad4b 100644
--- a/cloud-modules/cloud-event/src/main/java/com/muyu/event/consumer/MessageConsumer.java
+++ b/cloud-modules/cloud-event/src/main/java/com/muyu/event/consumer/MessageConsumer.java
@@ -1,7 +1,7 @@
package com.muyu.event.consumer;
-
import com.alibaba.fastjson2.JSONObject;
import com.muyu.event.basic.EventPublisher;
+import com.muyu.event.service.IncidentService;
import lombok.extern.log4j.Log4j2;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -9,13 +9,10 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
-
import java.time.Duration;
import java.util.Collections;
import java.util.List;
-
import static org.bouncycastle.asn1.x500.style.RFC4519Style.l;
-
/**
* kafka监听
* @author 刘武
@@ -29,16 +26,18 @@ public class MessageConsumer implements ApplicationRunner {
@Autowired
public KafkaConsumer consumer;
- @Autowired
+
private EventPublisher eventPublisher;
- private final String topic="four_car";
+ private final String topic="kafka-topic";
+
+ @Autowired
+ private IncidentService incidentService;
@Override
public void run(ApplicationArguments args) throws Exception {
List list = Collections.singletonList(topic);
consumer.subscribe(list);
-
while (true){
ConsumerRecords consumerRecords = consumer.poll(Duration.ofMillis(100));
consumerRecords.forEach(record -> {
@@ -46,6 +45,12 @@ public class MessageConsumer implements ApplicationRunner {
JSONObject jsonObject = JSONObject.parseObject(value);
log.info("value:{}",value);
eventPublisher.publishEvent(jsonObject);
+ try {
+ incidentService.warnEventProcess(jsonObject);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+
});
}
}
diff --git a/cloud-modules/cloud-event/src/main/java/com/muyu/event/listener/AddDatabaseListener.java b/cloud-modules/cloud-event/src/main/java/com/muyu/event/listener/AddDatabaseListener.java
index 95e33bd..4d94fe6 100644
--- a/cloud-modules/cloud-event/src/main/java/com/muyu/event/listener/AddDatabaseListener.java
+++ b/cloud-modules/cloud-event/src/main/java/com/muyu/event/listener/AddDatabaseListener.java
@@ -1,13 +1,9 @@
package com.muyu.event.listener;
-
import com.alibaba.fastjson2.JSONObject;
import com.muyu.event.basic.EventCustom;
import com.muyu.event.basic.EventListener;
-
-
import java.util.ArrayList;
import java.util.List;
-
/**
* 添加数据库事件
* @program: cloud-server
@@ -29,9 +25,6 @@ public class AddDatabaseListener implements EventListener {
values.add((String) value);
});
-
-
-
}
@Override
diff --git a/cloud-modules/cloud-event/src/main/java/com/muyu/event/service/IncidentService.java b/cloud-modules/cloud-event/src/main/java/com/muyu/event/service/IncidentService.java
index 4f60079..60c8c0e 100644
--- a/cloud-modules/cloud-event/src/main/java/com/muyu/event/service/IncidentService.java
+++ b/cloud-modules/cloud-event/src/main/java/com/muyu/event/service/IncidentService.java
@@ -12,5 +12,6 @@ public interface IncidentService {
void warnEventProcess(JSONObject jsonObject) throws Exception;
+ void eventAlarmProcessing(JSONObject jsonObject) throws Exception;
}
diff --git a/cloud-modules/cloud-event/src/main/java/com/muyu/event/service/impl/IncidentServiceImpl.java b/cloud-modules/cloud-event/src/main/java/com/muyu/event/service/impl/IncidentServiceImpl.java
index fb90eb9..b205563 100644
--- a/cloud-modules/cloud-event/src/main/java/com/muyu/event/service/impl/IncidentServiceImpl.java
+++ b/cloud-modules/cloud-event/src/main/java/com/muyu/event/service/impl/IncidentServiceImpl.java
@@ -1,11 +1,9 @@
package com.muyu.event.service.impl;
import com.alibaba.fastjson2.JSONObject;
-import com.muyu.cache.MessageTemplateTypeCacheService;
-import com.muyu.cache.SysCarCacheService;
-import com.muyu.cache.WarnRuleCacheService;
-import com.muyu.cache.WarnStrategyCacheService;
+import com.muyu.cache.*;
import com.muyu.common.domain.MessageTemplateType;
import com.muyu.common.domain.SysCar;
+import com.muyu.common.domain.SysCarFault;
import com.muyu.common.domain.resp.SysCarVo;
import com.muyu.common.domain.resp.WarnRuleResp;
import com.muyu.common.domain.resp.WarnStrategyResp;
@@ -38,6 +36,7 @@ import java.util.stream.Collectors;
@Log4j2
@Service
public class IncidentServiceImpl implements IncidentService {
+
private static int DURATION_SECONDS = 5;
private static List receivedStrings = new ArrayList<>();
private static ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
@@ -61,27 +60,23 @@ public class IncidentServiceImpl implements IncidentService {
//报文模版
@Resource
private MessageTemplateTypeCacheService messageTemplateTypeCacheService;
-
- private final String topic="four_car";
-
+ //kafka的主题名称
+ private final String topic="kafka-topic";
@Autowired
- private MessageConsumer messageConsumer;
-
- @Autowired
- public KafkaConsumer consumer;
-
+ private SysCarFaultCacheService sysCarFaultCacheService;
@Override
public void warnEventProcess(JSONObject jsonObject) throws Exception {
-
receivedStrings.add(jsonObject);
//协议解析:每秒穿过来一个JSONObject jsonObject; 添加进receivedStrings
//根据这个车辆VIN查询出他对应的车辆类型
String carVin=null;
//报文模版的ID
Integer templateId=null;
+ //取出这辆车的carVin
for (JSONObject receivedString : receivedStrings) {
carVin = (String) receivedString.get("carVin");
}
+ //这辆车的信息
SysCar carByVin = null;
List carVoList = sysCarCacheService.get(sysCarCacheService.keyPre());
Map carMap = carVoList.stream()
@@ -94,6 +89,7 @@ public class IncidentServiceImpl implements IncidentService {
List carWithWarnStrategyList=null;
List warnStrategyResps = warnStrategyCacheService.get(warnStrategyCacheService.keyPre());
for (WarnStrategyResp warnStrategyResp : warnStrategyResps) {
+ //策略中有绑定的车辆ID
if(warnStrategyResp.getCarTypeId()==carByVin.getCarTypeId()){
carWithWarnStrategyList.add(warnStrategyResp);
}
@@ -154,7 +150,29 @@ public class IncidentServiceImpl implements IncidentService {
}
}
}
+ @Override
+ public void eventAlarmProcessing(JSONObject jsonObject) throws Exception {
+ String carVin = (String) jsonObject.get("carVin");
+ //车辆缓存配置
+ List carVoList = sysCarCacheService.get(sysCarCacheService.keyPre());
+ //这辆车的信息
+ SysCar carByVin = null;
+ Map carMap = carVoList.stream()
+ .collect(Collectors.toMap(SysCarVo::getCarVin, Function.identity()));
+ //获取到了这个车辆的信息
+ carByVin = carMap.get(carVin);
+ Long carTypeId = carByVin.getCarTypeId();
+ //车辆绑定的报警规则
+ List sysCarFaults = sysCarFaultCacheService.get(sysCarFaultCacheService.keyPre());
+ //车辆的所有报警规则都重新存储在sysCarFaultList里面
+ List sysCarFaultList=null;
+ SysCar finalCarByVin = carByVin;
+ sysCarFaults.forEach(sysCarFault -> {
+ if(sysCarFault.getCarTypeId().equals(carTypeId)){
+ sysCarFaultList.add(sysCarFault);
+ }
+ });
+ //报文模版
-
-
+ }
}
diff --git a/cloud-modules/cloud-modules-template/src/main/java/com/muyu/template/config/MqttConfigure.java b/cloud-modules/cloud-modules-template/src/main/java/com/muyu/template/config/MqttConfigure.java
index 9eb42bd..1e130a3 100644
--- a/cloud-modules/cloud-modules-template/src/main/java/com/muyu/template/config/MqttConfigure.java
+++ b/cloud-modules/cloud-modules-template/src/main/java/com/muyu/template/config/MqttConfigure.java
@@ -160,7 +160,6 @@ public class MqttConfigure {
log.info("发送kafka成功");
return jsonObject;
}
-
//kafka发送消息
public void sendKafka(JSONObject jsonObject){
ProducerRecord stringStringProducerRecord = new ProducerRecord<>("four_car", jsonObject.toString());
diff --git a/cloud-modules/saas/saas-common/src/main/java/com/muyu/common/domain/SysCar.java b/cloud-modules/saas/saas-common/src/main/java/com/muyu/common/domain/SysCar.java
index c40467d..bc2096f 100644
--- a/cloud-modules/saas/saas-common/src/main/java/com/muyu/common/domain/SysCar.java
+++ b/cloud-modules/saas/saas-common/src/main/java/com/muyu/common/domain/SysCar.java
@@ -1,5 +1,4 @@
package com.muyu.common.domain;
-
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
@@ -8,7 +7,6 @@ import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
-
/**
* 车辆管理表
* @author sx
@@ -22,6 +20,7 @@ import lombok.NoArgsConstructor;
@EqualsAndHashCode(callSuper = true)
@TableName(value = "sys_car",autoResultMap = true)
public class SysCar extends BaseEntity {
+
@TableId(value = "id",type = IdType.AUTO)
private Long id;
private String carVin;