feat():指标预警事件完成,预警报警功能验证

master
Saisai Liu 2024-07-01 08:35:44 +08:00
parent e040751cbe
commit c5cd7bb4e9
10 changed files with 184 additions and 228 deletions

View File

@ -0,0 +1,30 @@
package com.mobai.resp;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.List;
/**
* @author Saisai
* @className RespResult
* @description
* @date 2024/6/29 12:07
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class RespResult {
private String vin;
private List<Double> resList;
private String keyCode;
private Long startTime;
private Long endTime;
}

View File

@ -2,8 +2,6 @@ package com.mobai;
import org.springframework.boot.SpringApplication; import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.ComponentScan;
/** /**
* @author Mobai * @author Mobai

View File

@ -82,8 +82,6 @@ public class IotDBSessionConfig {
public void insertRecord(String deviceId, Long time,List<String> measurementsList, List<String> valuesList) throws StatementExecutionException, IoTDBConnectionException, ServerException { public void insertRecord(String deviceId, Long time,List<String> measurementsList, List<String> valuesList) throws StatementExecutionException, IoTDBConnectionException, ServerException {
if (measurementsList.size() == valuesList.size()) { if (measurementsList.size() == valuesList.size()) {
session.insertRecord(deviceId, time, measurementsList, valuesList); session.insertRecord(deviceId, time, measurementsList, valuesList);
log.info("键::{}",measurementsList);
log.info("值::{}",valuesList);
} else { } else {
log.error("measurementsList 与 valuesList 值不对应"); log.error("measurementsList 与 valuesList 值不对应");
} }

View File

@ -59,8 +59,8 @@ public class IotDbServerImpl implements IotDbServer {
if (null != req.getVin()) { if (null != req.getVin()) {
String sql = "select * from " + "root.vin.map." + req.getVin(); String sql = "select * from " + "root.vin.map." + req.getVin();
if (req.getCode() == null) { if (req.getCode() != null) {
sql = sql.replace("*", req.getCode()); sql = sql.replace("*", req.getCode()); //1719641179000
} }
// 开始时间 // 开始时间
if (req.getStartTime() != null && req.getStartTime() != 0) { if (req.getStartTime() != null && req.getStartTime() != 0) {

View File

@ -12,11 +12,14 @@ import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException; import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartition;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.math.BigDecimal; import java.math.BigDecimal;
@ -38,17 +41,9 @@ import java.util.Map;
@Slf4j @Slf4j
public class KafkaConsumerListenerExample { public class KafkaConsumerListenerExample {
@Autowired
private RedisService redisService;
@Autowired @Autowired
private IotDbServer iotDbServer; private IotDbServer iotDbServer;
// @Autowired
// private KafkaConsumer<String, String> consumer;
@Autowired @Autowired
private ForestGet forestGet; private ForestGet forestGet;
@ -58,15 +53,14 @@ public class KafkaConsumerListenerExample {
* @param record * @param record
*/ */
@KafkaListener(topics = {"topic0", "topic1"}, groupId = "topics") @KafkaListener(topics = {"topic0", "topic1"}, groupId = "topics")
public void consume(ConsumerRecord<String, String> record) { public void consume(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {
log.info("消费信息为:{}",record); log.info("消费信息为:{}",record);
// 无数据接口存map集合 // 无数据接口存map集合
Map<String,String> map = JSON.parseObject(record.value(),Map.class); Map<String,String> map = JSON.parseObject(record.value(),Map.class);
// 存入iotDB // 存入iotDB
try { try {
iotDbServer.insertData(map); iotDbServer.insertData(map);
log.info("添加成功"); acknowledgment.acknowledge();
} catch (StatementExecutionException e) { } catch (StatementExecutionException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} catch (ServerException e) { } catch (ServerException e) {

View File

@ -9,9 +9,7 @@ import com.mobai.utils.RedisService;
import com.mobai.vehicle.event.service.EventsService; import com.mobai.vehicle.event.service.EventsService;
import com.mobai.vehicle.HandlerHelper; import com.mobai.vehicle.HandlerHelper;
import lombok.extern.log4j.Log4j2; import lombok.extern.log4j.Log4j2;
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartition;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationArguments;
@ -95,11 +93,25 @@ public class VinConsumerRunner implements ApplicationRunner {
Map<String,String> map = JSON.parseObject(record.value(), Map.class); Map<String,String> map = JSON.parseObject(record.value(), Map.class);
// 获取对应的事件 // 获取对应的事件
VehicleEvent events = eventsService.getEvents(map.get("vin")); VehicleEvent events = eventsService.getEvents(map.get("vin"));
log.info("执行事件:{}",events); // log.info("执行事件:{}",events);
if (events==null)
return;
HandlerHelper.doHandler(events, map, redisService); HandlerHelper.doHandler(events, map, redisService);
} }
} }
}).start(); }).start();
// 提交偏移量
consumer.commitAsync(new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception e)
{
if (e != null) {
System.out.println(offsets.toString());
System.out.println(e.toString());
}
}
});
} else { } else {
Thread.sleep(10); Thread.sleep(10);
} }

View File

@ -31,32 +31,12 @@ public class BugMalfunctionFactory implements EventActive {
@Autowired @Autowired
private RabbitTemplate rabbitTemplate; private RabbitTemplate rabbitTemplate;
@Autowired
private RedisService redisService;
@Autowired @Autowired
private RedisTemplate<String,String> redisTemplate; private RedisTemplate<String,String> redisTemplate;
@Resource(name = "createFiveCaffeine") @Resource(name = "createFiveCaffeine")
private Cache cacheFive; private Cache cacheFive;
// @Autowired
// private Cache getCacheFive(@Qualifier("createFiveCaffeine") Cache cacheFive){
// if (cacheFive==null){
// cacheFive = Caffeine.newBuilder()
// // 设置初始连接数
// .initialCapacity(1000)
// //5秒没有读写自动删除
// .expireAfterAccess(5, TimeUnit.SECONDS)
// //最大容量1024个超过会自动清理空间
// .maximumSize(1024)
// .removalListener(((key, value, cause) -> {
// //清理通知 key,value ==> 键值对 cause ==> 清理原因
// }))
// .build();
// }
// return this.cacheFive = cacheFive;
// }
private BugMalfunctionFactory() { private BugMalfunctionFactory() {
} }
@ -69,72 +49,72 @@ public class BugMalfunctionFactory implements EventActive {
public void activeEvent(Map<String,String> vehicle, RedisService redisService) { public void activeEvent(Map<String,String> vehicle, RedisService redisService) {
BugMalfunctionResult result = new BugMalfunctionResult(); BugMalfunctionResult result = new BugMalfunctionResult();
result.setVin(vehicle.get("vin")); result.setVin(vehicle.get("vin"));
result.setTimestamp(Long.valueOf(vehicle.get("startTime"))); result.setTimestamp(Long.valueOf(vehicle.get("drivingRoute")));
if (vehicle.get("vehicleStatus").equals(0+"")) { if (vehicle.get("vehicleStatus")!=null&&vehicle.get("vehicleStatus").equals(0+"")) {
result.setFaultCode(FaultCode.VEHICLE.getString()); result.setFaultCode(FaultCode.VEHICLE.getString());
this.active(result); this.active(result);
} }
if (vehicle.get("chargingStatus").equals(0+"")) { if (vehicle.get("chargingStatus")!=null&&vehicle.get("chargingStatus").equals(0+"")) {
result.setFaultCode(FaultCode.CHARGING.getString()); result.setFaultCode(FaultCode.CHARGING.getString());
this.active(result); this.active(result);
} }
if (vehicle.get("operatingStatus").equals(0+"")) { if (vehicle.get("operatingStatus")!=null&&vehicle.get("operatingStatus").equals(0+"")) {
result.setFaultCode(FaultCode.OPERATING.getString()); result.setFaultCode(FaultCode.OPERATING.getString());
this.active(result); this.active(result);
} }
if (vehicle.get("socStatus").equals(""+0)) { if (vehicle.get("socStatus")!=null&&vehicle.get("socStatus").equals(""+0)) {
result.setFaultCode(FaultCode.SOC.getString()); result.setFaultCode(FaultCode.SOC.getString());
this.active(result); this.active(result);
} }
if (vehicle.get("chargingEnergyStorageStatus").equals(0+"")) { if (vehicle.get("chargingEnergyStorageStatus")!=null&&vehicle.get("chargingEnergyStorageStatus").equals(0+"")) {
result.setFaultCode(FaultCode.CHARGING_ENERGY.getString()); result.setFaultCode(FaultCode.CHARGING_ENERGY.getString());
this.active(result); this.active(result);
} }
if (vehicle.get("driveMotorStatus").equals(0+"")) { if (vehicle.get("driveMotorStatus")!=null&&vehicle.get("driveMotorStatus").equals(0+"")) {
result.setFaultCode(FaultCode.DRIVE_MOTOR.getString()); result.setFaultCode(FaultCode.DRIVE_MOTOR.getString());
this.active(result); this.active(result);
} }
if (vehicle.get("positionStatus").equals(0+"")) { if (vehicle.get("positionStatus")!=null&&vehicle.get("positionStatus").equals(0+"")) {
result.setFaultCode(FaultCode.POSITION.getString()); result.setFaultCode(FaultCode.POSITION.getString());
this.active(result); this.active(result);
} }
if (vehicle.get("easStatus").equals(0+"")) { if (vehicle.get("easStatus")!=null&&vehicle.get("easStatus").equals(0+"")) {
result.setFaultCode(FaultCode.EAS.getString()); result.setFaultCode(FaultCode.EAS.getString());
this.active(result); this.active(result);
} }
if (vehicle.get("ptcStatus").equals(0+"")) { if (vehicle.get("ptcStatus")!=null&&vehicle.get("ptcStatus").equals(0+"")) {
result.setFaultCode(FaultCode.PTC.getString()); result.setFaultCode(FaultCode.PTC.getString());
this.active(result); this.active(result);
} }
if (vehicle.get("epsStatus").equals(0+"")) { if (vehicle.get("epsStatus")!=null&&vehicle.get("epsStatus").equals(0+"")) {
result.setFaultCode(FaultCode.EPS.getString()); result.setFaultCode(FaultCode.EPS.getString());
this.active(result); this.active(result);
} }
if (vehicle.get("absStatus").equals(0+"")) { if (vehicle.get("absStatus")!=null&&vehicle.get("absStatus").equals(0+"")) {
result.setFaultCode(FaultCode.ABS.getString()); result.setFaultCode(FaultCode.ABS.getString());
this.active(result); this.active(result);
} }
if (vehicle.get("mcuStatus").equals(0+"")) { if (vehicle.get("mcuStatus")!=null&&vehicle.get("mcuStatus").equals(0+"")) {
result.setFaultCode(FaultCode.MCU.getString()); result.setFaultCode(FaultCode.MCU.getString());
this.active(result); this.active(result);
} }
if (vehicle.get("heatingStatus").equals(0+"")) { if (vehicle.get("heatingStatus")!=null&&vehicle.get("heatingStatus").equals(0+"")) {
result.setFaultCode(FaultCode.HEATING.getString()); result.setFaultCode(FaultCode.HEATING.getString());
this.active(result); this.active(result);
} }
if (vehicle.get("batteryStatus").equals(0+"")) { if (vehicle.get("batteryStatus")!=null&&vehicle.get("batteryStatus").equals(0+"")) {
result.setFaultCode(FaultCode.BATTERY.getString()); result.setFaultCode(FaultCode.BATTERY.getString());
this.active(result); this.active(result);
} }
if (vehicle.get("batteryInsulationStatus").equals(0+"")) { if (vehicle.get("batteryInsulationStatus")!=null&&vehicle.get("batteryInsulationStatus").equals(0+"")) {
result.setFaultCode(FaultCode.BATTERY_INSULATION.getString()); result.setFaultCode(FaultCode.BATTERY_INSULATION.getString());
this.active(result); this.active(result);
} }
if (vehicle.get("dcdcStatus").equals(0+"")) { if (vehicle.get("dcdcStatus")!=null&&vehicle.get("dcdcStatus").equals(0+"")) {
result.setFaultCode(FaultCode.DCDC.getString()); result.setFaultCode(FaultCode.DCDC.getString());
this.active(result); this.active(result);
} }
if (vehicle.get("chgStatus").equals(0+"")) { if (vehicle.get("chgStatus")!=null&&vehicle.get("chgStatus").equals(0+"")) {
result.setFaultCode(FaultCode.CHG.getString()); result.setFaultCode(FaultCode.CHG.getString());
this.active(result); this.active(result);
} }

View File

@ -5,7 +5,7 @@ import com.alibaba.fastjson2.JSONObject;
import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Cache;
import com.mobai.iotDB.service.impl.IotDbServerImpl; import com.mobai.iotDB.service.impl.IotDbServerImpl;
import com.mobai.req.VehicleReq; import com.mobai.req.VehicleReq;
import com.mobai.utils.RedisService; import com.mobai.resp.RespResult;
import lombok.Data; import lombok.Data;
import lombok.extern.log4j.Log4j2; import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.core.ExchangeTypes; import org.springframework.amqp.core.ExchangeTypes;
@ -19,13 +19,13 @@ import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.math.BigDecimal;
import java.util.*; import java.util.*;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import static java.lang.Thread.sleep; import static java.util.Arrays.stream;
/** /**
* *
@ -61,30 +61,32 @@ public class StandardWarningFactory {
private Map<String, ScheduledExecutorService> scheduleMap; private Map<String, ScheduledExecutorService> scheduleMap;
/** /**
* keycode-cycleLength-level * keycode-opportunity-level
*/ */
private ConcurrentHashMap<String, List<String>> vinLevel; private ConcurrentHashMap<String, List<String>> vinLevel;
private List<String> vins; private static List<String> vins;
static {
vins = new ArrayList<String>();
}
@RabbitListener(bindings = @QueueBinding( // @RabbitListener(bindings = @QueueBinding(
value = @Queue, // value = @Queue,
exchange = @Exchange(name = "vehicle-prop", type = ExchangeTypes.FANOUT) // exchange = @Exchange(name = "vehicle-prop", type = ExchangeTypes.FANOUT)
)) // ))
@RabbitListener(queues = "zhiLian-vehicle-index")
public void changeProp(String vin) { public void changeProp(String vin) {
vin= vin.replaceAll("\"","");
log.info("更新指标缓存");
List<String> attrs = new ArrayList<>(); List<String> attrs = new ArrayList<>();
List<Integer> attrLength = new ArrayList<>(); List<String> propList = redisTemplate.opsForList().range(vin + "Index", 0, -1);
List<String> propList = redisTemplate.opsForList().range(vin + "List", 0, -1);
for (String string : propList) { for (String string : propList) {
JSONObject prop = JSON.parseObject(string); JSONObject prop = JSON.parseObject(string);
// 指标属性
attrs.add(String.valueOf(prop.get("keyCode")));
// 周期延迟时长 // 周期延迟时长
attrLength.add((Integer) prop.get("times")); attrs.add(prop.get("keyCode") + "-" + prop.get("opportunity") + "-" + prop.get("attributeValue"));
attrs.add(prop.get("keyCode") + "-" + prop.get("level") + "-" + prop.get("times"));
} }
cache.put(vin + "-keyCode", JSON.toJSONString(attrs)); // vin keycode-level-times cache.put(vin + "-keyCode", JSON.toJSONString(attrs)); // vin keycode-level-attributeValue
log.info("更新缓存:{}", vin); log.info("更新缓存:{}", vin);
// 指标未编写和存入 // 指标未编写和存入
} }
@ -99,57 +101,99 @@ public class StandardWarningFactory {
public void joinStandardVehicle(String vin) { public void joinStandardVehicle(String vin) {
List<String> cacheProps = new ArrayList<>(); List<String> cacheProps = new ArrayList<>();
log.info("车辆上线:{}", vin); log.info("车辆上线:{}", vin);
List<String> props = redisTemplate.opsForList().range(vin + "Prop", 0, -1); List<String> props = redisTemplate.opsForList().range(vin + "Index", 0, -1);
redisTemplate.expire(vin + "Index", 10, TimeUnit.MINUTES);
props.forEach(prop -> { props.forEach(prop -> {
log.info("缓存存入操作");
JSONObject jsonObject = JSON.parseObject(prop); JSONObject jsonObject = JSON.parseObject(prop);
String keyCode = String.valueOf(jsonObject.get("keyCode")); String keyCode = String.valueOf(jsonObject.get("keyCode"));
String level = String.valueOf(jsonObject.get("level")); String level = String.valueOf(jsonObject.get("attributeValue"));
String windowLength = String.valueOf(jsonObject.get("windowLength")); String opportunity = String.valueOf(jsonObject.get("opportunity"));
String cycleLength = String.valueOf(jsonObject.get("activeLength")); cacheProps.add(keyCode + "-" + opportunity + "-" + level);
// cycle = windowLEngth * cycle
cacheProps.add(keyCode + "-" + cycleLength + "-" + level);
}); });
// 存入车辆指标缓存 code-cycleLength-level new Thread(() -> {
cache.put(vin + "-keyCode", JSON.toJSONString(props)); cache.put(vin + "-keyCode", JSON.toJSONString(cacheProps));
vins.add(vin); vins.add(vin);
List<String> vinLevel = JSON.parseArray(cache.getIfPresent(vin + "-keyCode"), String.class); this.doActive(vin);
if (vinLevel == null) { }).start();
// 存入车辆指标缓存 code-opportunity-level
}
private void doActive(String vin) {
// 每次都执行
List<String> strings = JSON.parseArray(cache.getIfPresent(vin + "-keyCode"), String.class);
if (strings == null || strings.size() == 0) {
changeProp(vin); changeProp(vin);
vinLevel = JSON.parseArray(cache.getIfPresent(vin + "-keyCode"), String.class); // keycode-level-times
} }
Map<String, String> map = new HashMap<>(); strings = JSON.parseArray(cache.getIfPresent(vin + "-keyCode"), String.class);
vinLevel.stream().map(str -> (map.put(str.split("-")[0], str.split("-")[1]))); // key : attr value : level Timer timer = new Timer();
String[] split = vinLevel.get(0).split("-"); // vin-level-times if (vins == null) {
String times = split[2]; vins = new ArrayList<>();
try { }
List<Map<String, String>> vehicleList = iotDbServerImpl.queryDataFromIotDb( List<String> finalStrings = strings;
new VehicleReq() {{ TimerTask timerTask = new TimerTask() {
setVin(vin); @Override
setStartTime(new Date().getTime() - Long.parseLong(times) * 1000); public void run() {
setEndTime(new Date().getTime()); new Thread(() -> {
}}); int first = 0;
List<String> finalVinLevel = vinLevel; for (String string : finalStrings) {
HashMap<String, List<Integer>> result = new HashMap<>(); avtiveThread(vin, string);
// 数据查询结果处理 if (first==0) doActive(vin);
vehicleList.forEach(vehicleMap -> { first = 1;
for (String vinInfo : finalVinLevel) {
String[] vinInfoSplit = vinInfo.split("-");
// 数据库查询结果
String string = vehicleMap.get(vinInfoSplit[0]);
int resu = Integer.parseInt(vinInfoSplit[1]) - Integer.parseInt(string);
if (!result.containsKey(string)) {
result.put(string, new ArrayList<Integer>());
}
result.get(string).add(resu);
// 指标异常处理
if (resu > 10 || resu < -10) {
log.info("指标有问题预警发生ing");
// rabbitTemplate.convertAndSend("standardWarning-error", vinInfoSplit[0] + ":" + string);
}
} }
}); }).start();
log.info("发送了结果"); }
// rabbitTemplate.convertAndSend("standardWarning-result", JSON.toJSONString(result)); };
timer.schedule(timerTask, Long.parseLong(strings.get(0).split("-")[1]) * 1000);
if (!vins.contains(vin)) {
timer.cancel();
return;
}
}
private void avtiveThread(String vin, String string) {
String[] split = string.split("-");
try {
List<String> list = new ArrayList<>();
List<Map<String, String>> mapList = iotDbServerImpl.queryDataFromIotDb(new VehicleReq() {{
setVin(vin);
setCode(split[0]);
setEndTime(new Date().getTime());
setStartTime(new Date().getTime() - Long.parseLong(split[1]) * 1000);
}});
// mapList.stream().map(map -> list.addAll(map.values()));
for (Map<String, String> stringMap : mapList) {
list.addAll(stringMap.values());
}
if (list.size() == 0) {
return;
}
List<Double> collect = new ArrayList<>();
for (String s : list) {
BigDecimal bigDecimal = new BigDecimal(s);
collect.add(bigDecimal.doubleValue());
}
Double avg = (Double) collect.stream().mapToDouble(number -> number).average().getAsDouble();
List<Double> result = new ArrayList<>();
for (Double i : collect) {
result.add(i - avg);
if (i > Integer.parseInt(split[2])) {
rabbitTemplate.convertAndSend("standardWarning-error", vin+"-"+split[0] + ":" + i);
}
}
RespResult resp = new RespResult() {{
setKeyCode(split[0]);
setVin(vin);
setResList(result);
setStartTime(new Date().getTime() - Long.parseLong(split[1]) * 1000);
setStartTime(new Date().getTime());
}};
System.out.println("当前指标为:"+split[2]);
log.info("执行指标预警、滑窗结果滑窗结果:{}", result);
System.out.println(resp);
rabbitTemplate.convertAndSend("standardWarning-result", JSON.toJSONString(resp));
} catch (Exception e) { } catch (Exception e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
@ -165,115 +209,11 @@ public class StandardWarningFactory {
@RabbitListener(queues = "standard-Warn-Event-End") @RabbitListener(queues = "standard-Warn-Event-End")
public void outStandardVehicle(String vin) { public void outStandardVehicle(String vin) {
log.info("车辆下线:{}", vin); log.info("车辆下线:{}", vin);
if (vins != null) {
// scheduleMap.get(vin + "-schedule").shutdown(); if (vins.contains(vin))
} vins.remove("vin");
// 周期线程池
public void activeEvent(Map<String, String> vehicle, RedisService redisService) {
// 创建初始的周期性线程池
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(2);
// 提交一些任务给线程池执行
for (String vin : vins) {// keycode-cycleLength-times
List<String> strings = JSON.parseArray(cache.getIfPresent(vin + "-keyCode"), String.class);
if (strings.isEmpty()){
changeProp(vin);
}
executor.submit(() -> {
for (String string : strings) {
// keycode-cycleLength-level
String[] split = string.split("-");
try {
List<String> list = new ArrayList<>();
iotDbServerImpl.queryDataFromIotDb(new VehicleReq() {{
setVin(vin);
setCode(split[0]);
setEndTime(new Date().getTime());
setStartTime(new Date().getTime() - Long.parseLong(split[1]) * 1000);
}}).stream().map(map -> list.addAll(map.values()));
List<Integer> collect = list.stream().map(Integer::valueOf).toList();
int avg = (int) collect.stream().mapToInt(number -> number).average().getAsDouble();
List<Integer> result = new ArrayList<>();
for (Integer i : collect) {
result.add(i - avg);
if (i>Integer.parseInt(split[2])){
rabbitTemplate.convertAndSend("standardWarning-error", split[0] + ":" + i);
}
}
rabbitTemplate.convertAndSend("standardWarning-result", JSON.toJSONString(result));
} catch (Exception e) {
throw new RuntimeException(e);
}
}
});
} }
// 提交线程池 // rabbitTemplate.convertAndSend();
// 只用延迟线程
// executor.scheduleWithFixedDelay();
} }
/**
* 线
*/
private static void way2() {
ScheduledThreadPoolExecutor scheduledExecutorService = new ScheduledThreadPoolExecutor(1); // 参数--核线程个数
scheduledExecutorService.scheduleWithFixedDelay(() -> {
System.out.println(Thread.currentThread().getName() + " → " + " Start Time = " + new Date());
try {
sleep(3000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(Thread.currentThread().getName() + " → " + " End Time = " + new Date());
}, 1, 4, TimeUnit.SECONDS); // 延迟 1s周期 4s
}
// public void sche(String[] args) {
// ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
//
// // 初始延迟和周期
// long initialDelay = 0;
// long period = 1000; // 初始周期为1秒
//
// // 提交一个周期性任务
// ScheduledFuture<?> future = executor.scheduleAtFixedRate(() -> {
// // 模拟任务执行逻辑
// System.out.println("Task execution...");
//
// // 模拟根据条件动态调整执行周期或取消任务
// if (someCondition()) {
// // 根据条件取消当前任务
// future.cancel(false);
// System.out.println("Task cancelled due to condition.");
//
// // 根据条件重新安排一个新任务
// long newDelay = 2000; // 新的延迟为2秒
// long newPeriod = 3000; // 新的周期为3秒
// future = executor.scheduleAtFixedRate(() -> {
// System.out.println("New task execution...");
// }, newDelay, newPeriod, TimeUnit.MILLISECONDS);
// }
// }, initialDelay, period, TimeUnit.MILLISECONDS);
//
// // 等待一段时间后关闭 executor
// try {
// Thread.sleep(15000); // 等待15秒钟演示多次执行和动态调整
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
// executor.shutdown();
// }
//
// // 示例条件方法,根据具体情况自行替换
// private static boolean someCondition() {
// // 这里可以根据具体逻辑返回是否满足条件
// return Math.random() < 0.1; // 模拟10%的概率满足条件
// }
} }

View File

@ -27,6 +27,7 @@ public class RedisKeyExpirationListener extends KeyExpirationEventMessageListene
public void onMessage(Message message, byte[] pattern) { public void onMessage(Message message, byte[] pattern) {
String key = message.toString(); String key = message.toString();
log.warn("过期的KEY是: {}" , key); log.warn("过期的KEY是: {}" , key);
if (!key.contains("-")){log.info("不是故障事件");return;}
BugMalfunctionResult result = new BugMalfunctionResult(); BugMalfunctionResult result = new BugMalfunctionResult();
String[] split = key.split("-"); String[] split = key.split("-");
result.setVin(split[0]); result.setVin(split[0]);

View File

@ -55,13 +55,16 @@ public class EventsActiveServiceImpl extends ServiceImpl<VehicleEventMapper, Veh
Object vehicleEvent = cacheHalf.getIfPresent(vin + "-events"); Object vehicleEvent = cacheHalf.getIfPresent(vin + "-events");
if (vehicleEvent != null) { if (vehicleEvent != null) {
// 若有访问更新缓存时间 // 若有访问更新缓存时间
log.info("缓存");
cacheHalf.put(vin + "-events", vehicleEvent); cacheHalf.put(vin + "-events", vehicleEvent);
return (VehicleEvent) vehicleEvent; return (VehicleEvent) vehicleEvent;
} }
vehicleEvent = this.getOne(new LambdaQueryWrapper<>() {{ vehicleEvent = this.getOne(new LambdaQueryWrapper<>() {{
eq(VehicleEvent::getVin, vin); eq(VehicleEvent::getVin, vin);
}}); }});
if (vehicleEvent==null){
log.warn("该车辆没有其他事件");
return null;
}
cacheHalf.put(vin + "-events", vehicleEvent); cacheHalf.put(vin + "-events", vehicleEvent);
log.info("数据库"); log.info("数据库");
Object ifPresent = cacheHalf.getIfPresent(vin + "-events"); Object ifPresent = cacheHalf.getIfPresent(vin + "-events");