指标预警计算

master
Saisai Liu 2024-06-28 22:42:56 +08:00
parent ca38a075b7
commit e040751cbe
15 changed files with 364 additions and 202 deletions

View File

@ -30,5 +30,6 @@ public class VehicleReq {
*
*/
private Long endTime;
private String code;
}

View File

@ -82,8 +82,8 @@ public class IotDBSessionConfig {
public void insertRecord(String deviceId, Long time,List<String> measurementsList, List<String> valuesList) throws StatementExecutionException, IoTDBConnectionException, ServerException {
if (measurementsList.size() == valuesList.size()) {
session.insertRecord(deviceId, time, measurementsList, valuesList);
System.out.println(measurementsList);
System.out.println(valuesList);
log.info("键::{}",measurementsList);
log.info("值::{}",valuesList);
} else {
log.error("measurementsList 与 valuesList 值不对应");
}

View File

@ -14,6 +14,7 @@ import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.rmi.ServerException;
import java.util.Map;
/**
* description: iotdb
@ -31,11 +32,11 @@ public class IotDbController {
/**
*
* @param vehicle
* @param map
*/
@PostMapping("/api/device/insert")
public ResponseData insert(@RequestBody Vehicle vehicle) throws StatementExecutionException, ServerException, IoTDBConnectionException {
iotDbServer.insertData(vehicle);
public ResponseData insert(@RequestBody Map<String,String> map) throws StatementExecutionException, ServerException, IoTDBConnectionException {
iotDbServer.insertData(map);
return ResponseData.success();
}

View File

@ -7,6 +7,7 @@ import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import java.rmi.ServerException;
import java.util.Map;
/**
* @ClassName IotDbServer
@ -16,7 +17,7 @@ import java.rmi.ServerException;
*/
public interface IotDbServer {
void insertData(Vehicle vehicle) throws StatementExecutionException, ServerException, IoTDBConnectionException;
void insertData(Map<String,String> map) throws StatementExecutionException, ServerException, IoTDBConnectionException;
Object queryDataFromIotDb(VehicleReq req) throws Exception;

View File

@ -13,7 +13,6 @@ import org.apache.iotdb.tsfile.read.common.RowRecord;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.math.BigDecimal;
import java.rmi.ServerException;
import java.util.ArrayList;
import java.util.HashMap;
@ -35,126 +34,41 @@ public class IotDbServerImpl implements IotDbServer {
@Override
public void insertData(Vehicle vehicle) throws StatementExecutionException, ServerException, IoTDBConnectionException {
public void insertData(Map<String, String> map) throws StatementExecutionException, ServerException, IoTDBConnectionException {
// iotDbParam: 模拟设备上报消息
// bizkey: 业务唯一key PK :产品唯一编码 SN:设备唯一编码
String deviceId = "root.vin." + vehicle.getVin();
String deviceId = "root.vin.map." + map.get("vin");
// 将设备上报的数据存入数据库(时序数据库)
List<String> measurementsList = new ArrayList<>() {{
add("startTime");
add("longitude");
add("latitude");
add("speed");
add("mileage");
add("voltage");
add("current");
add("resistance");
add("gear");
add("accelerationPedal");
add("brakePedal");
add("fuelConsumptionRate");
add("motorControllerTemperature");
add("motorSpeed");
add("motoTorque");
add("motorTemperature");
add("motorVoltage");
add("motorCurrent");
add("remainingBattery");
add("maximumFeedbackPower");
add("maximumDischargePower");
add("selfCheckCounter");
add("totalBatteryCurrent");
add("totalBatteryVoltage");
add("singleBatteryMaxVoltage");
add("singleBatteryMinVoltage");
add("singleBatteryMaxTemperature");
add("singleBatteryMinTemperature");
add("availableBatteryCapacity");
add("vehicleStatus");
add("chargingStatus");
add("socStatus");
add("operatingStatus");
add("chargingEnergyStorageStatus");
add("driveMotorStatus");
add("positionStatus");
add("easStatus");
add("ptcStatus");
add("epsStatus");
add("absStatus");
add("mcuStatus");
add("heatingStatus");
add("batteryStatus");
add("batteryInsulationStatus");
add("dcdcStatus");
add("chgStatus");
}};
//车辆具体数据
List<String> valuesList = new ArrayList<>() {{
add(String.valueOf(vehicle.getStartTime()));
add(String.valueOf(vehicle.getLongitude()));
add(String.valueOf(vehicle.getLatitude()));
add(String.valueOf(vehicle.getSpeed()));
add(String.valueOf(vehicle.getMileage()));
add(String.valueOf(vehicle.getVoltage()));
add(String.valueOf(vehicle.getCurrent()));
add(String.valueOf(vehicle.getResistance()));
add(String.valueOf(vehicle.getGear()));
add(String.valueOf(vehicle.getAccelerationPedal()));
add(String.valueOf(vehicle.getBrakePedal()));
add(String.valueOf(vehicle.getFuelConsumptionRate()));
add(String.valueOf(vehicle.getMotorControllerTemperature()));
add(String.valueOf(vehicle.getMotorSpeed()));
add(String.valueOf(vehicle.getMotoTorque()));
add(String.valueOf(vehicle.getMotorTemperature()));
add(String.valueOf(vehicle.getMotorVoltage()));
add(String.valueOf(vehicle.getMotorCurrent()));
add(String.valueOf(vehicle.getRemainingBattery()));
add(String.valueOf(vehicle.getMaximumFeedbackPower()));
add(String.valueOf(vehicle.getMaximumDischargePower()));
add(String.valueOf(vehicle.getSelfCheckCounter()));
add(String.valueOf(vehicle.getTotalBatteryCurrent()));
add(String.valueOf(vehicle.getTotalBatteryVoltage()));
add(String.valueOf(vehicle.getSingleBatteryMaxVoltage()));
add(String.valueOf(vehicle.getSingleBatteryMinVoltage()));
add(String.valueOf(vehicle.getSingleBatteryMaxTemperature()));
add(String.valueOf(vehicle.getSingleBatteryMinTemperature()));
add(String.valueOf(vehicle.getAvailableBatteryCapacity()));
add(String.valueOf(vehicle.getVehicleStatus()));
add(String.valueOf(vehicle.getChargingStatus()));
add(String.valueOf(vehicle.getSocStatus()));
add(String.valueOf(vehicle.getOperatingStatus()));
add(String.valueOf(vehicle.getChargingEnergyStorageStatus()));
add(String.valueOf(vehicle.getDriveMotorStatus()));
add(String.valueOf(vehicle.getPositionStatus()));
add(String.valueOf(vehicle.getEasStatus()));
add(String.valueOf(vehicle.getPtcStatus()));
add(String.valueOf(vehicle.getEpsStatus()));
add(String.valueOf(vehicle.getAbsStatus()));
add(String.valueOf(vehicle.getMcuStatus()));
add(String.valueOf(vehicle.getHeatingStatus()));
add(String.valueOf(vehicle.getBatteryStatus()));
add(String.valueOf(vehicle.getBatteryInsulationStatus()));
add(String.valueOf(vehicle.getDcdcStatus()));
add(String.valueOf(vehicle.getChgStatus()));
}};
System.out.println(vehicle);
iotDBSessionConfig.insertRecord(deviceId, vehicle.getStartTime(), measurementsList, valuesList);
iotDBSessionConfig.insertRecord(deviceId, Long.valueOf(map.get("drivingRoute")),
map.keySet().stream().toList(), map.values().stream().toList());
}
/**
*
*
* @param req
* @return
* @throws Exception
*/
@Override
public List<Vehicle> queryDataFromIotDb(VehicleReq req) throws Exception {
List<Vehicle> iotDbResultList = new ArrayList<>();
public List<Map<String, String>> queryDataFromIotDb(VehicleReq req) throws Exception {
List<Map<String, String>> mapList = new ArrayList<>();
if (null != req.getVin()) {
String sql = "select * from " + "root.vin." + req.getVin();
String sql = "select * from " + "root.vin.map." + req.getVin();
if (req.getCode() == null) {
sql = sql.replace("*", req.getCode());
}
// 开始时间
if (req.getStartTime() != null && req.getStartTime() != 0) {
sql += " where startTime >= " + req.getStartTime();
sql += " where drivingRoute >= " + req.getStartTime();
}
// 结束时间
if (req.getEndTime() != null && req.getEndTime() != 0) {
sql += " and startTime <= " + req.getEndTime();
sql += " and drivingRoute <= " + req.getEndTime();
}
SessionDataSet sessionDataSet = iotDBSessionConfig.query(sql);
List<String> columnNames = sessionDataSet.getColumnNames();
@ -165,24 +79,24 @@ public class IotDbServerImpl implements IotDbServer {
titleList.add(temp[temp.length - 1]);
}
// 封装处理数据
packagingData(req, iotDbResultList, sessionDataSet, titleList);
packagingData(req, mapList, sessionDataSet, titleList);
} else {
log.info("VIN不能为空");
}
return iotDbResultList;
return mapList;
}
/**
*
*
* @param req
* @param iotDbResultList
* @param mapList
* @param sessionDataSet
* @param titleList
* @throws StatementExecutionException
* @throws IoTDBConnectionException
*/
private void packagingData(VehicleReq req, List<Vehicle> iotDbResultList, SessionDataSet sessionDataSet, List<String> titleList)
private void packagingData(VehicleReq req, List<Map<String, String>> mapList, SessionDataSet sessionDataSet, List<String> titleList)
throws StatementExecutionException, IoTDBConnectionException {
int fetchSize = sessionDataSet.getFetchSize();
if (fetchSize > 0) {
@ -192,7 +106,6 @@ public class IotDbServerImpl implements IotDbServer {
List<Field> fields = next.getFields();
vehicle.setStartTime(next.getTimestamp());
Map<String, String> map = new HashMap<>();
for (int i = 0; i < fields.size(); i++) {
Field field = fields.get(i);
@ -200,55 +113,7 @@ public class IotDbServerImpl implements IotDbServer {
Object obj = field.getObjectValue(field.getDataType());
map.put(titleList.get(i), obj == null ? "null" : obj.toString());
}
log.info(map);
// vehicle.setStartTime(Long.valueOf(map.get("startTime")));
vehicle.setVin(map.get("vin"));
vehicle.setSpeed(new BigDecimal(map.get("speed")));
vehicle.setLatitude(new BigDecimal(map.get("latitude")));
vehicle.setLongitude(new BigDecimal(map.get("longitude")));
vehicle.setMileage(new BigDecimal(map.get("mileage")));
vehicle.setGear(map.get("gear"));
vehicle.setVehicleStatus(new BigDecimal(map.get("vehicleStatus")).intValue());
vehicle.setChargingStatus(new BigDecimal(map.get("chargingStatus")).intValue());
vehicle.setChargingEnergyStorageStatus(new BigDecimal(map.get("chargingEnergyStorageStatus")).intValue());
vehicle.setEasStatus(new BigDecimal(map.get("easStatus")).intValue());
vehicle.setMotorControllerTemperature(new BigDecimal(map.get("motorControllerTemperature")));
vehicle.setTotalBatteryCurrent(new BigDecimal(map.get("totalBatteryCurrent")));
vehicle.setSingleBatteryMaxVoltage(new BigDecimal(map.get("singleBatteryMaxVoltage")));
vehicle.setOperatingStatus(new BigDecimal(map.get("operatingStatus")).intValue());
vehicle.setHeatingStatus(new BigDecimal(map.get("heatingStatus")).intValue());
vehicle.setDcdcStatus(new BigDecimal(map.get("dcdcStatus")).intValue());
vehicle.setDriveMotorStatus(new BigDecimal(map.get("driveMotorStatus")).intValue());
vehicle.setPositionStatus(new BigDecimal(map.get("positionStatus")).intValue());
vehicle.setPtcStatus(new BigDecimal(map.get("ptcStatus")).intValue());
vehicle.setEpsStatus(new BigDecimal(map.get("epsStatus")).intValue());
vehicle.setAbsStatus(new BigDecimal(map.get("absStatus")).intValue());
vehicle.setMcuStatus(new BigDecimal(map.get("mcuStatus")).intValue());
vehicle.setBatteryInsulationStatus(new BigDecimal(map.get("batteryInsulationStatus")).intValue());
vehicle.setBatteryStatus(new BigDecimal(map.get("batteryStatus")).intValue());
vehicle.setChgStatus(new BigDecimal(map.get("chgStatus")).intValue());
vehicle.setTotalBatteryVoltage(new BigDecimal(map.get("totalBatteryVoltage")));
vehicle.setMotorSpeed(new BigDecimal(map.get("motorSpeed")));
vehicle.setMotorCurrent(new BigDecimal(map.get("motorCurrent")));
vehicle.setMotorVoltage(new BigDecimal(map.get("motorVoltage")));
vehicle.setAccelerationPedal(new BigDecimal(map.get("accelerationPedal")));
vehicle.setBrakePedal(new BigDecimal(map.get("brakePedal")));
vehicle.setSelfCheckCounter(new BigDecimal(map.get("selfCheckCounter")));
vehicle.setMotorTemperature(new BigDecimal(map.get("motorTemperature")));
vehicle.setMaximumDischargePower(new BigDecimal(map.get("maximumDischargePower")));
vehicle.setMaximumFeedbackPower(new BigDecimal(map.get("maximumFeedbackPower")));
vehicle.setFuelConsumptionRate(new BigDecimal(map.get("fuelConsumptionRate")));
vehicle.setVin(req.getVin());
vehicle.setVoltage(new BigDecimal(map.get("voltage")));
vehicle.setCurrent(new BigDecimal(map.get("current")));
vehicle.setResistance(new BigDecimal(map.get("resistance")));
vehicle.setMotoTorque(new BigDecimal(map.get("motoTorque")));
vehicle.setRemainingBattery(new BigDecimal(map.get("remainingBattery")));
vehicle.setSingleBatteryMinVoltage(new BigDecimal(map.get("singleBatteryMinVoltage")));
vehicle.setSingleBatteryMaxTemperature(new BigDecimal(map.get("singleBatteryMaxTemperature")));
vehicle.setSingleBatteryMinTemperature(new BigDecimal(map.get("singleBatteryMinTemperature")));
vehicle.setAvailableBatteryCapacity(new BigDecimal(map.get("availableBatteryCapacity")));
iotDbResultList.add(vehicle);
mapList.add(map);
}
}
}

View File

@ -24,6 +24,7 @@ import java.rmi.ServerException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
/**
* @description: kafka
@ -59,10 +60,11 @@ public class KafkaConsumerListenerExample {
@KafkaListener(topics = {"topic0", "topic1"}, groupId = "topics")
public void consume(ConsumerRecord<String, String> record) {
log.info("消费信息为:{}",record);
Vehicle vehicle = getVehicle(record.value());
// 无数据接口存map集合
Map<String,String> map = JSON.parseObject(record.value(),Map.class);
// 存入iotDB
try {
iotDbServer.insertData(vehicle);
iotDbServer.insertData(map);
log.info("添加成功");
} catch (StatementExecutionException e) {

View File

@ -91,11 +91,11 @@ public class VinConsumerRunner implements ApplicationRunner {
List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
for (ConsumerRecord<String, String> record : partitionRecords) {
log.info("Offset = {}, Key = {}, Value = {}", record.offset(), record.key(), record.value());
// 报文解析为对象
// Vehicle vehicle = kafkaConsumerListenerExample.getVehicle(record.value());
// 获取数据并存入
Map<String,String> map = JSON.parseObject(record.value(), Map.class);
// 获取对应的事件
VehicleEvent events = eventsService.getEvents(map.get("vin"));
log.info("执行事件:{}",events);
HandlerHelper.doHandler(events, map, redisService);
}
}

View File

@ -1,6 +1,5 @@
package com.mobai.vehicle;
import com.mobai.domain.Vehicle;
import com.mobai.domain.VehicleEvent;
import com.mobai.utils.RedisService;
import com.mobai.vehicle.event.constants.EventHandler;

View File

@ -57,4 +57,23 @@ public class CaffeineCache {
.build();
}
/**
* 10
* @return cache
*/
@Bean(name = "createTenMinuteCaffeine")
public Cache createTenMinuteCaffeine() {
return Caffeine.newBuilder()
// 设置初始连接数
.initialCapacity(1000)
//半小时没有读自动删除 访问
.expireAfterAccess(10, TimeUnit.MINUTES)
//最大容量1024个超过会自动清理空间
.maximumSize(1024)
.removalListener(((key, value, cause) -> {
//清理通知 key,value ==> 键值对 cause ==> 清理原因
}))
.build();
}
}

View File

@ -2,10 +2,7 @@ package com.mobai.vehicle.event.constants;
import cn.hutool.extra.spring.SpringUtil;
import com.mobai.vehicle.EventActive;
import com.mobai.vehicle.event.factory.BugMalfunctionFactory;
import com.mobai.vehicle.event.factory.ElectronicFenceFactory;
import com.mobai.vehicle.event.factory.IndexWaringFactory;
import com.mobai.vehicle.event.factory.RealTimeFactory;
import com.mobai.vehicle.event.factory.*;
import lombok.extern.log4j.Log4j2;
import java.util.Arrays;
@ -25,6 +22,7 @@ public enum EventHandler {
REAL_TIME_DATA(RealTimeFactory.class),
INDEX_WARNING(IndexWaringFactory.class),
BUG_MALFUNCTION(BugMalfunctionFactory.class),
// STANDARD_WARNING(StandardWarningFactory.class),
ELECTRONIC_FENCE(ElectronicFenceFactory.class);
private String code;

View File

@ -98,47 +98,46 @@ public class BugMalfunctionFactory implements EventActive {
result.setFaultCode(FaultCode.POSITION.getString());
this.active(result);
}
if (vehicle.get("EasStatus").equals(0+"")) {
if (vehicle.get("easStatus").equals(0+"")) {
result.setFaultCode(FaultCode.EAS.getString());
this.active(result);
}
if (vehicle.get("PtcStatus").equals(0+"")) {
if (vehicle.get("ptcStatus").equals(0+"")) {
result.setFaultCode(FaultCode.PTC.getString());
this.active(result);
}
if (vehicle.get("EpsStatus").equals(0+"")) {
if (vehicle.get("epsStatus").equals(0+"")) {
result.setFaultCode(FaultCode.EPS.getString());
this.active(result);
}
if (vehicle.get("AbsStatus").equals(0+"")) {
if (vehicle.get("absStatus").equals(0+"")) {
result.setFaultCode(FaultCode.ABS.getString());
this.active(result);
}
if (vehicle.get("McuStatus").equals(0+"")) {
if (vehicle.get("mcuStatus").equals(0+"")) {
result.setFaultCode(FaultCode.MCU.getString());
this.active(result);
}
if (vehicle.get("HeatingStatus").equals(0+"")) {
if (vehicle.get("heatingStatus").equals(0+"")) {
result.setFaultCode(FaultCode.HEATING.getString());
this.active(result);
}
if (vehicle.get("BatteryStatus").equals(0+"")) {
if (vehicle.get("batteryStatus").equals(0+"")) {
result.setFaultCode(FaultCode.BATTERY.getString());
this.active(result);
}
if (vehicle.get("BatteryInsulationStatus").equals(0+"")) {
if (vehicle.get("batteryInsulationStatus").equals(0+"")) {
result.setFaultCode(FaultCode.BATTERY_INSULATION.getString());
this.active(result);
}
if (vehicle.get("DcdcStatus").equals(0+"")) {
if (vehicle.get("dcdcStatus").equals(0+"")) {
result.setFaultCode(FaultCode.DCDC.getString());
this.active(result);
}
if (vehicle.get("ChgStatus").equals(0+"")) {
if (vehicle.get("chgStatus").equals(0+"")) {
result.setFaultCode(FaultCode.CHG.getString());
this.active(result);
}
}

View File

@ -1,6 +1,5 @@
package com.mobai.vehicle.event.factory;
import com.mobai.domain.Vehicle;
import com.mobai.utils.RedisService;
import com.mobai.vehicle.EventActive;
import lombok.extern.log4j.Log4j2;
@ -10,7 +9,7 @@ import org.springframework.stereotype.Component;
import java.util.Map;
/**
*
*
* @author Saisai
* @className BugMalfunctionFactory
* @description

View File

@ -0,0 +1,279 @@
package com.mobai.vehicle.event.factory;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.github.benmanes.caffeine.cache.Cache;
import com.mobai.iotDB.service.impl.IotDbServerImpl;
import com.mobai.req.VehicleReq;
import com.mobai.utils.RedisService;
import lombok.Data;
import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import static java.lang.Thread.sleep;
/**
*
*
* @author Saisai
* @className BugMalfunctionFactory
* @description
* @date 2024/6/20 14:42
*/
@Data
@Log4j2
@Component
public class StandardWarningFactory {
@Autowired
private RedisTemplate<String, String> redisTemplate;
/**
* keycode-level-times
*/
@Resource(name = "createTenMinuteCaffeine")
private Cache<String, String> cache;
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private IotDbServerImpl iotDbServerImpl;
private Map<String, ScheduledExecutorService> scheduleMap;
/**
* keycode-cycleLength-level
*/
private ConcurrentHashMap<String, List<String>> vinLevel;
private List<String> vins;
@RabbitListener(bindings = @QueueBinding(
value = @Queue,
exchange = @Exchange(name = "vehicle-prop", type = ExchangeTypes.FANOUT)
))
public void changeProp(String vin) {
List<String> attrs = new ArrayList<>();
List<Integer> attrLength = new ArrayList<>();
List<String> propList = redisTemplate.opsForList().range(vin + "List", 0, -1);
for (String string : propList) {
JSONObject prop = JSON.parseObject(string);
// 指标属性
attrs.add(String.valueOf(prop.get("keyCode")));
// 周期延迟时长
attrLength.add((Integer) prop.get("times"));
attrs.add(prop.get("keyCode") + "-" + prop.get("level") + "-" + prop.get("times"));
}
cache.put(vin + "-keyCode", JSON.toJSONString(attrs)); // vin keycode-level-times
log.info("更新缓存:{}", vin);
// 指标未编写和存入
}
/**
* 线
*
* @param vin
*/
@RabbitListener(queues = "standard-Warn-Event-Start")
public void joinStandardVehicle(String vin) {
List<String> cacheProps = new ArrayList<>();
log.info("车辆上线:{}", vin);
List<String> props = redisTemplate.opsForList().range(vin + "Prop", 0, -1);
props.forEach(prop -> {
JSONObject jsonObject = JSON.parseObject(prop);
String keyCode = String.valueOf(jsonObject.get("keyCode"));
String level = String.valueOf(jsonObject.get("level"));
String windowLength = String.valueOf(jsonObject.get("windowLength"));
String cycleLength = String.valueOf(jsonObject.get("activeLength"));
// cycle = windowLEngth * cycle
cacheProps.add(keyCode + "-" + cycleLength + "-" + level);
});
// 存入车辆指标缓存 code-cycleLength-level
cache.put(vin + "-keyCode", JSON.toJSONString(props));
vins.add(vin);
List<String> vinLevel = JSON.parseArray(cache.getIfPresent(vin + "-keyCode"), String.class);
if (vinLevel == null) {
changeProp(vin);
vinLevel = JSON.parseArray(cache.getIfPresent(vin + "-keyCode"), String.class); // keycode-level-times
}
Map<String, String> map = new HashMap<>();
vinLevel.stream().map(str -> (map.put(str.split("-")[0], str.split("-")[1]))); // key : attr value : level
String[] split = vinLevel.get(0).split("-"); // vin-level-times
String times = split[2];
try {
List<Map<String, String>> vehicleList = iotDbServerImpl.queryDataFromIotDb(
new VehicleReq() {{
setVin(vin);
setStartTime(new Date().getTime() - Long.parseLong(times) * 1000);
setEndTime(new Date().getTime());
}});
List<String> finalVinLevel = vinLevel;
HashMap<String, List<Integer>> result = new HashMap<>();
// 数据查询结果处理
vehicleList.forEach(vehicleMap -> {
for (String vinInfo : finalVinLevel) {
String[] vinInfoSplit = vinInfo.split("-");
// 数据库查询结果
String string = vehicleMap.get(vinInfoSplit[0]);
int resu = Integer.parseInt(vinInfoSplit[1]) - Integer.parseInt(string);
if (!result.containsKey(string)) {
result.put(string, new ArrayList<Integer>());
}
result.get(string).add(resu);
// 指标异常处理
if (resu > 10 || resu < -10) {
log.info("指标有问题预警发生ing");
// rabbitTemplate.convertAndSend("standardWarning-error", vinInfoSplit[0] + ":" + string);
}
}
});
log.info("发送了结果");
// rabbitTemplate.convertAndSend("standardWarning-result", JSON.toJSONString(result));
} catch (Exception e) {
throw new RuntimeException(e);
}
}
/**
* 线
*
* @param vin
*/
@RabbitListener(queues = "standard-Warn-Event-End")
public void outStandardVehicle(String vin) {
log.info("车辆下线:{}", vin);
// scheduleMap.get(vin + "-schedule").shutdown();
}
// 周期线程池
public void activeEvent(Map<String, String> vehicle, RedisService redisService) {
// 创建初始的周期性线程池
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(2);
// 提交一些任务给线程池执行
for (String vin : vins) {// keycode-cycleLength-times
List<String> strings = JSON.parseArray(cache.getIfPresent(vin + "-keyCode"), String.class);
if (strings.isEmpty()){
changeProp(vin);
}
executor.submit(() -> {
for (String string : strings) {
// keycode-cycleLength-level
String[] split = string.split("-");
try {
List<String> list = new ArrayList<>();
iotDbServerImpl.queryDataFromIotDb(new VehicleReq() {{
setVin(vin);
setCode(split[0]);
setEndTime(new Date().getTime());
setStartTime(new Date().getTime() - Long.parseLong(split[1]) * 1000);
}}).stream().map(map -> list.addAll(map.values()));
List<Integer> collect = list.stream().map(Integer::valueOf).toList();
int avg = (int) collect.stream().mapToInt(number -> number).average().getAsDouble();
List<Integer> result = new ArrayList<>();
for (Integer i : collect) {
result.add(i - avg);
if (i>Integer.parseInt(split[2])){
rabbitTemplate.convertAndSend("standardWarning-error", split[0] + ":" + i);
}
}
rabbitTemplate.convertAndSend("standardWarning-result", JSON.toJSONString(result));
} catch (Exception e) {
throw new RuntimeException(e);
}
}
});
}
// 提交线程池
// 只用延迟线程
// executor.scheduleWithFixedDelay();
}
/**
* 线
*/
private static void way2() {
ScheduledThreadPoolExecutor scheduledExecutorService = new ScheduledThreadPoolExecutor(1); // 参数--核线程个数
scheduledExecutorService.scheduleWithFixedDelay(() -> {
System.out.println(Thread.currentThread().getName() + " → " + " Start Time = " + new Date());
try {
sleep(3000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(Thread.currentThread().getName() + " → " + " End Time = " + new Date());
}, 1, 4, TimeUnit.SECONDS); // 延迟 1s周期 4s
}
// public void sche(String[] args) {
// ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
//
// // 初始延迟和周期
// long initialDelay = 0;
// long period = 1000; // 初始周期为1秒
//
// // 提交一个周期性任务
// ScheduledFuture<?> future = executor.scheduleAtFixedRate(() -> {
// // 模拟任务执行逻辑
// System.out.println("Task execution...");
//
// // 模拟根据条件动态调整执行周期或取消任务
// if (someCondition()) {
// // 根据条件取消当前任务
// future.cancel(false);
// System.out.println("Task cancelled due to condition.");
//
// // 根据条件重新安排一个新任务
// long newDelay = 2000; // 新的延迟为2秒
// long newPeriod = 3000; // 新的周期为3秒
// future = executor.scheduleAtFixedRate(() -> {
// System.out.println("New task execution...");
// }, newDelay, newPeriod, TimeUnit.MILLISECONDS);
// }
// }, initialDelay, period, TimeUnit.MILLISECONDS);
//
// // 等待一段时间后关闭 executor
// try {
// Thread.sleep(15000); // 等待15秒钟演示多次执行和动态调整
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
// executor.shutdown();
// }
//
// // 示例条件方法,根据具体情况自行替换
// private static boolean someCondition() {
// // 这里可以根据具体逻辑返回是否满足条件
// return Math.random() < 0.1; // 模拟10%的概率满足条件
// }
}

View File

@ -15,16 +15,16 @@ import org.springframework.stereotype.Component;
public class StandardEvent {
@RabbitListener(queues = {"standard_Warn_Event_Start"})
public void eventStart(String msg, Message message, AMQP.Channel channel) {
}
@RabbitListener(queues = {"standard_Warn_Event_End"})
public void eventEnd(String msg, Message message, AMQP.Channel channel) {
}
// @RabbitListener(queues = {"standard_Warn_Event_Start"})
// public void eventStart(String msg, Message message, AMQP.Channel channel) {
//
// }
//
//
// @RabbitListener(queues = {"standard_Warn_Event_End"})
// public void eventEnd(String msg, Message message, AMQP.Channel channel) {
//
// }
}

View File

@ -4,7 +4,6 @@ kafka:
topic: vehicle-event-topic0
partition: 0
spring:
redis:
host: 175.24.138.82
rabbitmq: