fix():数据预警编写修改

dev.business
Number7 2024-10-10 19:53:34 +08:00
parent 2e940349ef
commit 2308d974be
7 changed files with 47 additions and 36 deletions

View File

@ -33,9 +33,5 @@
<groupId>com.muyu</groupId> <groupId>com.muyu</groupId>
<artifactId>cloud-common-core</artifactId> <artifactId>cloud-common-core</artifactId>
</dependency> </dependency>
</dependencies> </dependencies>
</project> </project>

View File

@ -1,7 +1,7 @@
package com.muyu.event.consumer; package com.muyu.event.consumer;
import com.alibaba.fastjson2.JSONObject; import com.alibaba.fastjson2.JSONObject;
import com.muyu.event.basic.EventPublisher; import com.muyu.event.basic.EventPublisher;
import com.muyu.event.service.IncidentService;
import lombok.extern.log4j.Log4j2; import lombok.extern.log4j.Log4j2;
import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.KafkaConsumer;
@ -9,13 +9,10 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner; import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.time.Duration; import java.time.Duration;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import static org.bouncycastle.asn1.x500.style.RFC4519Style.l; import static org.bouncycastle.asn1.x500.style.RFC4519Style.l;
/** /**
* kafka * kafka
* @author * @author
@ -29,16 +26,18 @@ public class MessageConsumer implements ApplicationRunner {
@Autowired @Autowired
public KafkaConsumer consumer; public KafkaConsumer consumer;
@Autowired
private EventPublisher eventPublisher; private EventPublisher eventPublisher;
private final String topic="four_car"; private final String topic="kafka-topic";
@Autowired
private IncidentService incidentService;
@Override @Override
public void run(ApplicationArguments args) throws Exception { public void run(ApplicationArguments args) throws Exception {
List<String> list = Collections.singletonList(topic); List<String> list = Collections.singletonList(topic);
consumer.subscribe(list); consumer.subscribe(list);
while (true){ while (true){
ConsumerRecords<String,String> consumerRecords = consumer.poll(Duration.ofMillis(100)); ConsumerRecords<String,String> consumerRecords = consumer.poll(Duration.ofMillis(100));
consumerRecords.forEach(record -> { consumerRecords.forEach(record -> {
@ -46,6 +45,12 @@ public class MessageConsumer implements ApplicationRunner {
JSONObject jsonObject = JSONObject.parseObject(value); JSONObject jsonObject = JSONObject.parseObject(value);
log.info("value:{}",value); log.info("value:{}",value);
eventPublisher.publishEvent(jsonObject); eventPublisher.publishEvent(jsonObject);
try {
incidentService.warnEventProcess(jsonObject);
} catch (Exception e) {
throw new RuntimeException(e);
}
}); });
} }
} }

View File

@ -1,13 +1,9 @@
package com.muyu.event.listener; package com.muyu.event.listener;
import com.alibaba.fastjson2.JSONObject; import com.alibaba.fastjson2.JSONObject;
import com.muyu.event.basic.EventCustom; import com.muyu.event.basic.EventCustom;
import com.muyu.event.basic.EventListener; import com.muyu.event.basic.EventListener;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
/** /**
* *
* @program: cloud-server * @program: cloud-server
@ -29,9 +25,6 @@ public class AddDatabaseListener implements EventListener {
values.add((String) value); values.add((String) value);
}); });
} }
@Override @Override

View File

@ -12,5 +12,6 @@ public interface IncidentService {
void warnEventProcess(JSONObject jsonObject) throws Exception; void warnEventProcess(JSONObject jsonObject) throws Exception;
void eventAlarmProcessing(JSONObject jsonObject) throws Exception;
} }

View File

@ -1,11 +1,9 @@
package com.muyu.event.service.impl; package com.muyu.event.service.impl;
import com.alibaba.fastjson2.JSONObject; import com.alibaba.fastjson2.JSONObject;
import com.muyu.cache.MessageTemplateTypeCacheService; import com.muyu.cache.*;
import com.muyu.cache.SysCarCacheService;
import com.muyu.cache.WarnRuleCacheService;
import com.muyu.cache.WarnStrategyCacheService;
import com.muyu.common.domain.MessageTemplateType; import com.muyu.common.domain.MessageTemplateType;
import com.muyu.common.domain.SysCar; import com.muyu.common.domain.SysCar;
import com.muyu.common.domain.SysCarFault;
import com.muyu.common.domain.resp.SysCarVo; import com.muyu.common.domain.resp.SysCarVo;
import com.muyu.common.domain.resp.WarnRuleResp; import com.muyu.common.domain.resp.WarnRuleResp;
import com.muyu.common.domain.resp.WarnStrategyResp; import com.muyu.common.domain.resp.WarnStrategyResp;
@ -38,6 +36,7 @@ import java.util.stream.Collectors;
@Log4j2 @Log4j2
@Service @Service
public class IncidentServiceImpl implements IncidentService { public class IncidentServiceImpl implements IncidentService {
private static int DURATION_SECONDS = 5; private static int DURATION_SECONDS = 5;
private static List<JSONObject> receivedStrings = new ArrayList<>(); private static List<JSONObject> receivedStrings = new ArrayList<>();
private static ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); private static ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
@ -61,27 +60,23 @@ public class IncidentServiceImpl implements IncidentService {
//报文模版 //报文模版
@Resource @Resource
private MessageTemplateTypeCacheService messageTemplateTypeCacheService; private MessageTemplateTypeCacheService messageTemplateTypeCacheService;
//kafka的主题名称
private final String topic="four_car"; private final String topic="kafka-topic";
@Autowired @Autowired
private MessageConsumer messageConsumer; private SysCarFaultCacheService sysCarFaultCacheService;
@Autowired
public KafkaConsumer consumer;
@Override @Override
public void warnEventProcess(JSONObject jsonObject) throws Exception { public void warnEventProcess(JSONObject jsonObject) throws Exception {
receivedStrings.add(jsonObject); receivedStrings.add(jsonObject);
//协议解析:每秒穿过来一个JSONObject jsonObject; 添加进receivedStrings //协议解析:每秒穿过来一个JSONObject jsonObject; 添加进receivedStrings
//根据这个车辆VIN查询出他对应的车辆类型 //根据这个车辆VIN查询出他对应的车辆类型
String carVin=null; String carVin=null;
//报文模版的ID //报文模版的ID
Integer templateId=null; Integer templateId=null;
//取出这辆车的carVin
for (JSONObject receivedString : receivedStrings) { for (JSONObject receivedString : receivedStrings) {
carVin = (String) receivedString.get("carVin"); carVin = (String) receivedString.get("carVin");
} }
//这辆车的信息
SysCar carByVin = null; SysCar carByVin = null;
List<SysCarVo> carVoList = sysCarCacheService.get(sysCarCacheService.keyPre()); List<SysCarVo> carVoList = sysCarCacheService.get(sysCarCacheService.keyPre());
Map<String, SysCarVo> carMap = carVoList.stream() Map<String, SysCarVo> carMap = carVoList.stream()
@ -94,6 +89,7 @@ public class IncidentServiceImpl implements IncidentService {
List<WarnStrategyResp> carWithWarnStrategyList=null; List<WarnStrategyResp> carWithWarnStrategyList=null;
List<WarnStrategyResp> warnStrategyResps = warnStrategyCacheService.get(warnStrategyCacheService.keyPre()); List<WarnStrategyResp> warnStrategyResps = warnStrategyCacheService.get(warnStrategyCacheService.keyPre());
for (WarnStrategyResp warnStrategyResp : warnStrategyResps) { for (WarnStrategyResp warnStrategyResp : warnStrategyResps) {
//策略中有绑定的车辆ID
if(warnStrategyResp.getCarTypeId()==carByVin.getCarTypeId()){ if(warnStrategyResp.getCarTypeId()==carByVin.getCarTypeId()){
carWithWarnStrategyList.add(warnStrategyResp); carWithWarnStrategyList.add(warnStrategyResp);
} }
@ -154,7 +150,29 @@ public class IncidentServiceImpl implements IncidentService {
} }
} }
} }
@Override
public void eventAlarmProcessing(JSONObject jsonObject) throws Exception {
String carVin = (String) jsonObject.get("carVin");
//车辆缓存配置
List<SysCarVo> carVoList = sysCarCacheService.get(sysCarCacheService.keyPre());
//这辆车的信息
SysCar carByVin = null;
Map<String, SysCarVo> carMap = carVoList.stream()
.collect(Collectors.toMap(SysCarVo::getCarVin, Function.identity()));
//获取到了这个车辆的信息
carByVin = carMap.get(carVin);
Long carTypeId = carByVin.getCarTypeId();
//车辆绑定的报警规则
List<SysCarFault> sysCarFaults = sysCarFaultCacheService.get(sysCarFaultCacheService.keyPre());
//车辆的所有报警规则都重新存储在sysCarFaultList里面
List<SysCarFault> sysCarFaultList=null;
SysCar finalCarByVin = carByVin;
sysCarFaults.forEach(sysCarFault -> {
if(sysCarFault.getCarTypeId().equals(carTypeId)){
sysCarFaultList.add(sysCarFault);
}
});
//报文模版
} }
}

View File

@ -160,7 +160,6 @@ public class MqttConfigure {
log.info("发送kafka成功"); log.info("发送kafka成功");
return jsonObject; return jsonObject;
} }
//kafka发送消息 //kafka发送消息
public void sendKafka(JSONObject jsonObject){ public void sendKafka(JSONObject jsonObject){
ProducerRecord<String, String> stringStringProducerRecord = new ProducerRecord<>("four_car", jsonObject.toString()); ProducerRecord<String, String> stringStringProducerRecord = new ProducerRecord<>("four_car", jsonObject.toString());

View File

@ -1,5 +1,4 @@
package com.muyu.common.domain; package com.muyu.common.domain;
import com.baomidou.mybatisplus.annotation.IdType; import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId; import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName; import com.baomidou.mybatisplus.annotation.TableName;
@ -8,7 +7,6 @@ import lombok.AllArgsConstructor;
import lombok.Data; import lombok.Data;
import lombok.EqualsAndHashCode; import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor; import lombok.NoArgsConstructor;
/** /**
* *
* @author sx * @author sx
@ -22,6 +20,7 @@ import lombok.NoArgsConstructor;
@EqualsAndHashCode(callSuper = true) @EqualsAndHashCode(callSuper = true)
@TableName(value = "sys_car",autoResultMap = true) @TableName(value = "sys_car",autoResultMap = true)
public class SysCar extends BaseEntity { public class SysCar extends BaseEntity {
@TableId(value = "id",type = IdType.AUTO) @TableId(value = "id",type = IdType.AUTO)
private Long id; private Long id;
private String carVin; private String carVin;