diff --git a/mobai-event-common/src/main/java/com/mobai/resp/RespResult.java b/mobai-event-common/src/main/java/com/mobai/resp/RespResult.java new file mode 100644 index 0000000..35187ff --- /dev/null +++ b/mobai-event-common/src/main/java/com/mobai/resp/RespResult.java @@ -0,0 +1,30 @@ +package com.mobai.resp; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.util.List; + +/** + * @author Saisai + * @className RespResult + * @description 描述 + * @date 2024/6/29 12:07 + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class RespResult { + private String vin; + + private List resList; + + private String keyCode; + + private Long startTime; + + private Long endTime; +} diff --git a/mobai-event-service/src/main/java/com/mobai/EventServiceApplication.java b/mobai-event-service/src/main/java/com/mobai/EventServiceApplication.java index 3b63082..3f60df1 100644 --- a/mobai-event-service/src/main/java/com/mobai/EventServiceApplication.java +++ b/mobai-event-service/src/main/java/com/mobai/EventServiceApplication.java @@ -2,8 +2,6 @@ package com.mobai; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; -import org.springframework.cache.annotation.EnableCaching; -import org.springframework.context.annotation.ComponentScan; /** * @author Mobai diff --git a/mobai-event-service/src/main/java/com/mobai/iotDB/config/IotDBSessionConfig.java b/mobai-event-service/src/main/java/com/mobai/iotDB/config/IotDBSessionConfig.java index 925bd8a..a443e2e 100644 --- a/mobai-event-service/src/main/java/com/mobai/iotDB/config/IotDBSessionConfig.java +++ b/mobai-event-service/src/main/java/com/mobai/iotDB/config/IotDBSessionConfig.java @@ -82,8 +82,6 @@ public class IotDBSessionConfig { public void insertRecord(String deviceId, Long time,List measurementsList, List valuesList) throws StatementExecutionException, IoTDBConnectionException, ServerException { if (measurementsList.size() == valuesList.size()) { session.insertRecord(deviceId, time, measurementsList, valuesList); - log.info("键::{}",measurementsList); - log.info("值::{}",valuesList); } else { log.error("measurementsList 与 valuesList 值不对应"); } diff --git a/mobai-event-service/src/main/java/com/mobai/iotDB/service/impl/IotDbServerImpl.java b/mobai-event-service/src/main/java/com/mobai/iotDB/service/impl/IotDbServerImpl.java index 02b0927..07927f0 100644 --- a/mobai-event-service/src/main/java/com/mobai/iotDB/service/impl/IotDbServerImpl.java +++ b/mobai-event-service/src/main/java/com/mobai/iotDB/service/impl/IotDbServerImpl.java @@ -59,8 +59,8 @@ public class IotDbServerImpl implements IotDbServer { if (null != req.getVin()) { String sql = "select * from " + "root.vin.map." + req.getVin(); - if (req.getCode() == null) { - sql = sql.replace("*", req.getCode()); + if (req.getCode() != null) { + sql = sql.replace("*", req.getCode()); //1719641179000 } // 开始时间 if (req.getStartTime() != null && req.getStartTime() != 0) { diff --git a/mobai-event-service/src/main/java/com/mobai/kafka/listener/KafkaConsumerListenerExample.java b/mobai-event-service/src/main/java/com/mobai/kafka/listener/KafkaConsumerListenerExample.java index 5d041dd..419de8c 100644 --- a/mobai-event-service/src/main/java/com/mobai/kafka/listener/KafkaConsumerListenerExample.java +++ b/mobai-event-service/src/main/java/com/mobai/kafka/listener/KafkaConsumerListenerExample.java @@ -12,11 +12,14 @@ import org.apache.iotdb.rpc.IoTDBConnectionException; import org.apache.iotdb.rpc.StatementExecutionException; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.clients.consumer.OffsetCommitCallback; import org.apache.kafka.common.TopicPartition; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.support.Acknowledgment; import org.springframework.stereotype.Component; import java.math.BigDecimal; @@ -38,17 +41,9 @@ import java.util.Map; @Slf4j public class KafkaConsumerListenerExample { - - @Autowired - private RedisService redisService; - - @Autowired private IotDbServer iotDbServer; -// @Autowired -// private KafkaConsumer consumer; - @Autowired private ForestGet forestGet; @@ -58,15 +53,14 @@ public class KafkaConsumerListenerExample { * @param record */ @KafkaListener(topics = {"topic0", "topic1"}, groupId = "topics") - public void consume(ConsumerRecord record) { + public void consume(ConsumerRecord record, Acknowledgment acknowledgment) { log.info("消费信息为:{}",record); // 无数据接口,存map集合 Map map = JSON.parseObject(record.value(),Map.class); // 存入iotDB try { iotDbServer.insertData(map); - log.info("添加成功"); - + acknowledgment.acknowledge(); } catch (StatementExecutionException e) { throw new RuntimeException(e); } catch (ServerException e) { diff --git a/mobai-event-service/src/main/java/com/mobai/kafka/listener/VinConsumerRunner.java b/mobai-event-service/src/main/java/com/mobai/kafka/listener/VinConsumerRunner.java index 86eae20..10b2fb2 100644 --- a/mobai-event-service/src/main/java/com/mobai/kafka/listener/VinConsumerRunner.java +++ b/mobai-event-service/src/main/java/com/mobai/kafka/listener/VinConsumerRunner.java @@ -9,9 +9,7 @@ import com.mobai.utils.RedisService; import com.mobai.vehicle.event.service.EventsService; import com.mobai.vehicle.HandlerHelper; import lombok.extern.log4j.Log4j2; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.*; import org.apache.kafka.common.TopicPartition; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.ApplicationArguments; @@ -95,11 +93,25 @@ public class VinConsumerRunner implements ApplicationRunner { Map map = JSON.parseObject(record.value(), Map.class); // 获取对应的事件 VehicleEvent events = eventsService.getEvents(map.get("vin")); - log.info("执行事件:{}",events); +// log.info("执行事件:{}",events); + if (events==null) + return; HandlerHelper.doHandler(events, map, redisService); } } }).start(); + // 提交偏移量 + consumer.commitAsync(new OffsetCommitCallback() { + + @Override + public void onComplete(Map offsets, Exception e) + { + if (e != null) { + System.out.println(offsets.toString()); + System.out.println(e.toString()); + } + } + }); } else { Thread.sleep(10); } diff --git a/mobai-event-service/src/main/java/com/mobai/vehicle/event/factory/BugMalfunctionFactory.java b/mobai-event-service/src/main/java/com/mobai/vehicle/event/factory/BugMalfunctionFactory.java index 19ab931..d3ee75f 100644 --- a/mobai-event-service/src/main/java/com/mobai/vehicle/event/factory/BugMalfunctionFactory.java +++ b/mobai-event-service/src/main/java/com/mobai/vehicle/event/factory/BugMalfunctionFactory.java @@ -31,32 +31,12 @@ public class BugMalfunctionFactory implements EventActive { @Autowired private RabbitTemplate rabbitTemplate; - @Autowired - private RedisService redisService; - @Autowired private RedisTemplate redisTemplate; @Resource(name = "createFiveCaffeine") private Cache cacheFive; -// @Autowired -// private Cache getCacheFive(@Qualifier("createFiveCaffeine") Cache cacheFive){ -// if (cacheFive==null){ -// cacheFive = Caffeine.newBuilder() -// // 设置初始连接数 -// .initialCapacity(1000) -// //5秒没有读写自动删除 -// .expireAfterAccess(5, TimeUnit.SECONDS) -// //最大容量1024个,超过会自动清理空间 -// .maximumSize(1024) -// .removalListener(((key, value, cause) -> { -// //清理通知 key,value ==> 键值对 cause ==> 清理原因 -// })) -// .build(); -// } -// return this.cacheFive = cacheFive; -// } private BugMalfunctionFactory() { } @@ -69,72 +49,72 @@ public class BugMalfunctionFactory implements EventActive { public void activeEvent(Map vehicle, RedisService redisService) { BugMalfunctionResult result = new BugMalfunctionResult(); result.setVin(vehicle.get("vin")); - result.setTimestamp(Long.valueOf(vehicle.get("startTime"))); - if (vehicle.get("vehicleStatus").equals(0+"")) { + result.setTimestamp(Long.valueOf(vehicle.get("drivingRoute"))); + if (vehicle.get("vehicleStatus")!=null&&vehicle.get("vehicleStatus").equals(0+"")) { result.setFaultCode(FaultCode.VEHICLE.getString()); this.active(result); } - if (vehicle.get("chargingStatus").equals(0+"")) { + if (vehicle.get("chargingStatus")!=null&&vehicle.get("chargingStatus").equals(0+"")) { result.setFaultCode(FaultCode.CHARGING.getString()); this.active(result); } - if (vehicle.get("operatingStatus").equals(0+"")) { + if (vehicle.get("operatingStatus")!=null&&vehicle.get("operatingStatus").equals(0+"")) { result.setFaultCode(FaultCode.OPERATING.getString()); this.active(result); } - if (vehicle.get("socStatus").equals(""+0)) { + if (vehicle.get("socStatus")!=null&&vehicle.get("socStatus").equals(""+0)) { result.setFaultCode(FaultCode.SOC.getString()); this.active(result); } - if (vehicle.get("chargingEnergyStorageStatus").equals(0+"")) { + if (vehicle.get("chargingEnergyStorageStatus")!=null&&vehicle.get("chargingEnergyStorageStatus").equals(0+"")) { result.setFaultCode(FaultCode.CHARGING_ENERGY.getString()); this.active(result); } - if (vehicle.get("driveMotorStatus").equals(0+"")) { + if (vehicle.get("driveMotorStatus")!=null&&vehicle.get("driveMotorStatus").equals(0+"")) { result.setFaultCode(FaultCode.DRIVE_MOTOR.getString()); this.active(result); } - if (vehicle.get("positionStatus").equals(0+"")) { + if (vehicle.get("positionStatus")!=null&&vehicle.get("positionStatus").equals(0+"")) { result.setFaultCode(FaultCode.POSITION.getString()); this.active(result); } - if (vehicle.get("easStatus").equals(0+"")) { + if (vehicle.get("easStatus")!=null&&vehicle.get("easStatus").equals(0+"")) { result.setFaultCode(FaultCode.EAS.getString()); this.active(result); } - if (vehicle.get("ptcStatus").equals(0+"")) { + if (vehicle.get("ptcStatus")!=null&&vehicle.get("ptcStatus").equals(0+"")) { result.setFaultCode(FaultCode.PTC.getString()); this.active(result); } - if (vehicle.get("epsStatus").equals(0+"")) { + if (vehicle.get("epsStatus")!=null&&vehicle.get("epsStatus").equals(0+"")) { result.setFaultCode(FaultCode.EPS.getString()); this.active(result); } - if (vehicle.get("absStatus").equals(0+"")) { + if (vehicle.get("absStatus")!=null&&vehicle.get("absStatus").equals(0+"")) { result.setFaultCode(FaultCode.ABS.getString()); this.active(result); } - if (vehicle.get("mcuStatus").equals(0+"")) { + if (vehicle.get("mcuStatus")!=null&&vehicle.get("mcuStatus").equals(0+"")) { result.setFaultCode(FaultCode.MCU.getString()); this.active(result); } - if (vehicle.get("heatingStatus").equals(0+"")) { + if (vehicle.get("heatingStatus")!=null&&vehicle.get("heatingStatus").equals(0+"")) { result.setFaultCode(FaultCode.HEATING.getString()); this.active(result); } - if (vehicle.get("batteryStatus").equals(0+"")) { + if (vehicle.get("batteryStatus")!=null&&vehicle.get("batteryStatus").equals(0+"")) { result.setFaultCode(FaultCode.BATTERY.getString()); this.active(result); } - if (vehicle.get("batteryInsulationStatus").equals(0+"")) { + if (vehicle.get("batteryInsulationStatus")!=null&&vehicle.get("batteryInsulationStatus").equals(0+"")) { result.setFaultCode(FaultCode.BATTERY_INSULATION.getString()); this.active(result); } - if (vehicle.get("dcdcStatus").equals(0+"")) { + if (vehicle.get("dcdcStatus")!=null&&vehicle.get("dcdcStatus").equals(0+"")) { result.setFaultCode(FaultCode.DCDC.getString()); this.active(result); } - if (vehicle.get("chgStatus").equals(0+"")) { + if (vehicle.get("chgStatus")!=null&&vehicle.get("chgStatus").equals(0+"")) { result.setFaultCode(FaultCode.CHG.getString()); this.active(result); } diff --git a/mobai-event-service/src/main/java/com/mobai/vehicle/event/factory/StandardWarningFactory.java b/mobai-event-service/src/main/java/com/mobai/vehicle/event/factory/StandardWarningFactory.java index 916f22c..05a58ef 100644 --- a/mobai-event-service/src/main/java/com/mobai/vehicle/event/factory/StandardWarningFactory.java +++ b/mobai-event-service/src/main/java/com/mobai/vehicle/event/factory/StandardWarningFactory.java @@ -5,7 +5,7 @@ import com.alibaba.fastjson2.JSONObject; import com.github.benmanes.caffeine.cache.Cache; import com.mobai.iotDB.service.impl.IotDbServerImpl; import com.mobai.req.VehicleReq; -import com.mobai.utils.RedisService; +import com.mobai.resp.RespResult; import lombok.Data; import lombok.extern.log4j.Log4j2; import org.springframework.amqp.core.ExchangeTypes; @@ -19,13 +19,13 @@ import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; import javax.annotation.Resource; +import java.math.BigDecimal; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import static java.lang.Thread.sleep; +import static java.util.Arrays.stream; /** * 指标预警工厂 @@ -61,30 +61,32 @@ public class StandardWarningFactory { private Map scheduleMap; /** - * keycode-cycleLength-level + * keycode-opportunity-level */ private ConcurrentHashMap> vinLevel; - private List vins; + private static List vins; + static { + vins = new ArrayList(); + } - @RabbitListener(bindings = @QueueBinding( - value = @Queue, - exchange = @Exchange(name = "vehicle-prop", type = ExchangeTypes.FANOUT) - )) +// @RabbitListener(bindings = @QueueBinding( +// value = @Queue, +// exchange = @Exchange(name = "vehicle-prop", type = ExchangeTypes.FANOUT) +// )) + @RabbitListener(queues = "zhiLian-vehicle-index") public void changeProp(String vin) { + vin= vin.replaceAll("\"",""); + log.info("更新指标缓存"); List attrs = new ArrayList<>(); - List attrLength = new ArrayList<>(); - List propList = redisTemplate.opsForList().range(vin + "List", 0, -1); + List propList = redisTemplate.opsForList().range(vin + "Index", 0, -1); for (String string : propList) { JSONObject prop = JSON.parseObject(string); - // 指标属性 - attrs.add(String.valueOf(prop.get("keyCode"))); // 周期延迟时长 - attrLength.add((Integer) prop.get("times")); - attrs.add(prop.get("keyCode") + "-" + prop.get("level") + "-" + prop.get("times")); + attrs.add(prop.get("keyCode") + "-" + prop.get("opportunity") + "-" + prop.get("attributeValue")); } - cache.put(vin + "-keyCode", JSON.toJSONString(attrs)); // vin keycode-level-times + cache.put(vin + "-keyCode", JSON.toJSONString(attrs)); // vin keycode-level-attributeValue log.info("更新缓存:{}", vin); // 指标未编写和存入 } @@ -99,57 +101,99 @@ public class StandardWarningFactory { public void joinStandardVehicle(String vin) { List cacheProps = new ArrayList<>(); log.info("车辆上线:{}", vin); - List props = redisTemplate.opsForList().range(vin + "Prop", 0, -1); + List props = redisTemplate.opsForList().range(vin + "Index", 0, -1); + redisTemplate.expire(vin + "Index", 10, TimeUnit.MINUTES); props.forEach(prop -> { + log.info("缓存存入操作"); JSONObject jsonObject = JSON.parseObject(prop); String keyCode = String.valueOf(jsonObject.get("keyCode")); - String level = String.valueOf(jsonObject.get("level")); - String windowLength = String.valueOf(jsonObject.get("windowLength")); - String cycleLength = String.valueOf(jsonObject.get("activeLength")); - // cycle = windowLEngth * cycle - cacheProps.add(keyCode + "-" + cycleLength + "-" + level); + String level = String.valueOf(jsonObject.get("attributeValue")); + String opportunity = String.valueOf(jsonObject.get("opportunity")); + cacheProps.add(keyCode + "-" + opportunity + "-" + level); }); - // 存入车辆指标缓存 code-cycleLength-level - cache.put(vin + "-keyCode", JSON.toJSONString(props)); - vins.add(vin); - List vinLevel = JSON.parseArray(cache.getIfPresent(vin + "-keyCode"), String.class); - if (vinLevel == null) { + new Thread(() -> { + cache.put(vin + "-keyCode", JSON.toJSONString(cacheProps)); + vins.add(vin); + this.doActive(vin); + }).start(); + // 存入车辆指标缓存 code-opportunity-level + + } + + private void doActive(String vin) { + // 每次都执行 + List strings = JSON.parseArray(cache.getIfPresent(vin + "-keyCode"), String.class); + if (strings == null || strings.size() == 0) { changeProp(vin); - vinLevel = JSON.parseArray(cache.getIfPresent(vin + "-keyCode"), String.class); // keycode-level-times } - Map map = new HashMap<>(); - vinLevel.stream().map(str -> (map.put(str.split("-")[0], str.split("-")[1]))); // key : attr value : level - String[] split = vinLevel.get(0).split("-"); // vin-level-times - String times = split[2]; - try { - List> vehicleList = iotDbServerImpl.queryDataFromIotDb( - new VehicleReq() {{ - setVin(vin); - setStartTime(new Date().getTime() - Long.parseLong(times) * 1000); - setEndTime(new Date().getTime()); - }}); - List finalVinLevel = vinLevel; - HashMap> result = new HashMap<>(); - // 数据查询结果处理 - vehicleList.forEach(vehicleMap -> { - for (String vinInfo : finalVinLevel) { - String[] vinInfoSplit = vinInfo.split("-"); - // 数据库查询结果 - String string = vehicleMap.get(vinInfoSplit[0]); - int resu = Integer.parseInt(vinInfoSplit[1]) - Integer.parseInt(string); - if (!result.containsKey(string)) { - result.put(string, new ArrayList()); - } - result.get(string).add(resu); - // 指标异常处理 - if (resu > 10 || resu < -10) { - log.info("指标有问题,预警发生ing"); -// rabbitTemplate.convertAndSend("standardWarning-error", vinInfoSplit[0] + ":" + string); - } + strings = JSON.parseArray(cache.getIfPresent(vin + "-keyCode"), String.class); + Timer timer = new Timer(); + if (vins == null) { + vins = new ArrayList<>(); + } + List finalStrings = strings; + TimerTask timerTask = new TimerTask() { + @Override + public void run() { + new Thread(() -> { + int first = 0; + for (String string : finalStrings) { + avtiveThread(vin, string); + if (first==0) doActive(vin); + first = 1; } - }); - log.info("发送了结果"); -// rabbitTemplate.convertAndSend("standardWarning-result", JSON.toJSONString(result)); + }).start(); + } + }; + timer.schedule(timerTask, Long.parseLong(strings.get(0).split("-")[1]) * 1000); + + if (!vins.contains(vin)) { + timer.cancel(); + return; + } + } + + private void avtiveThread(String vin, String string) { + String[] split = string.split("-"); + try { + List list = new ArrayList<>(); + List> mapList = iotDbServerImpl.queryDataFromIotDb(new VehicleReq() {{ + setVin(vin); + setCode(split[0]); + setEndTime(new Date().getTime()); + setStartTime(new Date().getTime() - Long.parseLong(split[1]) * 1000); + }}); +// mapList.stream().map(map -> list.addAll(map.values())); + for (Map stringMap : mapList) { + list.addAll(stringMap.values()); + } + if (list.size() == 0) { + return; + } + List collect = new ArrayList<>(); + for (String s : list) { + BigDecimal bigDecimal = new BigDecimal(s); + collect.add(bigDecimal.doubleValue()); + } + Double avg = (Double) collect.stream().mapToDouble(number -> number).average().getAsDouble(); + List result = new ArrayList<>(); + for (Double i : collect) { + result.add(i - avg); + if (i > Integer.parseInt(split[2])) { + rabbitTemplate.convertAndSend("standardWarning-error", vin+"-"+split[0] + ":" + i); + } + } + RespResult resp = new RespResult() {{ + setKeyCode(split[0]); + setVin(vin); + setResList(result); + setStartTime(new Date().getTime() - Long.parseLong(split[1]) * 1000); + setStartTime(new Date().getTime()); + }}; + System.out.println("当前指标为:"+split[2]); + log.info("执行指标预警、滑窗结果滑窗结果:{}", result); + System.out.println(resp); + rabbitTemplate.convertAndSend("standardWarning-result", JSON.toJSONString(resp)); } catch (Exception e) { throw new RuntimeException(e); } @@ -165,115 +209,11 @@ public class StandardWarningFactory { @RabbitListener(queues = "standard-Warn-Event-End") public void outStandardVehicle(String vin) { log.info("车辆下线:{}", vin); - -// scheduleMap.get(vin + "-schedule").shutdown(); - } - - - // 周期线程池 - public void activeEvent(Map vehicle, RedisService redisService) { - // 创建初始的周期性线程池 - ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(2); - - // 提交一些任务给线程池执行 - for (String vin : vins) {// keycode-cycleLength-times - List strings = JSON.parseArray(cache.getIfPresent(vin + "-keyCode"), String.class); - if (strings.isEmpty()){ - changeProp(vin); - } - executor.submit(() -> { - for (String string : strings) { - // keycode-cycleLength-level - String[] split = string.split("-"); - try { - List list = new ArrayList<>(); - iotDbServerImpl.queryDataFromIotDb(new VehicleReq() {{ - setVin(vin); - setCode(split[0]); - setEndTime(new Date().getTime()); - setStartTime(new Date().getTime() - Long.parseLong(split[1]) * 1000); - }}).stream().map(map -> list.addAll(map.values())); - List collect = list.stream().map(Integer::valueOf).toList(); - int avg = (int) collect.stream().mapToInt(number -> number).average().getAsDouble(); - List result = new ArrayList<>(); - for (Integer i : collect) { - result.add(i - avg); - if (i>Integer.parseInt(split[2])){ - rabbitTemplate.convertAndSend("standardWarning-error", split[0] + ":" + i); - } - } - rabbitTemplate.convertAndSend("standardWarning-result", JSON.toJSONString(result)); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - }); + if (vins != null) { + if (vins.contains(vin)) + vins.remove("vin"); } - // 提交线程池 - // 只用延迟线程 -// executor.scheduleWithFixedDelay(); - +// rabbitTemplate.convertAndSend(); } - - /** - * 周期型线程池执行方法 - */ - private static void way2() { - ScheduledThreadPoolExecutor scheduledExecutorService = new ScheduledThreadPoolExecutor(1); // 参数--核线程个数 - - scheduledExecutorService.scheduleWithFixedDelay(() -> { - System.out.println(Thread.currentThread().getName() + " → " + " Start Time = " + new Date()); - try { - sleep(3000); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - System.out.println(Thread.currentThread().getName() + " → " + " End Time = " + new Date()); - }, 1, 4, TimeUnit.SECONDS); // 延迟 1s,周期 4s - - } - -// public void sche(String[] args) { -// ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); -// -// // 初始延迟和周期 -// long initialDelay = 0; -// long period = 1000; // 初始周期为1秒 -// -// // 提交一个周期性任务 -// ScheduledFuture future = executor.scheduleAtFixedRate(() -> { -// // 模拟任务执行逻辑 -// System.out.println("Task execution..."); -// -// // 模拟根据条件动态调整执行周期或取消任务 -// if (someCondition()) { -// // 根据条件取消当前任务 -// future.cancel(false); -// System.out.println("Task cancelled due to condition."); -// -// // 根据条件重新安排一个新任务 -// long newDelay = 2000; // 新的延迟为2秒 -// long newPeriod = 3000; // 新的周期为3秒 -// future = executor.scheduleAtFixedRate(() -> { -// System.out.println("New task execution..."); -// }, newDelay, newPeriod, TimeUnit.MILLISECONDS); -// } -// }, initialDelay, period, TimeUnit.MILLISECONDS); -// -// // 等待一段时间后关闭 executor -// try { -// Thread.sleep(15000); // 等待15秒钟,演示多次执行和动态调整 -// } catch (InterruptedException e) { -// e.printStackTrace(); -// } -// executor.shutdown(); -// } -// -// // 示例条件方法,根据具体情况自行替换 -// private static boolean someCondition() { -// // 这里可以根据具体逻辑返回是否满足条件 -// return Math.random() < 0.1; // 模拟10%的概率满足条件 -// } - } diff --git a/mobai-event-service/src/main/java/com/mobai/vehicle/event/redis/RedisKeyExpirationListener.java b/mobai-event-service/src/main/java/com/mobai/vehicle/event/redis/RedisKeyExpirationListener.java index d5b92b2..026c6bd 100644 --- a/mobai-event-service/src/main/java/com/mobai/vehicle/event/redis/RedisKeyExpirationListener.java +++ b/mobai-event-service/src/main/java/com/mobai/vehicle/event/redis/RedisKeyExpirationListener.java @@ -27,6 +27,7 @@ public class RedisKeyExpirationListener extends KeyExpirationEventMessageListene public void onMessage(Message message, byte[] pattern) { String key = message.toString(); log.warn("过期的KEY是: {}" , key); + if (!key.contains("-")){log.info("不是故障事件");return;} BugMalfunctionResult result = new BugMalfunctionResult(); String[] split = key.split("-"); result.setVin(split[0]); diff --git a/mobai-event-service/src/main/java/com/mobai/vehicle/event/service/impl/EventsActiveServiceImpl.java b/mobai-event-service/src/main/java/com/mobai/vehicle/event/service/impl/EventsActiveServiceImpl.java index 0fabe44..4e1a28b 100644 --- a/mobai-event-service/src/main/java/com/mobai/vehicle/event/service/impl/EventsActiveServiceImpl.java +++ b/mobai-event-service/src/main/java/com/mobai/vehicle/event/service/impl/EventsActiveServiceImpl.java @@ -55,13 +55,16 @@ public class EventsActiveServiceImpl extends ServiceImpl() {{ eq(VehicleEvent::getVin, vin); }}); + if (vehicleEvent==null){ + log.warn("该车辆没有其他事件"); + return null; + } cacheHalf.put(vin + "-events", vehicleEvent); log.info("数据库"); Object ifPresent = cacheHalf.getIfPresent(vin + "-events");