feat: 新增根据车辆解析配置,执行对应事件
parent
1000bdbdd0
commit
b44c040fad
4
pom.xml
4
pom.xml
|
@ -131,8 +131,8 @@
|
|||
<artifactId>maven-compiler-plugin</artifactId>
|
||||
<version>3.8.1</version>
|
||||
<configuration>
|
||||
<source>1.8</source>
|
||||
<target>1.8</target>
|
||||
<source>8</source>
|
||||
<target>8</target>
|
||||
<encoding>UTF-8</encoding>
|
||||
</configuration>
|
||||
</plugin>
|
||||
|
|
|
@ -25,8 +25,18 @@ public class EventInfoController {
|
|||
eventInfoService.creatKafkaConsumer(vin);
|
||||
}
|
||||
|
||||
@GetMapping("/CreatKafkaListen")
|
||||
public void creatKafkaListen(@RequestParam("vehicleVin") String vehicleVin) {
|
||||
eventInfoService.creatKafkaListen(vehicleVin);
|
||||
}
|
||||
|
||||
@GetMapping("/CloseKafkaConsumer")
|
||||
public Result closeKafkaConsumer(@RequestParam("vin") String vin) {
|
||||
return eventInfoService.closeKafkaConsumer(vin);
|
||||
}
|
||||
|
||||
@GetMapping("/SlidingWindow")
|
||||
public void slidingWindow() {
|
||||
eventInfoService.slidingWindow();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
package com.muyu.eventdriven.controller;
|
||||
|
||||
import com.muyu.eventdriven.config.iotdb.IotDBSessionConfig;
|
||||
import com.muyu.eventdriven.model.param.IotDbParam;
|
||||
import com.muyu.eventdriven.model.param.VehicleParam;
|
||||
import com.muyu.eventdriven.response.ResponseData;
|
||||
import com.muyu.eventdriven.server.IotDbServer;
|
||||
import lombok.extern.log4j.Log4j2;
|
||||
|
@ -10,9 +10,11 @@ import org.apache.iotdb.rpc.StatementExecutionException;
|
|||
import org.springframework.web.bind.annotation.PostMapping;
|
||||
import org.springframework.web.bind.annotation.RequestBody;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.rmi.ServerException;
|
||||
import java.util.ArrayList;
|
||||
|
||||
/**
|
||||
* @ClassName AsVehicleEvent
|
||||
|
@ -31,21 +33,26 @@ public class IotDbController {
|
|||
|
||||
/**
|
||||
* 插入数据
|
||||
* @param iotDbParam
|
||||
* @param vehicleParam
|
||||
*/
|
||||
@PostMapping("/api/device/insert")
|
||||
public ResponseData insert(@RequestBody IotDbParam iotDbParam) throws StatementExecutionException, ServerException, IoTDBConnectionException {
|
||||
// iotDbServer.insertData(iotDbParam);
|
||||
public ResponseData insert(@RequestBody VehicleParam vehicleParam) throws StatementExecutionException, ServerException, IoTDBConnectionException {
|
||||
String str1 ="{\"vin\":\"WZHFHDAIYUVKSHH3G\",\"drivingRoute\":\"1719454864664\",\"longitude\":\"116.678005\",\"latitude\":\"39.547881\",\"mileage\":\"9287722.9\"}";
|
||||
String str2 ="{\"vin\":\"WZHFHDAIYUVKSHH3G\",\"drivingRoute\":\"1719454862674\",\"longitude\":\"116.675176\",\"latitude\":\"39.547643\",\"mileage\":\"81565971.4\"}";
|
||||
ArrayList<JSONObject> jsonObjects = new ArrayList<>();
|
||||
jsonObjects.add(JSONObject.parseObject(str1));
|
||||
jsonObjects.add(JSONObject.parseObject(str2));
|
||||
iotDbServer.insertDatas("WZHFHDAIYUVKSHH3G",jsonObjects);
|
||||
return ResponseData.success();
|
||||
}
|
||||
|
||||
/**
|
||||
* 查询数据
|
||||
* @param iotDbParam
|
||||
* @param vehicleParam
|
||||
*/
|
||||
@PostMapping("/api/device/queryData")
|
||||
public ResponseData queryDataFromIotDb(@RequestBody IotDbParam iotDbParam) throws Exception {
|
||||
return ResponseData.success(iotDbServer.queryDataFromIotDb(iotDbParam));
|
||||
public ResponseData queryDataFromIotDb(@RequestBody VehicleParam vehicleParam) throws Exception {
|
||||
return ResponseData.success(iotDbServer.queryDataFromIotDb(vehicleParam));
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -0,0 +1,39 @@
|
|||
package com.muyu.eventdriven.domain;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
import lombok.experimental.SuperBuilder;
|
||||
import org.springframework.context.ApplicationEvent;
|
||||
import org.springframework.context.ApplicationListener;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* @ClassName SlidingWindowInfo
|
||||
* @Description 滑窗配置实体类
|
||||
* @Author Xin.Yao
|
||||
* @Date 2024/6/26 上午9:09
|
||||
*/
|
||||
@Data
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
@SuperBuilder
|
||||
public class SlidingWindowInfo {
|
||||
/**
|
||||
* 车辆vin
|
||||
*/
|
||||
private String vin;
|
||||
/**
|
||||
* 滑窗长度
|
||||
*/
|
||||
private String slidingWindowSize;
|
||||
/**
|
||||
* 滑动长度
|
||||
*/
|
||||
private String slideSize;
|
||||
/**
|
||||
* 数据key集合
|
||||
*/
|
||||
private List<String> dataKeys;
|
||||
}
|
|
@ -0,0 +1,23 @@
|
|||
package com.muyu.eventdriven.listener;
|
||||
|
||||
import com.muyu.eventdriven.listener.event.SlidingWindowEvent;
|
||||
import lombok.extern.log4j.Log4j2;
|
||||
import org.springframework.context.event.EventListener;
|
||||
import org.springframework.scheduling.annotation.Async;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* @ClassName SlidingWindowListener
|
||||
* @Description 滑窗配置监听
|
||||
* @Author Xin.Yao
|
||||
* @Date 2024/6/26 上午10:41
|
||||
*/
|
||||
@Component
|
||||
@Log4j2
|
||||
public class SlidingWindowListener {
|
||||
@Async
|
||||
@EventListener(SlidingWindowEvent.class)
|
||||
public void slidingWindowListener(SlidingWindowEvent slidingWindowEvent) {
|
||||
log.info("执行了监听事件,数据为:{}", slidingWindowEvent.getSource());
|
||||
}
|
||||
}
|
|
@ -0,0 +1,16 @@
|
|||
package com.muyu.eventdriven.listener.event;
|
||||
|
||||
import com.muyu.eventdriven.domain.SlidingWindowInfo;
|
||||
import org.springframework.context.ApplicationEvent;
|
||||
|
||||
/**
|
||||
* @ClassName SlidingWindowEvent
|
||||
* @Description 滑窗配置实体类事件
|
||||
* @Author Xin.Yao
|
||||
* @Date 2024/6/26 上午9:13
|
||||
*/
|
||||
public class SlidingWindowEvent extends ApplicationEvent {
|
||||
public SlidingWindowEvent(SlidingWindowInfo slidingWindowInfo) {
|
||||
super(slidingWindowInfo);
|
||||
}
|
||||
}
|
|
@ -8,27 +8,11 @@ import lombok.Data;
|
|||
* @Date 2024/6/16 下午3:29
|
||||
*/
|
||||
@Data
|
||||
public class IotDbParam {
|
||||
public class VehicleParam {
|
||||
/***
|
||||
* 产品PK
|
||||
* 车辆vin
|
||||
*/
|
||||
private String pk;
|
||||
/***
|
||||
* 设备号
|
||||
*/
|
||||
private String sn;
|
||||
/***
|
||||
* 时间
|
||||
*/
|
||||
private Long time;
|
||||
/***
|
||||
* 实时呼吸
|
||||
*/
|
||||
private String breath;
|
||||
/***
|
||||
* 实时心率
|
||||
*/
|
||||
private String heart;
|
||||
private String vin;
|
||||
/***
|
||||
* 查询开始时间
|
||||
*/
|
|
@ -9,26 +9,18 @@ import lombok.Data;
|
|||
* @Date 2024/6/16 下午3:29
|
||||
*/
|
||||
@Data
|
||||
public class IotDbResult {
|
||||
public class VehicleResult {
|
||||
/***
|
||||
* 时间
|
||||
*/
|
||||
private String time;
|
||||
/***
|
||||
* 产品PK
|
||||
* 车辆vin
|
||||
*/
|
||||
private String pk;
|
||||
private String vin;
|
||||
/***
|
||||
* 设备号
|
||||
* 实时数据
|
||||
*/
|
||||
private String sn;
|
||||
/***
|
||||
* 实时呼吸
|
||||
*/
|
||||
private String breath;
|
||||
/***
|
||||
* 实时心率
|
||||
*/
|
||||
private String heart;
|
||||
private String data;
|
||||
|
||||
}
|
|
@ -12,4 +12,8 @@ public interface EventInfoService {
|
|||
void creatKafkaConsumer(String vin);
|
||||
|
||||
Result closeKafkaConsumer(String vin);
|
||||
|
||||
void creatKafkaListen(String vehicleVin);
|
||||
|
||||
void slidingWindow();
|
||||
}
|
||||
|
|
|
@ -1,11 +1,9 @@
|
|||
package com.muyu.eventdriven.server;
|
||||
|
||||
import com.muyu.eventdriven.domain.VehicleData;
|
||||
import com.muyu.eventdriven.model.param.IotDbParam;
|
||||
import org.apache.iotdb.rpc.IoTDBConnectionException;
|
||||
import org.apache.iotdb.rpc.StatementExecutionException;
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import com.muyu.eventdriven.model.param.VehicleParam;
|
||||
|
||||
import java.rmi.ServerException;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* @ClassName AsVehicleEvent
|
||||
|
@ -18,10 +16,11 @@ public interface IotDbServer {
|
|||
/**
|
||||
* 添加数据
|
||||
*/
|
||||
void insertData(VehicleData vehicleData);
|
||||
void insertData(JSONObject jsonObject);
|
||||
void insertDatas(String vin, List<JSONObject> jsonObjects);
|
||||
|
||||
/**
|
||||
* 查询数据
|
||||
*/
|
||||
Object queryDataFromIotDb(IotDbParam iotDbParam) throws Exception;
|
||||
Object queryDataFromIotDb(VehicleParam vehicleParam) throws Exception;
|
||||
}
|
||||
|
|
|
@ -1,11 +1,14 @@
|
|||
package com.muyu.eventdriven.server.impl;
|
||||
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import com.muyu.eventdriven.consumer.KafkaConsumers;
|
||||
import com.muyu.eventdriven.domain.EventTacticsManage;
|
||||
import com.muyu.eventdriven.domain.SlidingWindowInfo;
|
||||
import com.muyu.eventdriven.domain.VehicleData;
|
||||
import com.muyu.eventdriven.domain.VehicleKafka;
|
||||
import com.muyu.eventdriven.domain.rest.Result;
|
||||
import com.muyu.eventdriven.listener.event.SlidingWindowEvent;
|
||||
import com.muyu.eventdriven.server.EventInfoService;
|
||||
import com.muyu.eventdriven.tactics.EventTactics;
|
||||
import lombok.extern.log4j.Log4j2;
|
||||
|
@ -14,6 +17,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecords;
|
|||
import org.apache.kafka.clients.consumer.KafkaConsumer;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.context.ApplicationContext;
|
||||
import org.springframework.context.ApplicationEventPublisher;
|
||||
import org.springframework.data.redis.core.RedisTemplate;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
|
@ -49,6 +53,8 @@ public class EventInfoServiceImpl implements EventInfoService {
|
|||
private EventTacticsManage eventTacticsManage;
|
||||
@Autowired
|
||||
private ApplicationContext applicationContext;
|
||||
@Autowired
|
||||
private ApplicationEventPublisher applicationEventPublisher;
|
||||
|
||||
@Override
|
||||
public void creatKafkaConsumer(String vehicleVin) {
|
||||
|
@ -61,7 +67,7 @@ public class EventInfoServiceImpl implements EventInfoService {
|
|||
kafkaConsumerMap.put(vehicleKafka.getConsumerName()+"-"+vehicleKafka.getPartitions(),kafkaConsumer);
|
||||
while (kafkaConsumerMap.containsKey(vehicleKafka.getConsumerName()+"-"+vehicleKafka.getPartitions())){
|
||||
// 使用 ConcurrentHashMap 来保证线程安全
|
||||
ConcurrentHashMap<String, ArrayList<VehicleData>> stringListHashMap = new ConcurrentHashMap<>();
|
||||
ConcurrentHashMap<String, ArrayList<JSONObject>> stringListHashMap = new ConcurrentHashMap<>();
|
||||
// 拉取消息
|
||||
ConsumerRecords<String, String> msg = kafkaConsumer.poll(Duration.ofSeconds(1));
|
||||
stringListHashMap = getVehicleData(msg, stringListHashMap);
|
||||
|
@ -78,16 +84,16 @@ public class EventInfoServiceImpl implements EventInfoService {
|
|||
}
|
||||
}
|
||||
|
||||
public ConcurrentHashMap<String, ArrayList<VehicleData>> getVehicleData(ConsumerRecords<String, String> msg,ConcurrentHashMap<String, ArrayList<VehicleData>> stringListHashMap) {
|
||||
public ConcurrentHashMap<String, ArrayList<JSONObject>> getVehicleData(ConsumerRecords<String, String> msg,ConcurrentHashMap<String, ArrayList<JSONObject>> stringListHashMap) {
|
||||
for (ConsumerRecord<String, String> consumerRecord : msg) {
|
||||
try {
|
||||
VehicleData vehicleData = JSON.parseObject(consumerRecord.value(), VehicleData.class);
|
||||
JSONObject jsonObject = JSONObject.parseObject(consumerRecord.value());
|
||||
// 使用 compute 方法简化数据添加逻辑
|
||||
stringListHashMap.compute(vehicleData.getVin(), (vin, list) -> {
|
||||
stringListHashMap.compute(jsonObject.getString("vin"), (vin, list) -> {
|
||||
if (list == null) {
|
||||
list = new ArrayList<>();
|
||||
}
|
||||
list.add(vehicleData);
|
||||
list.add(jsonObject);
|
||||
return list;
|
||||
});
|
||||
} catch (Exception e) {
|
||||
|
@ -102,4 +108,40 @@ public class EventInfoServiceImpl implements EventInfoService {
|
|||
redisTemplate.delete(vin);
|
||||
return Result.success("释放消费者");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void creatKafkaListen(String vehicleVin) {
|
||||
Object o = redisTemplate.opsForHash().get("vehicleKafka", vehicleVin);
|
||||
VehicleKafka vehicleKafka = JSON.parseObject(o.toString(), VehicleKafka.class);
|
||||
KafkaConsumer kafkaConsumer = kafkaConsumers.kafkaConsumer(vehicleKafka);
|
||||
kafkaConsumerMap.put(vehicleKafka.getConsumerName()+"-"+vehicleKafka.getPartitions(),kafkaConsumer);
|
||||
while (kafkaConsumerMap.containsKey(vehicleKafka.getConsumerName()+"-"+vehicleKafka.getPartitions())){
|
||||
// 使用 ConcurrentHashMap 来保证线程安全
|
||||
ConcurrentHashMap<String, ArrayList<JSONObject>> stringListHashMap = new ConcurrentHashMap<>();
|
||||
// 拉取消息
|
||||
ConsumerRecords<String, String> msg = kafkaConsumer.poll(Duration.ofSeconds(1));
|
||||
stringListHashMap = getVehicleData(msg, stringListHashMap);
|
||||
stringListHashMap.forEach((key,value) -> {
|
||||
// String vehicleEventString = redisTemplate.opsForHash().get(RedisConstants.VEHICLE_EVENT, key).toString();
|
||||
String vehicleEventString = "1,2,3,4,5";
|
||||
for (String str : vehicleEventString.split(",")) {
|
||||
CompletableFuture.runAsync(() -> {
|
||||
applicationContext.getBean(classNameList.get(Integer.parseInt(str)), EventTactics.class).eventManage(key,value);
|
||||
});
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void slidingWindow() {
|
||||
List<String> strings = new ArrayList<>();
|
||||
strings.add("a");
|
||||
strings.add("b");
|
||||
strings.add("c");
|
||||
strings.add("d");
|
||||
SlidingWindowEvent aaaa = new SlidingWindowEvent(new SlidingWindowInfo("AAAA", "20", "10", strings));
|
||||
log.info("执行了指标预警事件");
|
||||
applicationEventPublisher.publishEvent(aaaa);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,10 +1,9 @@
|
|||
package com.muyu.eventdriven.server.impl;
|
||||
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import com.muyu.eventdriven.config.iotdb.IotDBSessionConfig;
|
||||
import com.muyu.eventdriven.domain.VehicleData;
|
||||
import com.muyu.eventdriven.model.param.IotDbParam;
|
||||
import com.muyu.eventdriven.model.result.IotDbResult;
|
||||
import com.muyu.eventdriven.model.param.VehicleParam;
|
||||
import com.muyu.eventdriven.model.result.VehicleResult;
|
||||
import com.muyu.eventdriven.server.IotDbServer;
|
||||
import lombok.extern.log4j.Log4j2;
|
||||
import org.apache.iotdb.rpc.IoTDBConnectionException;
|
||||
|
@ -32,29 +31,62 @@ public class IotDbServerImpl implements IotDbServer {
|
|||
private IotDBSessionConfig iotDBSessionConfig;
|
||||
|
||||
@Override
|
||||
public void insertData(VehicleData vehicleData){
|
||||
public void insertData(JSONObject jsonObject){
|
||||
|
||||
try {
|
||||
// iotDbParam: 模拟设备上报消息
|
||||
String deviceId = "root.vehicle."+ vehicleData.getVin();
|
||||
String deviceId = "root.vehicle."+ jsonObject.getString("vin");
|
||||
// 将设备上报的数据存入数据库(时序数据库)
|
||||
List<String> measurementsList = new ArrayList<>();
|
||||
measurementsList.add("data");
|
||||
List<String> valuesList = new ArrayList<>();
|
||||
valuesList.add(String.valueOf(JSON.toJSON(vehicleData)));
|
||||
iotDBSessionConfig.insertRecord(deviceId, Long.valueOf(vehicleData.getDrivingRoute()), measurementsList, valuesList);
|
||||
valuesList.add(jsonObject.toString());
|
||||
iotDBSessionConfig.insertRecord(deviceId, Long.valueOf(jsonObject.getString("drivingRoute")), measurementsList, valuesList);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<IotDbResult> queryDataFromIotDb(IotDbParam iotDbParam) throws Exception {
|
||||
List<IotDbResult> iotDbResultList = new ArrayList<>();
|
||||
public void insertDatas(String vin,List<JSONObject> jsonObjects) {
|
||||
try {
|
||||
|
||||
if (null != iotDbParam.getPk() && null != iotDbParam.getSn()) {
|
||||
String sql = "select * from root.bizkey."+ iotDbParam.getPk() +"." + iotDbParam.getSn() + " where time >= "
|
||||
+ iotDbParam.getStartTime() + " and time < " + iotDbParam.getEndTime();
|
||||
List<String> deviceIds =new ArrayList<>();
|
||||
List<Long> timeList =new ArrayList<>();
|
||||
List<List<String>> valusList =new ArrayList<>();
|
||||
List<List<String>> measurementsList = new ArrayList<>();
|
||||
jsonObjects.stream().forEach(jsonObject -> {
|
||||
deviceIds.add("root.vehicle."+ jsonObject.getString("vin"));
|
||||
timeList.add(Long.valueOf(jsonObject.getString("drivingRoute")));
|
||||
valusList.add(Arrays.asList(jsonObject.toString()));
|
||||
measurementsList.add(Arrays.asList("data"));
|
||||
});
|
||||
iotDBSessionConfig.insertRecords(deviceIds,timeList,measurementsList,valusList);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<VehicleResult> queryDataFromIotDb(VehicleParam vehicleParam) throws Exception {
|
||||
List<VehicleResult> vehicleResultList = new ArrayList<>();
|
||||
|
||||
if (null != vehicleParam.getVin()) {
|
||||
String sql = "select * from root.vehicle."+ vehicleParam.getVin();
|
||||
if (null != vehicleParam.getStartTime() && !"".equals(vehicleParam.getStartTime())){
|
||||
if (sql.contains("where")){
|
||||
sql += " and time >= " + vehicleParam.getStartTime();
|
||||
}else{
|
||||
sql += " where time >= " + vehicleParam.getStartTime();
|
||||
}
|
||||
}
|
||||
if (null != vehicleParam.getEndTime() && !"".equals(vehicleParam.getEndTime())){
|
||||
if (sql.contains("where")){
|
||||
sql += " and time <= " + vehicleParam.getEndTime();
|
||||
}else{
|
||||
sql += " where time <= " + vehicleParam.getEndTime();
|
||||
}
|
||||
}
|
||||
SessionDataSet sessionDataSet = iotDBSessionConfig.query(sql);
|
||||
List<String> columnNames = sessionDataSet.getColumnNames();
|
||||
List<String> titleList = new ArrayList<>();
|
||||
|
@ -64,31 +96,31 @@ public class IotDbServerImpl implements IotDbServer {
|
|||
titleList.add(temp[temp.length - 1]);
|
||||
}
|
||||
// 封装处理数据
|
||||
packagingData(iotDbParam, iotDbResultList, sessionDataSet, titleList);
|
||||
packagingData(vehicleParam, vehicleResultList, sessionDataSet, titleList);
|
||||
} else {
|
||||
log.info("PK或者SN不能为空!!");
|
||||
}
|
||||
return iotDbResultList;
|
||||
return vehicleResultList;
|
||||
}
|
||||
/**
|
||||
* 封装处理数据
|
||||
* @param iotDbParam
|
||||
* @param iotDbResultList
|
||||
* @param vehicleParam
|
||||
* @param vehicleResultList
|
||||
* @param sessionDataSet
|
||||
* @param titleList
|
||||
* @throws StatementExecutionException
|
||||
* @throws IoTDBConnectionException
|
||||
*/
|
||||
private void packagingData(IotDbParam iotDbParam, List<IotDbResult> iotDbResultList, SessionDataSet sessionDataSet, List<String> titleList)
|
||||
private void packagingData(VehicleParam vehicleParam, List<VehicleResult> vehicleResultList, SessionDataSet sessionDataSet, List<String> titleList)
|
||||
throws StatementExecutionException, IoTDBConnectionException {
|
||||
int fetchSize = sessionDataSet.getFetchSize();
|
||||
if (fetchSize > 0) {
|
||||
while (sessionDataSet.hasNext()) {
|
||||
IotDbResult iotDbResult = new IotDbResult();
|
||||
VehicleResult vehicleResult = new VehicleResult();
|
||||
RowRecord next = sessionDataSet.next();
|
||||
List<Field> fields = next.getFields();
|
||||
String timeString = new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(next.getTimestamp());
|
||||
iotDbResult.setTime(timeString);
|
||||
vehicleResult.setTime(timeString);
|
||||
Map<String, String> map = new HashMap<>();
|
||||
|
||||
for (int i = 0; i < fields.size(); i++) {
|
||||
|
@ -96,12 +128,10 @@ public class IotDbServerImpl implements IotDbServer {
|
|||
// 这里的需要按照类型获取
|
||||
map.put(titleList.get(i), field.getObjectValue(field.getDataType()).toString());
|
||||
}
|
||||
iotDbResult.setTime(timeString);
|
||||
iotDbResult.setPk(iotDbParam.getPk());
|
||||
iotDbResult.setSn(iotDbParam.getSn());
|
||||
iotDbResult.setHeart(map.get("heart"));
|
||||
iotDbResult.setBreath(map.get("breath"));
|
||||
iotDbResultList.add(iotDbResult);
|
||||
vehicleResult.setTime(timeString);
|
||||
vehicleResult.setVin(vehicleParam.getVin());
|
||||
vehicleResult.setData(map.get("data"));
|
||||
vehicleResultList.add(vehicleResult);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
package com.muyu.eventdriven.tactics;
|
||||
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import com.muyu.eventdriven.domain.VehicleData;
|
||||
|
||||
import java.util.List;
|
||||
|
@ -11,6 +12,6 @@ import java.util.List;
|
|||
* @Date 2024/6/20 上午10:33
|
||||
*/
|
||||
public interface EventTactics {
|
||||
void eventManage(VehicleData vehicleData);
|
||||
void eventManage(String vin,List<VehicleData> vehicleDataList);
|
||||
void eventManage(JSONObject jsonObject);
|
||||
void eventManage(String vin,List<JSONObject> jsonObjects);
|
||||
}
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
package com.muyu.eventdriven.tactics.basics;
|
||||
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import com.muyu.eventdriven.domain.VehicleData;
|
||||
import com.muyu.eventdriven.server.IotDbServer;
|
||||
import com.muyu.eventdriven.tactics.EventTactics;
|
||||
|
@ -26,24 +27,21 @@ public class StorageEvent implements EventTactics {
|
|||
|
||||
/**
|
||||
* 单条数据的处理
|
||||
* @param vehicleData
|
||||
* @param jsonObject
|
||||
*/
|
||||
@Override
|
||||
public void eventManage(VehicleData vehicleData) {
|
||||
iotDbServer.insertData(vehicleData);
|
||||
log.info("车辆{}执行存储事件",vehicleData.getVin());
|
||||
public void eventManage(JSONObject jsonObject) {
|
||||
log.info("车辆{}执行存储事件",jsonObject.getString("vin"));
|
||||
}
|
||||
|
||||
/**
|
||||
* 多条数据的处理
|
||||
* @param vin
|
||||
* @param vehicleDataList
|
||||
* @param jsonObjects
|
||||
*/
|
||||
@Override
|
||||
public void eventManage(String vin, List<VehicleData> vehicleDataList) {
|
||||
public void eventManage(String vin, List<JSONObject> jsonObjects) {
|
||||
log.info("车辆{}执行存储事件",vin);
|
||||
vehicleDataList.forEach(vehicleData -> {
|
||||
iotDbServer.insertData(vehicleData);
|
||||
});
|
||||
iotDbServer.insertDatas(vin,jsonObjects);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
package com.muyu.eventdriven.tactics.system;
|
||||
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import com.muyu.eventdriven.domain.VehicleData;
|
||||
import com.muyu.eventdriven.tactics.EventTactics;
|
||||
import lombok.extern.log4j.Log4j2;
|
||||
|
@ -18,20 +19,20 @@ import java.util.List;
|
|||
public class ElectronicFenceEvent implements EventTactics {
|
||||
/**
|
||||
* 单条数据的处理
|
||||
* @param vehicleData
|
||||
* @param jsonObject
|
||||
*/
|
||||
@Override
|
||||
public void eventManage(VehicleData vehicleData) {
|
||||
log.info("车辆{}执行电子围栏事件",vehicleData.getVin());
|
||||
public void eventManage(JSONObject jsonObject) {
|
||||
log.info("车辆{}执行电子围栏事件",jsonObject.getString("vin"));
|
||||
}
|
||||
|
||||
/**
|
||||
* 多条数据的处理
|
||||
* @param vin
|
||||
* @param vehicleDataList
|
||||
* @param jsonObjects
|
||||
*/
|
||||
@Override
|
||||
public void eventManage(String vin, List<VehicleData> vehicleDataList) {
|
||||
public void eventManage(String vin, List<JSONObject> jsonObjects) {
|
||||
log.info("车辆{}执行电子围栏事件",vin);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package com.muyu.eventdriven.tactics.system;
|
||||
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import com.github.benmanes.caffeine.cache.Cache;
|
||||
import com.muyu.eventdriven.constants.FaultCodeConstants;
|
||||
import com.muyu.eventdriven.constants.RabbitConstants;
|
||||
|
@ -42,83 +43,83 @@ public class FaultAlarmEvent implements EventTactics {
|
|||
|
||||
|
||||
@Override
|
||||
public void eventManage(VehicleData vehicleData) {
|
||||
log.info("车辆{}执行故障报警事件",vehicleData.getVin());
|
||||
public void eventManage(JSONObject jsonObject) {
|
||||
log.info("车辆{}执行故障报警事件",jsonObject.getString("vin"));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void eventManage(String vin, List<VehicleData> vehicleDataList) {
|
||||
vehicleDataList.stream().forEach(vehicleData -> {
|
||||
//车辆状态
|
||||
if (vehicleData.getVehicleStatus() == 0){
|
||||
hasLocalCache(vehicleData, FaultCodeConstants.VEHICLESTATUS);
|
||||
}
|
||||
//充电状态
|
||||
if (vehicleData.getChargingStatus() == 0){
|
||||
hasLocalCache(vehicleData, FaultCodeConstants.CHARGINGSTATUS);
|
||||
}
|
||||
//运行状态
|
||||
if (vehicleData.getOperatingStatus() == 0){
|
||||
hasLocalCache(vehicleData, FaultCodeConstants.OPERATINGSTATUS);
|
||||
}
|
||||
//SOC
|
||||
if (vehicleData.getSocStatus() == 0){
|
||||
hasLocalCache(vehicleData, FaultCodeConstants.SOCSTATUS);
|
||||
}
|
||||
//可充电储能装置工作状态
|
||||
if (vehicleData.getChargingEnergyStorageStatus() == 0){
|
||||
hasLocalCache(vehicleData, FaultCodeConstants.CHARGINGENERGYSTORAGESTATUS);
|
||||
}
|
||||
//驱动电机状态
|
||||
if (vehicleData.getDriveMotorStatus() == 0){
|
||||
hasLocalCache(vehicleData, FaultCodeConstants.DRIVEMOTORSTATUS);
|
||||
}
|
||||
//定位是否有效
|
||||
if (vehicleData.getPositionStatus() == 0){
|
||||
hasLocalCache(vehicleData, FaultCodeConstants.POSITIONSTATUS);
|
||||
}
|
||||
//EAS(汽车防盗系统)状态
|
||||
if (vehicleData.getEasStatus() == 0){
|
||||
hasLocalCache(vehicleData, FaultCodeConstants.EASSTATUS);
|
||||
}
|
||||
//PTC(电动加热器)状态
|
||||
if (vehicleData.getPtcStatus() == 0){
|
||||
hasLocalCache(vehicleData, FaultCodeConstants.PTCSTATUS);
|
||||
}
|
||||
//EPS(电动助力系统)状态
|
||||
if (vehicleData.getEpsStatus() == 0){
|
||||
hasLocalCache(vehicleData, FaultCodeConstants.EPSSTATUS);
|
||||
}
|
||||
//ABS(防抱死)状态
|
||||
if (vehicleData.getAbsStatus() == 0){
|
||||
hasLocalCache(vehicleData, FaultCodeConstants.ABSSTATUS);
|
||||
}
|
||||
//MCU(电机/逆变器)状态
|
||||
if (vehicleData.getMcuStatus() == 0){
|
||||
hasLocalCache(vehicleData, FaultCodeConstants.MCUSTATUS);
|
||||
}
|
||||
//动力电池加热状态
|
||||
if (vehicleData.getHeatingStatus() == 0){
|
||||
hasLocalCache(vehicleData, FaultCodeConstants.HEATINGSTATUS);
|
||||
}
|
||||
//动力电池当前状态
|
||||
if (vehicleData.getBatteryStatus() == 0){
|
||||
hasLocalCache(vehicleData, FaultCodeConstants.BATTERYSTATUS);
|
||||
}
|
||||
//动力电池保温状态
|
||||
if (vehicleData.getBatteryInsulationStatus() == 0){
|
||||
hasLocalCache(vehicleData, FaultCodeConstants.BATTERYINSULATIONSTATUS);
|
||||
}
|
||||
//DCDC(电力交换系统)状态
|
||||
if (vehicleData.getDcdcStatus() == 0){
|
||||
hasLocalCache(vehicleData, FaultCodeConstants.DCDCSTATUS);
|
||||
}
|
||||
//CHG(充电机)状态
|
||||
if (vehicleData.getChgStatus() == 0){
|
||||
hasLocalCache(vehicleData, FaultCodeConstants.CHGSTATUS);
|
||||
}
|
||||
|
||||
});
|
||||
public void eventManage(String vin, List<JSONObject> jsonObjects) {
|
||||
// vehicleDataList.stream().forEach(vehicleData -> {
|
||||
// //车辆状态
|
||||
// if (vehicleData.getVehicleStatus() == 0){
|
||||
// hasLocalCache(vehicleData, FaultCodeConstants.VEHICLESTATUS);
|
||||
// }
|
||||
// //充电状态
|
||||
// if (vehicleData.getChargingStatus() == 0){
|
||||
// hasLocalCache(vehicleData, FaultCodeConstants.CHARGINGSTATUS);
|
||||
// }
|
||||
// //运行状态
|
||||
// if (vehicleData.getOperatingStatus() == 0){
|
||||
// hasLocalCache(vehicleData, FaultCodeConstants.OPERATINGSTATUS);
|
||||
// }
|
||||
// //SOC
|
||||
// if (vehicleData.getSocStatus() == 0){
|
||||
// hasLocalCache(vehicleData, FaultCodeConstants.SOCSTATUS);
|
||||
// }
|
||||
// //可充电储能装置工作状态
|
||||
// if (vehicleData.getChargingEnergyStorageStatus() == 0){
|
||||
// hasLocalCache(vehicleData, FaultCodeConstants.CHARGINGENERGYSTORAGESTATUS);
|
||||
// }
|
||||
// //驱动电机状态
|
||||
// if (vehicleData.getDriveMotorStatus() == 0){
|
||||
// hasLocalCache(vehicleData, FaultCodeConstants.DRIVEMOTORSTATUS);
|
||||
// }
|
||||
// //定位是否有效
|
||||
// if (vehicleData.getPositionStatus() == 0){
|
||||
// hasLocalCache(vehicleData, FaultCodeConstants.POSITIONSTATUS);
|
||||
// }
|
||||
// //EAS(汽车防盗系统)状态
|
||||
// if (vehicleData.getEasStatus() == 0){
|
||||
// hasLocalCache(vehicleData, FaultCodeConstants.EASSTATUS);
|
||||
// }
|
||||
// //PTC(电动加热器)状态
|
||||
// if (vehicleData.getPtcStatus() == 0){
|
||||
// hasLocalCache(vehicleData, FaultCodeConstants.PTCSTATUS);
|
||||
// }
|
||||
// //EPS(电动助力系统)状态
|
||||
// if (vehicleData.getEpsStatus() == 0){
|
||||
// hasLocalCache(vehicleData, FaultCodeConstants.EPSSTATUS);
|
||||
// }
|
||||
// //ABS(防抱死)状态
|
||||
// if (vehicleData.getAbsStatus() == 0){
|
||||
// hasLocalCache(vehicleData, FaultCodeConstants.ABSSTATUS);
|
||||
// }
|
||||
// //MCU(电机/逆变器)状态
|
||||
// if (vehicleData.getMcuStatus() == 0){
|
||||
// hasLocalCache(vehicleData, FaultCodeConstants.MCUSTATUS);
|
||||
// }
|
||||
// //动力电池加热状态
|
||||
// if (vehicleData.getHeatingStatus() == 0){
|
||||
// hasLocalCache(vehicleData, FaultCodeConstants.HEATINGSTATUS);
|
||||
// }
|
||||
// //动力电池当前状态
|
||||
// if (vehicleData.getBatteryStatus() == 0){
|
||||
// hasLocalCache(vehicleData, FaultCodeConstants.BATTERYSTATUS);
|
||||
// }
|
||||
// //动力电池保温状态
|
||||
// if (vehicleData.getBatteryInsulationStatus() == 0){
|
||||
// hasLocalCache(vehicleData, FaultCodeConstants.BATTERYINSULATIONSTATUS);
|
||||
// }
|
||||
// //DCDC(电力交换系统)状态
|
||||
// if (vehicleData.getDcdcStatus() == 0){
|
||||
// hasLocalCache(vehicleData, FaultCodeConstants.DCDCSTATUS);
|
||||
// }
|
||||
// //CHG(充电机)状态
|
||||
// if (vehicleData.getChgStatus() == 0){
|
||||
// hasLocalCache(vehicleData, FaultCodeConstants.CHGSTATUS);
|
||||
// }
|
||||
//
|
||||
// });
|
||||
log.info("车辆{}执行故障报警事件",vin);
|
||||
}
|
||||
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
package com.muyu.eventdriven.tactics.system;
|
||||
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import com.muyu.eventdriven.domain.VehicleData;
|
||||
import com.muyu.eventdriven.tactics.EventTactics;
|
||||
import lombok.extern.log4j.Log4j2;
|
||||
|
@ -17,12 +18,12 @@ import java.util.List;
|
|||
@Log4j2
|
||||
public class IndexWarningEvent implements EventTactics {
|
||||
@Override
|
||||
public void eventManage(VehicleData vehicleData) {
|
||||
log.info("车辆{}执行指标预警事件",vehicleData.getVin());
|
||||
public void eventManage(JSONObject jsonObject) {
|
||||
log.info("车辆{}执行指标预警事件",jsonObject.getString("vin"));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void eventManage(String vin, List<VehicleData> vehicleDataList) {
|
||||
public void eventManage(String vin, List<JSONObject> jsonObjects) {
|
||||
log.info("车辆{}执行指标预警事件",vin);
|
||||
|
||||
}
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package com.muyu.eventdriven.tactics.system;
|
||||
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import com.muyu.eventdriven.domain.VehicleData;
|
||||
import com.muyu.eventdriven.tactics.EventTactics;
|
||||
import lombok.extern.log4j.Log4j2;
|
||||
|
@ -23,20 +24,16 @@ public class RealTimeDataEvent implements EventTactics {
|
|||
private RedisTemplate<String,String> redisTemplate;
|
||||
|
||||
@Override
|
||||
public void eventManage(VehicleData vehicleData) {
|
||||
if (redisTemplate.hasKey(vehicleData.getVin())){
|
||||
log.info("{}监听到的消息内容: {}", vehicleData.getVin(),vehicleData);
|
||||
redisTemplate.opsForList().rightPush(vehicleData.getVin(), JSON.toJSONString(vehicleData));
|
||||
}
|
||||
log.info("车辆{}执行实时数据事件",vehicleData.getVin());
|
||||
public void eventManage(JSONObject jsonObject) {
|
||||
log.info("车辆{}执行实时数据事件",jsonObject.getString("vin"));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void eventManage(String vin, List<VehicleData> vehicleDataList) {
|
||||
public void eventManage(String vin, List<JSONObject> jsonObjects) {
|
||||
log.info("车辆{}执行实时数据事件",vin);
|
||||
if (redisTemplate.hasKey(vin)){
|
||||
log.info("{}监听到的消息内容: {}", vin,vehicleDataList);
|
||||
vehicleDataList.forEach(vehicleData -> {
|
||||
log.info("{}监听到的消息内容: {}", vin,jsonObjects);
|
||||
jsonObjects.forEach(vehicleData -> {
|
||||
redisTemplate.opsForList().rightPush(vin, JSON.toJSONString(vehicleData));
|
||||
});
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue