diff --git a/mobai-event-common/src/main/java/com/mobai/req/VehicleReq.java b/mobai-event-common/src/main/java/com/mobai/req/VehicleReq.java index eec4d0d..9d50ec6 100644 --- a/mobai-event-common/src/main/java/com/mobai/req/VehicleReq.java +++ b/mobai-event-common/src/main/java/com/mobai/req/VehicleReq.java @@ -30,5 +30,6 @@ public class VehicleReq { * 结束时间点 */ private Long endTime; + private String code; } diff --git a/mobai-event-service/src/main/java/com/mobai/iotDB/config/IotDBSessionConfig.java b/mobai-event-service/src/main/java/com/mobai/iotDB/config/IotDBSessionConfig.java index 13718fb..925bd8a 100644 --- a/mobai-event-service/src/main/java/com/mobai/iotDB/config/IotDBSessionConfig.java +++ b/mobai-event-service/src/main/java/com/mobai/iotDB/config/IotDBSessionConfig.java @@ -82,8 +82,8 @@ public class IotDBSessionConfig { public void insertRecord(String deviceId, Long time,List measurementsList, List valuesList) throws StatementExecutionException, IoTDBConnectionException, ServerException { if (measurementsList.size() == valuesList.size()) { session.insertRecord(deviceId, time, measurementsList, valuesList); - System.out.println(measurementsList); - System.out.println(valuesList); + log.info("键::{}",measurementsList); + log.info("值::{}",valuesList); } else { log.error("measurementsList 与 valuesList 值不对应"); } diff --git a/mobai-event-service/src/main/java/com/mobai/iotDB/controller/IotDbController.java b/mobai-event-service/src/main/java/com/mobai/iotDB/controller/IotDbController.java index d1cf6aa..c74b988 100644 --- a/mobai-event-service/src/main/java/com/mobai/iotDB/controller/IotDbController.java +++ b/mobai-event-service/src/main/java/com/mobai/iotDB/controller/IotDbController.java @@ -14,6 +14,7 @@ import org.springframework.web.bind.annotation.RestController; import javax.annotation.Resource; import java.rmi.ServerException; +import java.util.Map; /** * description: iotdb 控制层 @@ -31,11 +32,11 @@ public class IotDbController { /** * 插入数据 - * @param vehicle + * @param map */ @PostMapping("/api/device/insert") - public ResponseData insert(@RequestBody Vehicle vehicle) throws StatementExecutionException, ServerException, IoTDBConnectionException { - iotDbServer.insertData(vehicle); + public ResponseData insert(@RequestBody Map map) throws StatementExecutionException, ServerException, IoTDBConnectionException { + iotDbServer.insertData(map); return ResponseData.success(); } diff --git a/mobai-event-service/src/main/java/com/mobai/iotDB/service/IotDbServer.java b/mobai-event-service/src/main/java/com/mobai/iotDB/service/IotDbServer.java index 169c55a..11ddda9 100644 --- a/mobai-event-service/src/main/java/com/mobai/iotDB/service/IotDbServer.java +++ b/mobai-event-service/src/main/java/com/mobai/iotDB/service/IotDbServer.java @@ -7,6 +7,7 @@ import org.apache.iotdb.rpc.IoTDBConnectionException; import org.apache.iotdb.rpc.StatementExecutionException; import java.rmi.ServerException; +import java.util.Map; /** * @ClassName IotDbServer @@ -16,7 +17,7 @@ import java.rmi.ServerException; */ public interface IotDbServer { - void insertData(Vehicle vehicle) throws StatementExecutionException, ServerException, IoTDBConnectionException; + void insertData(Map map) throws StatementExecutionException, ServerException, IoTDBConnectionException; Object queryDataFromIotDb(VehicleReq req) throws Exception; diff --git a/mobai-event-service/src/main/java/com/mobai/iotDB/service/impl/IotDbServerImpl.java b/mobai-event-service/src/main/java/com/mobai/iotDB/service/impl/IotDbServerImpl.java index 96b3b2d..02b0927 100644 --- a/mobai-event-service/src/main/java/com/mobai/iotDB/service/impl/IotDbServerImpl.java +++ b/mobai-event-service/src/main/java/com/mobai/iotDB/service/impl/IotDbServerImpl.java @@ -13,7 +13,6 @@ import org.apache.iotdb.tsfile.read.common.RowRecord; import org.springframework.stereotype.Service; import javax.annotation.Resource; -import java.math.BigDecimal; import java.rmi.ServerException; import java.util.ArrayList; import java.util.HashMap; @@ -35,126 +34,41 @@ public class IotDbServerImpl implements IotDbServer { @Override - public void insertData(Vehicle vehicle) throws StatementExecutionException, ServerException, IoTDBConnectionException { + public void insertData(Map map) throws StatementExecutionException, ServerException, IoTDBConnectionException { // iotDbParam: 模拟设备上报消息 // bizkey: 业务唯一key PK :产品唯一编码 SN:设备唯一编码 - String deviceId = "root.vin." + vehicle.getVin(); + String deviceId = "root.vin.map." + map.get("vin"); // 将设备上报的数据存入数据库(时序数据库) - List measurementsList = new ArrayList<>() {{ - add("startTime"); - add("longitude"); - add("latitude"); - add("speed"); - add("mileage"); - add("voltage"); - add("current"); - add("resistance"); - add("gear"); - add("accelerationPedal"); - add("brakePedal"); - add("fuelConsumptionRate"); - add("motorControllerTemperature"); - add("motorSpeed"); - add("motoTorque"); - add("motorTemperature"); - add("motorVoltage"); - add("motorCurrent"); - add("remainingBattery"); - add("maximumFeedbackPower"); - add("maximumDischargePower"); - add("selfCheckCounter"); - add("totalBatteryCurrent"); - add("totalBatteryVoltage"); - add("singleBatteryMaxVoltage"); - add("singleBatteryMinVoltage"); - add("singleBatteryMaxTemperature"); - add("singleBatteryMinTemperature"); - add("availableBatteryCapacity"); - add("vehicleStatus"); - add("chargingStatus"); - add("socStatus"); - add("operatingStatus"); - add("chargingEnergyStorageStatus"); - add("driveMotorStatus"); - add("positionStatus"); - add("easStatus"); - add("ptcStatus"); - add("epsStatus"); - add("absStatus"); - add("mcuStatus"); - add("heatingStatus"); - add("batteryStatus"); - add("batteryInsulationStatus"); - add("dcdcStatus"); - add("chgStatus"); - }}; - //车辆具体数据 - List valuesList = new ArrayList<>() {{ - add(String.valueOf(vehicle.getStartTime())); - add(String.valueOf(vehicle.getLongitude())); - add(String.valueOf(vehicle.getLatitude())); - add(String.valueOf(vehicle.getSpeed())); - add(String.valueOf(vehicle.getMileage())); - add(String.valueOf(vehicle.getVoltage())); - add(String.valueOf(vehicle.getCurrent())); - add(String.valueOf(vehicle.getResistance())); - add(String.valueOf(vehicle.getGear())); - add(String.valueOf(vehicle.getAccelerationPedal())); - add(String.valueOf(vehicle.getBrakePedal())); - add(String.valueOf(vehicle.getFuelConsumptionRate())); - add(String.valueOf(vehicle.getMotorControllerTemperature())); - add(String.valueOf(vehicle.getMotorSpeed())); - add(String.valueOf(vehicle.getMotoTorque())); - add(String.valueOf(vehicle.getMotorTemperature())); - add(String.valueOf(vehicle.getMotorVoltage())); - add(String.valueOf(vehicle.getMotorCurrent())); - add(String.valueOf(vehicle.getRemainingBattery())); - add(String.valueOf(vehicle.getMaximumFeedbackPower())); - add(String.valueOf(vehicle.getMaximumDischargePower())); - add(String.valueOf(vehicle.getSelfCheckCounter())); - add(String.valueOf(vehicle.getTotalBatteryCurrent())); - add(String.valueOf(vehicle.getTotalBatteryVoltage())); - add(String.valueOf(vehicle.getSingleBatteryMaxVoltage())); - add(String.valueOf(vehicle.getSingleBatteryMinVoltage())); - add(String.valueOf(vehicle.getSingleBatteryMaxTemperature())); - add(String.valueOf(vehicle.getSingleBatteryMinTemperature())); - add(String.valueOf(vehicle.getAvailableBatteryCapacity())); - add(String.valueOf(vehicle.getVehicleStatus())); - add(String.valueOf(vehicle.getChargingStatus())); - add(String.valueOf(vehicle.getSocStatus())); - add(String.valueOf(vehicle.getOperatingStatus())); - add(String.valueOf(vehicle.getChargingEnergyStorageStatus())); - add(String.valueOf(vehicle.getDriveMotorStatus())); - add(String.valueOf(vehicle.getPositionStatus())); - add(String.valueOf(vehicle.getEasStatus())); - add(String.valueOf(vehicle.getPtcStatus())); - add(String.valueOf(vehicle.getEpsStatus())); - add(String.valueOf(vehicle.getAbsStatus())); - add(String.valueOf(vehicle.getMcuStatus())); - add(String.valueOf(vehicle.getHeatingStatus())); - add(String.valueOf(vehicle.getBatteryStatus())); - add(String.valueOf(vehicle.getBatteryInsulationStatus())); - add(String.valueOf(vehicle.getDcdcStatus())); - add(String.valueOf(vehicle.getChgStatus())); - }}; - System.out.println(vehicle); - iotDBSessionConfig.insertRecord(deviceId, vehicle.getStartTime(), measurementsList, valuesList); + iotDBSessionConfig.insertRecord(deviceId, Long.valueOf(map.get("drivingRoute")), + map.keySet().stream().toList(), map.values().stream().toList()); } + /** + * 查询时间区间数据并返回 + * + * @param req + * @return + * @throws Exception + */ @Override - public List queryDataFromIotDb(VehicleReq req) throws Exception { - List iotDbResultList = new ArrayList<>(); + public List> queryDataFromIotDb(VehicleReq req) throws Exception { + List> mapList = new ArrayList<>(); + if (null != req.getVin()) { - String sql = "select * from " + "root.vin." + req.getVin(); + + String sql = "select * from " + "root.vin.map." + req.getVin(); + if (req.getCode() == null) { + sql = sql.replace("*", req.getCode()); + } // 开始时间 if (req.getStartTime() != null && req.getStartTime() != 0) { - sql += " where startTime >= " + req.getStartTime(); + sql += " where drivingRoute >= " + req.getStartTime(); } // 结束时间 if (req.getEndTime() != null && req.getEndTime() != 0) { - sql += " and startTime <= " + req.getEndTime(); + sql += " and drivingRoute <= " + req.getEndTime(); } SessionDataSet sessionDataSet = iotDBSessionConfig.query(sql); List columnNames = sessionDataSet.getColumnNames(); @@ -165,24 +79,24 @@ public class IotDbServerImpl implements IotDbServer { titleList.add(temp[temp.length - 1]); } // 封装处理数据 - packagingData(req, iotDbResultList, sessionDataSet, titleList); + packagingData(req, mapList, sessionDataSet, titleList); } else { log.info("VIN不能为空!!"); } - return iotDbResultList; + return mapList; } /** * 封装处理数据 * * @param req - * @param iotDbResultList + * @param mapList * @param sessionDataSet * @param titleList * @throws StatementExecutionException * @throws IoTDBConnectionException */ - private void packagingData(VehicleReq req, List iotDbResultList, SessionDataSet sessionDataSet, List titleList) + private void packagingData(VehicleReq req, List> mapList, SessionDataSet sessionDataSet, List titleList) throws StatementExecutionException, IoTDBConnectionException { int fetchSize = sessionDataSet.getFetchSize(); if (fetchSize > 0) { @@ -192,63 +106,14 @@ public class IotDbServerImpl implements IotDbServer { List fields = next.getFields(); vehicle.setStartTime(next.getTimestamp()); Map map = new HashMap<>(); - for (int i = 0; i < fields.size(); i++) { Field field = fields.get(i); // 这里的需要按照类型获取 Object obj = field.getObjectValue(field.getDataType()); - map.put(titleList.get(i), obj==null? "null":obj.toString()); + map.put(titleList.get(i), obj == null ? "null" : obj.toString()); } - log.info(map); -// vehicle.setStartTime(Long.valueOf(map.get("startTime"))); - vehicle.setVin(map.get("vin")); - vehicle.setSpeed(new BigDecimal(map.get("speed"))); - vehicle.setLatitude(new BigDecimal(map.get("latitude"))); - vehicle.setLongitude(new BigDecimal(map.get("longitude"))); - vehicle.setMileage(new BigDecimal(map.get("mileage"))); - vehicle.setGear(map.get("gear")); - vehicle.setVehicleStatus(new BigDecimal(map.get("vehicleStatus")).intValue()); - vehicle.setChargingStatus(new BigDecimal(map.get("chargingStatus")).intValue()); - vehicle.setChargingEnergyStorageStatus(new BigDecimal(map.get("chargingEnergyStorageStatus")).intValue()); - vehicle.setEasStatus(new BigDecimal(map.get("easStatus")).intValue()); - vehicle.setMotorControllerTemperature(new BigDecimal(map.get("motorControllerTemperature"))); - vehicle.setTotalBatteryCurrent(new BigDecimal(map.get("totalBatteryCurrent"))); - vehicle.setSingleBatteryMaxVoltage(new BigDecimal(map.get("singleBatteryMaxVoltage"))); - vehicle.setOperatingStatus(new BigDecimal(map.get("operatingStatus")).intValue()); - vehicle.setHeatingStatus(new BigDecimal(map.get("heatingStatus")).intValue()); - vehicle.setDcdcStatus(new BigDecimal(map.get("dcdcStatus")).intValue()); - vehicle.setDriveMotorStatus(new BigDecimal(map.get("driveMotorStatus")).intValue()); - vehicle.setPositionStatus(new BigDecimal(map.get("positionStatus")).intValue()); - vehicle.setPtcStatus(new BigDecimal(map.get("ptcStatus")).intValue()); - vehicle.setEpsStatus(new BigDecimal(map.get("epsStatus")).intValue()); - vehicle.setAbsStatus(new BigDecimal(map.get("absStatus")).intValue()); - vehicle.setMcuStatus(new BigDecimal(map.get("mcuStatus")).intValue()); - vehicle.setBatteryInsulationStatus(new BigDecimal(map.get("batteryInsulationStatus")).intValue()); - vehicle.setBatteryStatus(new BigDecimal(map.get("batteryStatus")).intValue()); - vehicle.setChgStatus(new BigDecimal(map.get("chgStatus")).intValue()); - vehicle.setTotalBatteryVoltage(new BigDecimal(map.get("totalBatteryVoltage"))); - vehicle.setMotorSpeed(new BigDecimal(map.get("motorSpeed"))); - vehicle.setMotorCurrent(new BigDecimal(map.get("motorCurrent"))); - vehicle.setMotorVoltage(new BigDecimal(map.get("motorVoltage"))); - vehicle.setAccelerationPedal(new BigDecimal(map.get("accelerationPedal"))); - vehicle.setBrakePedal(new BigDecimal(map.get("brakePedal"))); - vehicle.setSelfCheckCounter(new BigDecimal(map.get("selfCheckCounter"))); - vehicle.setMotorTemperature(new BigDecimal(map.get("motorTemperature"))); - vehicle.setMaximumDischargePower(new BigDecimal(map.get("maximumDischargePower"))); - vehicle.setMaximumFeedbackPower(new BigDecimal(map.get("maximumFeedbackPower"))); - vehicle.setFuelConsumptionRate(new BigDecimal(map.get("fuelConsumptionRate"))); - vehicle.setVin(req.getVin()); - vehicle.setVoltage(new BigDecimal(map.get("voltage"))); - vehicle.setCurrent(new BigDecimal(map.get("current"))); - vehicle.setResistance(new BigDecimal(map.get("resistance"))); - vehicle.setMotoTorque(new BigDecimal(map.get("motoTorque"))); - vehicle.setRemainingBattery(new BigDecimal(map.get("remainingBattery"))); - vehicle.setSingleBatteryMinVoltage(new BigDecimal(map.get("singleBatteryMinVoltage"))); - vehicle.setSingleBatteryMaxTemperature(new BigDecimal(map.get("singleBatteryMaxTemperature"))); - vehicle.setSingleBatteryMinTemperature(new BigDecimal(map.get("singleBatteryMinTemperature"))); - vehicle.setAvailableBatteryCapacity(new BigDecimal(map.get("availableBatteryCapacity"))); - iotDbResultList.add(vehicle); + mapList.add(map); } } } diff --git a/mobai-event-service/src/main/java/com/mobai/kafka/listener/KafkaConsumerListenerExample.java b/mobai-event-service/src/main/java/com/mobai/kafka/listener/KafkaConsumerListenerExample.java index fa5642f..5d041dd 100644 --- a/mobai-event-service/src/main/java/com/mobai/kafka/listener/KafkaConsumerListenerExample.java +++ b/mobai-event-service/src/main/java/com/mobai/kafka/listener/KafkaConsumerListenerExample.java @@ -24,6 +24,7 @@ import java.rmi.ServerException; import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.Map; /** * @description: kafka 消费者 @@ -59,10 +60,11 @@ public class KafkaConsumerListenerExample { @KafkaListener(topics = {"topic0", "topic1"}, groupId = "topics") public void consume(ConsumerRecord record) { log.info("消费信息为:{}",record); - Vehicle vehicle = getVehicle(record.value()); + // 无数据接口,存map集合 + Map map = JSON.parseObject(record.value(),Map.class); // 存入iotDB try { - iotDbServer.insertData(vehicle); + iotDbServer.insertData(map); log.info("添加成功"); } catch (StatementExecutionException e) { diff --git a/mobai-event-service/src/main/java/com/mobai/kafka/listener/VinConsumerRunner.java b/mobai-event-service/src/main/java/com/mobai/kafka/listener/VinConsumerRunner.java index 0b3028f..86eae20 100644 --- a/mobai-event-service/src/main/java/com/mobai/kafka/listener/VinConsumerRunner.java +++ b/mobai-event-service/src/main/java/com/mobai/kafka/listener/VinConsumerRunner.java @@ -91,11 +91,11 @@ public class VinConsumerRunner implements ApplicationRunner { List> partitionRecords = records.records(partition); for (ConsumerRecord record : partitionRecords) { log.info("Offset = {}, Key = {}, Value = {}", record.offset(), record.key(), record.value()); - // 报文解析为对象 -// Vehicle vehicle = kafkaConsumerListenerExample.getVehicle(record.value()); + // 获取数据并存入 Map map = JSON.parseObject(record.value(), Map.class); // 获取对应的事件 VehicleEvent events = eventsService.getEvents(map.get("vin")); + log.info("执行事件:{}",events); HandlerHelper.doHandler(events, map, redisService); } } diff --git a/mobai-event-service/src/main/java/com/mobai/vehicle/HandlerHelper.java b/mobai-event-service/src/main/java/com/mobai/vehicle/HandlerHelper.java index 5d8921c..1029fa2 100644 --- a/mobai-event-service/src/main/java/com/mobai/vehicle/HandlerHelper.java +++ b/mobai-event-service/src/main/java/com/mobai/vehicle/HandlerHelper.java @@ -1,6 +1,5 @@ package com.mobai.vehicle; -import com.mobai.domain.Vehicle; import com.mobai.domain.VehicleEvent; import com.mobai.utils.RedisService; import com.mobai.vehicle.event.constants.EventHandler; @@ -20,7 +19,7 @@ public final class HandlerHelper { * @param events 事件类型 * @param vehicle 请求体 vin+事件状态 */ - public static void doHandler(VehicleEvent events, Map vehicle, RedisService redisService) { + public static void doHandler(VehicleEvent events, Map vehicle, RedisService redisService) { List list = Arrays.stream(events.getEvents().split("-")).toList(); list.forEach(type -> { EventHandler.getEvent(type).activeEvent(vehicle, redisService); diff --git a/mobai-event-service/src/main/java/com/mobai/vehicle/event/cache/caffeine/CaffeineCache.java b/mobai-event-service/src/main/java/com/mobai/vehicle/event/cache/caffeine/CaffeineCache.java index 039c95c..97d40cc 100644 --- a/mobai-event-service/src/main/java/com/mobai/vehicle/event/cache/caffeine/CaffeineCache.java +++ b/mobai-event-service/src/main/java/com/mobai/vehicle/event/cache/caffeine/CaffeineCache.java @@ -57,4 +57,23 @@ public class CaffeineCache { .build(); } +/** + * 10分钟缓存 + * @return cache缓存 + */ + @Bean(name = "createTenMinuteCaffeine") + public Cache createTenMinuteCaffeine() { + return Caffeine.newBuilder() + // 设置初始连接数 + .initialCapacity(1000) + //半小时没有读自动删除 访问 + .expireAfterAccess(10, TimeUnit.MINUTES) + //最大容量1024个,超过会自动清理空间 + .maximumSize(1024) + .removalListener(((key, value, cause) -> { + //清理通知 key,value ==> 键值对 cause ==> 清理原因 + })) + .build(); + } + } diff --git a/mobai-event-service/src/main/java/com/mobai/vehicle/event/constants/EventHandler.java b/mobai-event-service/src/main/java/com/mobai/vehicle/event/constants/EventHandler.java index ff3ef9f..b7fa83d 100644 --- a/mobai-event-service/src/main/java/com/mobai/vehicle/event/constants/EventHandler.java +++ b/mobai-event-service/src/main/java/com/mobai/vehicle/event/constants/EventHandler.java @@ -2,10 +2,7 @@ package com.mobai.vehicle.event.constants; import cn.hutool.extra.spring.SpringUtil; import com.mobai.vehicle.EventActive; -import com.mobai.vehicle.event.factory.BugMalfunctionFactory; -import com.mobai.vehicle.event.factory.ElectronicFenceFactory; -import com.mobai.vehicle.event.factory.IndexWaringFactory; -import com.mobai.vehicle.event.factory.RealTimeFactory; +import com.mobai.vehicle.event.factory.*; import lombok.extern.log4j.Log4j2; import java.util.Arrays; @@ -25,6 +22,7 @@ public enum EventHandler { REAL_TIME_DATA(RealTimeFactory.class), INDEX_WARNING(IndexWaringFactory.class), BUG_MALFUNCTION(BugMalfunctionFactory.class), +// STANDARD_WARNING(StandardWarningFactory.class), ELECTRONIC_FENCE(ElectronicFenceFactory.class); private String code; diff --git a/mobai-event-service/src/main/java/com/mobai/vehicle/event/factory/BugMalfunctionFactory.java b/mobai-event-service/src/main/java/com/mobai/vehicle/event/factory/BugMalfunctionFactory.java index 5ffd2a2..19ab931 100644 --- a/mobai-event-service/src/main/java/com/mobai/vehicle/event/factory/BugMalfunctionFactory.java +++ b/mobai-event-service/src/main/java/com/mobai/vehicle/event/factory/BugMalfunctionFactory.java @@ -98,47 +98,46 @@ public class BugMalfunctionFactory implements EventActive { result.setFaultCode(FaultCode.POSITION.getString()); this.active(result); } - if (vehicle.get("EasStatus").equals(0+"")) { + if (vehicle.get("easStatus").equals(0+"")) { result.setFaultCode(FaultCode.EAS.getString()); this.active(result); } - if (vehicle.get("PtcStatus").equals(0+"")) { + if (vehicle.get("ptcStatus").equals(0+"")) { result.setFaultCode(FaultCode.PTC.getString()); this.active(result); } - if (vehicle.get("EpsStatus").equals(0+"")) { + if (vehicle.get("epsStatus").equals(0+"")) { result.setFaultCode(FaultCode.EPS.getString()); this.active(result); } - if (vehicle.get("AbsStatus").equals(0+"")) { + if (vehicle.get("absStatus").equals(0+"")) { result.setFaultCode(FaultCode.ABS.getString()); this.active(result); } - if (vehicle.get("McuStatus").equals(0+"")) { + if (vehicle.get("mcuStatus").equals(0+"")) { result.setFaultCode(FaultCode.MCU.getString()); this.active(result); } - if (vehicle.get("HeatingStatus").equals(0+"")) { + if (vehicle.get("heatingStatus").equals(0+"")) { result.setFaultCode(FaultCode.HEATING.getString()); this.active(result); } - if (vehicle.get("BatteryStatus").equals(0+"")) { + if (vehicle.get("batteryStatus").equals(0+"")) { result.setFaultCode(FaultCode.BATTERY.getString()); this.active(result); } - if (vehicle.get("BatteryInsulationStatus").equals(0+"")) { + if (vehicle.get("batteryInsulationStatus").equals(0+"")) { result.setFaultCode(FaultCode.BATTERY_INSULATION.getString()); this.active(result); } - if (vehicle.get("DcdcStatus").equals(0+"")) { + if (vehicle.get("dcdcStatus").equals(0+"")) { result.setFaultCode(FaultCode.DCDC.getString()); this.active(result); } - if (vehicle.get("ChgStatus").equals(0+"")) { + if (vehicle.get("chgStatus").equals(0+"")) { result.setFaultCode(FaultCode.CHG.getString()); this.active(result); } - } diff --git a/mobai-event-service/src/main/java/com/mobai/vehicle/event/factory/IndexWaringFactory.java b/mobai-event-service/src/main/java/com/mobai/vehicle/event/factory/IndexWaringFactory.java index 813f44a..ab48fa7 100644 --- a/mobai-event-service/src/main/java/com/mobai/vehicle/event/factory/IndexWaringFactory.java +++ b/mobai-event-service/src/main/java/com/mobai/vehicle/event/factory/IndexWaringFactory.java @@ -1,6 +1,5 @@ package com.mobai.vehicle.event.factory; -import com.mobai.domain.Vehicle; import com.mobai.utils.RedisService; import com.mobai.vehicle.EventActive; import lombok.extern.log4j.Log4j2; @@ -10,7 +9,7 @@ import org.springframework.stereotype.Component; import java.util.Map; /** - * 实时数据工厂 + * 故障报警工厂 * @author Saisai * @className BugMalfunctionFactory * @description 描述 diff --git a/mobai-event-service/src/main/java/com/mobai/vehicle/event/factory/StandardWarningFactory.java b/mobai-event-service/src/main/java/com/mobai/vehicle/event/factory/StandardWarningFactory.java new file mode 100644 index 0000000..916f22c --- /dev/null +++ b/mobai-event-service/src/main/java/com/mobai/vehicle/event/factory/StandardWarningFactory.java @@ -0,0 +1,279 @@ +package com.mobai.vehicle.event.factory; + +import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson2.JSONObject; +import com.github.benmanes.caffeine.cache.Cache; +import com.mobai.iotDB.service.impl.IotDbServerImpl; +import com.mobai.req.VehicleReq; +import com.mobai.utils.RedisService; +import lombok.Data; +import lombok.extern.log4j.Log4j2; +import org.springframework.amqp.core.ExchangeTypes; +import org.springframework.amqp.rabbit.annotation.Exchange; +import org.springframework.amqp.rabbit.annotation.Queue; +import org.springframework.amqp.rabbit.annotation.QueueBinding; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.stereotype.Component; + +import javax.annotation.Resource; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import static java.lang.Thread.sleep; + +/** + * 指标预警工厂 + * + * @author Saisai + * @className BugMalfunctionFactory + * @description 描述 + * @date 2024/6/20 14:42 + */ + + +@Data +@Log4j2 +@Component +public class StandardWarningFactory { + + + @Autowired + private RedisTemplate redisTemplate; + + /** + * keycode-level-times + */ + @Resource(name = "createTenMinuteCaffeine") + private Cache cache; + + @Autowired + private RabbitTemplate rabbitTemplate; + + @Autowired + private IotDbServerImpl iotDbServerImpl; + + private Map scheduleMap; + + /** + * keycode-cycleLength-level + */ + private ConcurrentHashMap> vinLevel; + + private List vins; + + + @RabbitListener(bindings = @QueueBinding( + value = @Queue, + exchange = @Exchange(name = "vehicle-prop", type = ExchangeTypes.FANOUT) + )) + public void changeProp(String vin) { + List attrs = new ArrayList<>(); + List attrLength = new ArrayList<>(); + List propList = redisTemplate.opsForList().range(vin + "List", 0, -1); + for (String string : propList) { + JSONObject prop = JSON.parseObject(string); + // 指标属性 + attrs.add(String.valueOf(prop.get("keyCode"))); + // 周期延迟时长 + attrLength.add((Integer) prop.get("times")); + attrs.add(prop.get("keyCode") + "-" + prop.get("level") + "-" + prop.get("times")); + } + cache.put(vin + "-keyCode", JSON.toJSONString(attrs)); // vin keycode-level-times + log.info("更新缓存:{}", vin); + // 指标未编写和存入 + } + + + /** + * 上线开启事件的车辆 + * + * @param vin + */ + @RabbitListener(queues = "standard-Warn-Event-Start") + public void joinStandardVehicle(String vin) { + List cacheProps = new ArrayList<>(); + log.info("车辆上线:{}", vin); + List props = redisTemplate.opsForList().range(vin + "Prop", 0, -1); + props.forEach(prop -> { + JSONObject jsonObject = JSON.parseObject(prop); + String keyCode = String.valueOf(jsonObject.get("keyCode")); + String level = String.valueOf(jsonObject.get("level")); + String windowLength = String.valueOf(jsonObject.get("windowLength")); + String cycleLength = String.valueOf(jsonObject.get("activeLength")); + // cycle = windowLEngth * cycle + cacheProps.add(keyCode + "-" + cycleLength + "-" + level); + }); + // 存入车辆指标缓存 code-cycleLength-level + cache.put(vin + "-keyCode", JSON.toJSONString(props)); + vins.add(vin); + List vinLevel = JSON.parseArray(cache.getIfPresent(vin + "-keyCode"), String.class); + if (vinLevel == null) { + changeProp(vin); + vinLevel = JSON.parseArray(cache.getIfPresent(vin + "-keyCode"), String.class); // keycode-level-times + } + Map map = new HashMap<>(); + vinLevel.stream().map(str -> (map.put(str.split("-")[0], str.split("-")[1]))); // key : attr value : level + String[] split = vinLevel.get(0).split("-"); // vin-level-times + String times = split[2]; + try { + List> vehicleList = iotDbServerImpl.queryDataFromIotDb( + new VehicleReq() {{ + setVin(vin); + setStartTime(new Date().getTime() - Long.parseLong(times) * 1000); + setEndTime(new Date().getTime()); + }}); + List finalVinLevel = vinLevel; + HashMap> result = new HashMap<>(); + // 数据查询结果处理 + vehicleList.forEach(vehicleMap -> { + for (String vinInfo : finalVinLevel) { + String[] vinInfoSplit = vinInfo.split("-"); + // 数据库查询结果 + String string = vehicleMap.get(vinInfoSplit[0]); + int resu = Integer.parseInt(vinInfoSplit[1]) - Integer.parseInt(string); + if (!result.containsKey(string)) { + result.put(string, new ArrayList()); + } + result.get(string).add(resu); + // 指标异常处理 + if (resu > 10 || resu < -10) { + log.info("指标有问题,预警发生ing"); +// rabbitTemplate.convertAndSend("standardWarning-error", vinInfoSplit[0] + ":" + string); + } + } + }); + log.info("发送了结果"); +// rabbitTemplate.convertAndSend("standardWarning-result", JSON.toJSONString(result)); + } catch (Exception e) { + throw new RuntimeException(e); + } + + } + + + /** + * 下线关闭事件的车辆 + * + * @param vin + */ + @RabbitListener(queues = "standard-Warn-Event-End") + public void outStandardVehicle(String vin) { + log.info("车辆下线:{}", vin); + +// scheduleMap.get(vin + "-schedule").shutdown(); + } + + + // 周期线程池 + public void activeEvent(Map vehicle, RedisService redisService) { + // 创建初始的周期性线程池 + ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(2); + + // 提交一些任务给线程池执行 + for (String vin : vins) {// keycode-cycleLength-times + List strings = JSON.parseArray(cache.getIfPresent(vin + "-keyCode"), String.class); + if (strings.isEmpty()){ + changeProp(vin); + } + executor.submit(() -> { + for (String string : strings) { + // keycode-cycleLength-level + String[] split = string.split("-"); + try { + List list = new ArrayList<>(); + iotDbServerImpl.queryDataFromIotDb(new VehicleReq() {{ + setVin(vin); + setCode(split[0]); + setEndTime(new Date().getTime()); + setStartTime(new Date().getTime() - Long.parseLong(split[1]) * 1000); + }}).stream().map(map -> list.addAll(map.values())); + List collect = list.stream().map(Integer::valueOf).toList(); + int avg = (int) collect.stream().mapToInt(number -> number).average().getAsDouble(); + List result = new ArrayList<>(); + for (Integer i : collect) { + result.add(i - avg); + if (i>Integer.parseInt(split[2])){ + rabbitTemplate.convertAndSend("standardWarning-error", split[0] + ":" + i); + } + } + rabbitTemplate.convertAndSend("standardWarning-result", JSON.toJSONString(result)); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + }); + } + // 提交线程池 + // 只用延迟线程 +// executor.scheduleWithFixedDelay(); + + } + + + /** + * 周期型线程池执行方法 + */ + private static void way2() { + ScheduledThreadPoolExecutor scheduledExecutorService = new ScheduledThreadPoolExecutor(1); // 参数--核线程个数 + + scheduledExecutorService.scheduleWithFixedDelay(() -> { + System.out.println(Thread.currentThread().getName() + " → " + " Start Time = " + new Date()); + try { + sleep(3000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + System.out.println(Thread.currentThread().getName() + " → " + " End Time = " + new Date()); + }, 1, 4, TimeUnit.SECONDS); // 延迟 1s,周期 4s + + } + +// public void sche(String[] args) { +// ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); +// +// // 初始延迟和周期 +// long initialDelay = 0; +// long period = 1000; // 初始周期为1秒 +// +// // 提交一个周期性任务 +// ScheduledFuture future = executor.scheduleAtFixedRate(() -> { +// // 模拟任务执行逻辑 +// System.out.println("Task execution..."); +// +// // 模拟根据条件动态调整执行周期或取消任务 +// if (someCondition()) { +// // 根据条件取消当前任务 +// future.cancel(false); +// System.out.println("Task cancelled due to condition."); +// +// // 根据条件重新安排一个新任务 +// long newDelay = 2000; // 新的延迟为2秒 +// long newPeriod = 3000; // 新的周期为3秒 +// future = executor.scheduleAtFixedRate(() -> { +// System.out.println("New task execution..."); +// }, newDelay, newPeriod, TimeUnit.MILLISECONDS); +// } +// }, initialDelay, period, TimeUnit.MILLISECONDS); +// +// // 等待一段时间后关闭 executor +// try { +// Thread.sleep(15000); // 等待15秒钟,演示多次执行和动态调整 +// } catch (InterruptedException e) { +// e.printStackTrace(); +// } +// executor.shutdown(); +// } +// +// // 示例条件方法,根据具体情况自行替换 +// private static boolean someCondition() { +// // 这里可以根据具体逻辑返回是否满足条件 +// return Math.random() < 0.1; // 模拟10%的概率满足条件 +// } + +} diff --git a/mobai-event-service/src/main/java/com/mobai/vehicle/event/standard/StandardEvent.java b/mobai-event-service/src/main/java/com/mobai/vehicle/event/standard/StandardEvent.java index 1bb1bb0..ebd0dfa 100644 --- a/mobai-event-service/src/main/java/com/mobai/vehicle/event/standard/StandardEvent.java +++ b/mobai-event-service/src/main/java/com/mobai/vehicle/event/standard/StandardEvent.java @@ -15,16 +15,16 @@ import org.springframework.stereotype.Component; public class StandardEvent { - @RabbitListener(queues = {"standard_Warn_Event_Start"}) - public void eventStart(String msg, Message message, AMQP.Channel channel) { - - } - - - @RabbitListener(queues = {"standard_Warn_Event_End"}) - public void eventEnd(String msg, Message message, AMQP.Channel channel) { - - } +// @RabbitListener(queues = {"standard_Warn_Event_Start"}) +// public void eventStart(String msg, Message message, AMQP.Channel channel) { +// +// } +// +// +// @RabbitListener(queues = {"standard_Warn_Event_End"}) +// public void eventEnd(String msg, Message message, AMQP.Channel channel) { +// +// } } diff --git a/mobai-event-service/src/main/resources/application.yml b/mobai-event-service/src/main/resources/application.yml index acf54a8..8fd4200 100644 --- a/mobai-event-service/src/main/resources/application.yml +++ b/mobai-event-service/src/main/resources/application.yml @@ -4,7 +4,6 @@ kafka: topic: vehicle-event-topic0 partition: 0 spring: - redis: host: 175.24.138.82 rabbitmq: