feat: 新增根据车辆滑窗配置获取数据

master
yaoxin 2024-06-28 22:23:00 +08:00
parent b44c040fad
commit 198aa471d7
27 changed files with 421 additions and 138 deletions

17
pom.xml
View File

@ -33,6 +33,12 @@
<artifactId>caffeine</artifactId>
<version>2.9.3</version>
</dependency>
<!-- 对于@EnableCaching注解开启-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-cache</artifactId>
<version>2.3.2.RELEASE</version>
</dependency>
<!-- SpringBoot Boot Redis -->
<dependency>
<groupId>org.springframework.boot</groupId>
@ -59,17 +65,6 @@
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- Mysql Connector -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<!-- Mybatis 依赖配置 -->
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>2.2.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.iotdb/iotdb-session -->
<dependency>
<groupId>org.apache.iotdb</groupId>

View File

@ -5,9 +5,11 @@ import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.annotation.EnableAsync;
@SpringBootApplication
@EnableCaching
@EnableAsync
public class EventDrivenApplication {
public static void main(String[] args) {

View File

@ -0,0 +1,46 @@
package com.muyu.eventdriven.cache;
import com.alibaba.fastjson.JSON;
import com.muyu.eventdriven.constants.CaffeineConstants;
import com.muyu.eventdriven.constants.RedisConstants;
import com.muyu.eventdriven.domain.SlidingWindowInfo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cache.annotation.Cacheable;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
/**
* @ClassName CaffeineCacheService
* @Description caffeine
* @Author Xin.Yao
* @Date 2024/6/28 2:19
*/
@Component
public class CaffeineCacheService {
@Autowired
private RedisTemplate<String,String> redisTemplate;
@Cacheable(key = "#vin" , value = CaffeineConstants.CAFFEINE_CACHE_NAME)
public String getRedisAnalyzeConfig(String vin){
// Object o = redisTemplate.opsForHash().get(RedisConstants.ANALYZE_CONFIG,vin);
// return o.toString();
return null;
}
@Cacheable(key = "#vin" , value = CaffeineConstants.CAFFEINE_CACHE_NAME)
public List<SlidingWindowInfo> getRedisSlidingWindow(String vin){
List<String> range = redisTemplate.opsForList().range(vin, 0, -1);
List<SlidingWindowInfo> slidingWindowInfos = range.stream().map(object -> {
return JSON.parseObject(object, SlidingWindowInfo.class);
}).collect(Collectors.toList());
// ArrayList<SlidingWindowInfo> slidingWindowInfos = new ArrayList<>();
// slidingWindowInfos.add(new SlidingWindowInfo("JAV0VJUJYOTOK9KSY", 20, 10,"a",1,2));
// slidingWindowInfos.add(new SlidingWindowInfo("WZHFHDAIYUVKSHH3G", 15, 5,"b",2,3));
// slidingWindowInfos.add(new SlidingWindowInfo("VIN12345678912345", 30, 15,"c",3,5));
return slidingWindowInfos;
}
}

View File

@ -1,15 +0,0 @@
package com.muyu.eventdriven.config;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Import;
/**
* @ClassName DataAccessClientConfig
* @Description
* @Author Xin.Yao
* @Date 2024/5/9 19:52
*/
//@ComponentScan
//@Import({EventDrivenRunner.class})
//public class EventDrivenConfig {
//}

View File

@ -1,20 +0,0 @@
package com.muyu.eventdriven.config;
/**
* @ClassName DataAccessClientRunner
* @Description
* @Author Xin.Yao
* @Date 2024/5/9 19:53
*/
//@Log4j2
//@Component
//public class EventDrivenRunner implements ApplicationRunner {
// @Autowired
// private EventTacticsManage eventTacticsManage;
//
// @Override
// public void run(ApplicationArguments args){
// eventTacticsManage.initEventTacticsManage();
// }
//}

View File

@ -2,6 +2,9 @@ package com.muyu.eventdriven.config.caffeine;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.muyu.eventdriven.constants.CaffeineConstants;
import org.springframework.cache.CacheManager;
import org.springframework.cache.caffeine.CaffeineCacheManager;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@ -26,4 +29,17 @@ public class CaffeineCacheConfig {
.maximumSize(1000)
.build();
}
@Bean(CaffeineConstants.CAFFEINE_CACHE_NAME)
public CacheManager caffeineCacheManager() {
CaffeineCacheManager cacheManager = new CaffeineCacheManager();
cacheManager.setCaffeine(Caffeine.newBuilder()
// 设置最后一次写入或访问后经过固定时间过期
// .expireAfterAccess(60, TimeUnit.SECONDS)
// 初始的缓存空间大小
.initialCapacity(100)
// 缓存的最大条数
.maximumSize(1000));
return cacheManager;
}
}

View File

@ -0,0 +1,33 @@
package com.muyu.eventdriven.config.rabbit;
import com.muyu.eventdriven.constants.RabbitConstants;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @ClassName RabbitCacheConfig
* @Description rabbit
* @Author Xin.Yao
* @Date 2024/6/28 2:33
*/
@Configuration
public class RabbitCacheConfig {
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange(RabbitConstants.ANALYZE_NORM_UPDATE_EXCHANGE);
}
@Bean
public Queue queue() {
return new Queue(RabbitConstants.ANALYZE_NORM_UPDATE_QUEUE, true);
}
@Bean
public Binding bindingQueue() {
return BindingBuilder.bind(queue()).to(fanoutExchange());
}
}

View File

@ -1,10 +1,7 @@
package com.muyu.eventdriven.config.rabbit;
import com.muyu.eventdriven.constants.RabbitConstants;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

View File

@ -0,0 +1,11 @@
package com.muyu.eventdriven.constants;
/**
* @ClassName CaffeineConstants
* @Description caffeine
* @Author Xin.Yao
* @Date 2024/6/28 2:20
*/
public class CaffeineConstants {
public static final String CAFFEINE_CACHE_NAME = "caffeineCacheManager";
}

View File

@ -1,5 +1,9 @@
package com.muyu.eventdriven.constants;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/**
* @ClassName FaultCodeConstants
* @Description
@ -7,21 +11,45 @@ package com.muyu.eventdriven.constants;
* @Date 2024/6/21 11:49
*/
public class FaultCodeConstants {
public static final String VEHICLESTATUS = "GZ001";
public static final String CHARGINGSTATUS = "GZ002";
public static final String OPERATINGSTATUS = "GZ003";
public static final String SOCSTATUS = "GZ004";
public static final String CHARGINGENERGYSTORAGESTATUS = "GZ005";
public static final String DRIVEMOTORSTATUS = "GZ006";
public static final String POSITIONSTATUS = "GZ007";
public static final String EASSTATUS = "GZ008";
public static final String PTCSTATUS = "GZ009";
public static final String EPSSTATUS = "GZ010";
public static final String ABSSTATUS = "GZ011";
public static final String MCUSTATUS = "GZ012";
public static final String HEATINGSTATUS = "GZ013";
public static final String BATTERYSTATUS = "GZ014";
public static final String BATTERYINSULATIONSTATUS = "GZ015";
public static final String DCDCSTATUS = "GZ016";
public static final String CHGSTATUS = "GZ017";
public static final ConcurrentMap<String,String> faultCodeMap = FaultCodeConstants.initFaultCodeMap();
private static ConcurrentMap<String, String> initFaultCodeMap() {
ConcurrentMap<String, String> map = new ConcurrentHashMap<>();
// 添加默认值
map.put("vehicleStatus", "GZ001");
map.put("chargingStatus", "GZ002");
map.put("operatingStatus", "GZ003");
map.put("socStatus", "GZ004");
map.put("chargingEnergyStorageStatus", "GZ005");
map.put("driveMotorStatus", "GZ006");
map.put("positionStatus", "GZ007");
map.put("easStatus", "GZ008");
map.put("ptcStatus", "GZ009");
map.put("epsStatus", "GZ010");
map.put("absStatus", "GZ011");
map.put("mcuStatus", "GZ012");
map.put("heatingStatus", "GZ013");
map.put("batteryStatus", "GZ014");
map.put("batteryInsulationStatus", "GZ015");
map.put("dcdcStatus", "GZ016");
map.put("chgStatus", "GZ017");
return map;
}
// public static final String VEHICLESTATUS = "GZ001";
// public static final String CHARGINGSTATUS = "GZ002";
// public static final String OPERATINGSTATUS = "GZ003";
// public static final String SOCSTATUS = "GZ004";
// public static final String CHARGINGENERGYSTORAGESTATUS = "GZ005";
// public static final String DRIVEMOTORSTATUS = "GZ006";
// public static final String POSITIONSTATUS = "GZ007";
// public static final String EASSTATUS = "GZ008";
// public static final String PTCSTATUS = "GZ009";
// public static final String EPSSTATUS = "GZ010";
// public static final String ABSSTATUS = "GZ011";
// public static final String MCUSTATUS = "GZ012";
// public static final String HEATINGSTATUS = "GZ013";
// public static final String BATTERYSTATUS = "GZ014";
// public static final String BATTERYINSULATIONSTATUS = "GZ015";
// public static final String DCDCSTATUS = "GZ016";
// public static final String CHGSTATUS = "GZ017";
}

View File

@ -12,4 +12,14 @@ public class RabbitConstants {
public static final String EXCHANGE_STATUS = "exchange_status";
public static final String STATUS_ABNORMAL = "abnormal";
public static final String STATUS_NORMAL = "normal";
public static final String ANALYZE_NORM_UPDATE_EXCHANGE = "analyze_norm_update_exchange";
public static final String ANALYZE_NORM_UPDATE_QUEUE = "analyze_norm_update_queue1";
/**
* 线
*/
public static final String VEHICLE_TOP_LINE_EVENT_QUEUE = "vehicle_top_line_event_queue";
/**
* 线
*/
public static final String VEHICLE_DOWN_LINE_EVENT_QUEUE = "vehicle_down_line_event_queue";
}

View File

@ -11,4 +11,5 @@ import org.springframework.stereotype.Component;
public class RedisConstants {
public static final String VEHICLE_EVENT = "vehicle_event";
public static final String VEHICLE_FAULT_KEY = "vehicle_fault_key:";
public static final String ANALYZE_CONFIG = "analyze_config";
}

View File

@ -1,4 +1,4 @@
package com.muyu.eventdriven.consumer;
package com.muyu.eventdriven.consumer.kafka;
import com.muyu.eventdriven.domain.VehicleKafka;
import org.apache.kafka.clients.consumer.ConsumerConfig;

View File

@ -0,0 +1,45 @@
package com.muyu.eventdriven.consumer.rabbit;
import com.muyu.eventdriven.cache.CaffeineCacheService;
import com.muyu.eventdriven.constants.RabbitConstants;
import com.muyu.eventdriven.tactics.EventTactics;
import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cache.annotation.CacheEvict;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Component;
/**
* @ClassName RabbitConsumer
* @Description
* @Author Xin.Yao
* @Date 2024/6/6 9:35
*/
@Component
@Log4j2
public class RabbitConsumer {
@Autowired
private CaffeineCacheService caffeineCacheService;
@Autowired
private ApplicationContext applicationContext;
@RabbitListener(queuesToDeclare = {@Queue(RabbitConstants.VEHICLE_TOP_LINE_EVENT_QUEUE)})
public void topLineEventQueue(String msg){
msg=msg.replace("\"","");
log.info("车辆上线监听到的消息:{}",msg);
caffeineCacheService.getRedisAnalyzeConfig(msg);
applicationContext.getBean("IndexWarningEvent", EventTactics.class).eventManage(msg,null);
}
@RabbitListener(queuesToDeclare = {@Queue(RabbitConstants.VEHICLE_DOWN_LINE_EVENT_QUEUE)})
public void downLineEventQueue(String msg){
log.info("车辆下线监听到的消息:{}",msg);
}
@RabbitListener(queuesToDeclare = {@Queue(RabbitConstants.ANALYZE_NORM_UPDATE_QUEUE)})
@CacheEvict(key = "#msg" , value = "caffeineCacheManager")
public void analyzeNormUpdate(String msg){
log.info("监听到的消息:{}",msg);
}
}

View File

@ -15,6 +15,7 @@ import com.alibaba.fastjson.JSONObject;
import javax.annotation.Resource;
import java.rmi.ServerException;
import java.util.ArrayList;
import java.util.Date;
/**
* @ClassName AsVehicleEvent
@ -37,8 +38,8 @@ public class IotDbController {
*/
@PostMapping("/api/device/insert")
public ResponseData insert(@RequestBody VehicleParam vehicleParam) throws StatementExecutionException, ServerException, IoTDBConnectionException {
String str1 ="{\"vin\":\"WZHFHDAIYUVKSHH3G\",\"drivingRoute\":\"1719454864664\",\"longitude\":\"116.678005\",\"latitude\":\"39.547881\",\"mileage\":\"9287722.9\"}";
String str2 ="{\"vin\":\"WZHFHDAIYUVKSHH3G\",\"drivingRoute\":\"1719454862674\",\"longitude\":\"116.675176\",\"latitude\":\"39.547643\",\"mileage\":\"81565971.4\"}";
String str1 ="{\"vin\":\"WZHFHDAIYUVKSHH3G\",\"drivingRoute\":\""+String.valueOf(new Date().getTime())+"\",\"longitude\":\"116.678005\",\"latitude\":\"39.547881\",\"mileage\":\"9287722.9\"}";
String str2 ="{\"vin\":\"WZHFHDAIYUVKSHH3G\",\"drivingRoute\":\""+String.valueOf(new Date().getTime())+"\",\"longitude\":\"116.675176\",\"latitude\":\"39.547643\",\"mileage\":\"81565971.4\"}";
ArrayList<JSONObject> jsonObjects = new ArrayList<>();
jsonObjects.add(JSONObject.parseObject(str1));
jsonObjects.add(JSONObject.parseObject(str2));

View File

@ -0,0 +1,44 @@
package com.muyu.eventdriven.domain;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
/**
* @ClassName AnalyzeConfigInfo
* @Description
* @Author Xin.Yao
* @Date 2024/6/26 2:08
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
@SuperBuilder
public class AnalyzeConfigInfo {
/**
*
*/
private Integer startBit;
/**
*
*/
private Integer stopBit;
/**
* key
*/
private String keyCode;
/**
*
*/
private String label;
/**
*
*/
private String type;
/**
*
*/
private Integer attributeType;
}

View File

@ -4,6 +4,7 @@ import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import org.apache.iotdb.tsfile.read.filter.operator.In;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationListener;
@ -27,13 +28,21 @@ public class SlidingWindowInfo {
/**
*
*/
private String slidingWindowSize;
private Integer range;
/**
*
*/
private String slideSize;
private Integer opportunity;
/**
* key
* key
*/
private List<String> dataKeys;
private String keyCode;
/**
* :1 2 3
*/
private Integer attributeType;
/**
*
*/
private Integer attributeValue;
}

View File

@ -1,11 +1,28 @@
package com.muyu.eventdriven.listener;
import com.alibaba.fastjson.JSON;
import com.muyu.eventdriven.domain.SlidingWindowInfo;
import com.muyu.eventdriven.listener.event.SlidingWindowEvent;
import com.muyu.eventdriven.model.param.VehicleParam;
import com.muyu.eventdriven.model.result.VehicleResult;
import com.muyu.eventdriven.server.IotDbServer;
import com.muyu.eventdriven.tactics.system.IndexWarningEvent;
import lombok.extern.log4j.Log4j2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.event.EventListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
/**
* @ClassName SlidingWindowListener
* @Description
@ -15,9 +32,30 @@ import org.springframework.stereotype.Component;
@Component
@Log4j2
public class SlidingWindowListener {
@Autowired
private IndexWarningEvent indexWarningEvent;
@Autowired
private RedisTemplate<String,String> redisTemplate;
@Autowired
private IotDbServer iotDbServer;
@Async
@EventListener(SlidingWindowEvent.class)
public void slidingWindowListener(SlidingWindowEvent slidingWindowEvent) {
log.info("执行了监听事件,数据为:{}", slidingWindowEvent.getSource());
SlidingWindowInfo slidingWindowInfo = (SlidingWindowInfo) slidingWindowEvent.getSource();
// 获取当前时间并向前推指定秒数例如300秒
LocalDateTime shiftedDateTime = LocalDateTime.now().minusSeconds(slidingWindowInfo.getRange());
// 将LocalDateTime转换为Instant需要指定时区
Instant instant = shiftedDateTime.atZone(ZoneId.systemDefault()).toInstant();
try {
VehicleParam vehicleParam = new VehicleParam(slidingWindowInfo.getVin(), slidingWindowInfo.getKeyCode(), "", "");
List<VehicleResult> vehicleResults = iotDbServer.queryDataFromIotDb(vehicleParam);
log.info("车辆:{},查询:{}的数据:{}",slidingWindowInfo.getVin(),slidingWindowInfo.getKeyCode(),vehicleResults);
} catch (Exception e) {
throw new RuntimeException(e);
}
if (redisTemplate.opsForHash().hasKey("vehicleKafka", slidingWindowInfo.getVin())){
indexWarningEvent.loopPutOffEvent(slidingWindowInfo);
}
}
}

View File

@ -1,14 +0,0 @@
package com.muyu.eventdriven.mapper;
import org.apache.ibatis.annotations.Mapper;
/**
* @ClassName EventDrivenMapper
* @Description DAO
* @Author Xin.Yao
* @Date 2024/6/16 3:47
*/
@Mapper
public interface EventDrivenMapper {
}

View File

@ -1,6 +1,9 @@
package com.muyu.eventdriven.model.param;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* @ClassName AsVehicleEvent
* @Description iotdb
@ -8,11 +11,17 @@ import lombok.Data;
* @Date 2024/6/16 3:29
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class VehicleParam {
/***
* vin
*/
private String vin;
/***
*
*/
private String queryAttribute;
/***
*
*/

View File

@ -2,6 +2,7 @@ package com.muyu.eventdriven.server;
import com.alibaba.fastjson.JSONObject;
import com.muyu.eventdriven.model.param.VehicleParam;
import com.muyu.eventdriven.model.result.VehicleResult;
import java.util.List;
@ -22,5 +23,5 @@ public interface IotDbServer {
/**
*
*/
Object queryDataFromIotDb(VehicleParam vehicleParam) throws Exception;
List<VehicleResult> queryDataFromIotDb(VehicleParam vehicleParam) throws Exception;
}

View File

@ -2,10 +2,9 @@ package com.muyu.eventdriven.server.impl;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.muyu.eventdriven.consumer.KafkaConsumers;
import com.muyu.eventdriven.consumer.kafka.KafkaConsumers;
import com.muyu.eventdriven.domain.EventTacticsManage;
import com.muyu.eventdriven.domain.SlidingWindowInfo;
import com.muyu.eventdriven.domain.VehicleData;
import com.muyu.eventdriven.domain.VehicleKafka;
import com.muyu.eventdriven.domain.rest.Result;
import com.muyu.eventdriven.listener.event.SlidingWindowEvent;
@ -23,9 +22,7 @@ import org.springframework.stereotype.Service;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;
/**
* @ClassName EventInfoServiceImpl
@ -76,7 +73,9 @@ public class EventInfoServiceImpl implements EventInfoService {
String vehicleEventString = "1,2,3,4,5";
for (String str : vehicleEventString.split(",")) {
CompletableFuture.runAsync(() -> {
applicationContext.getBean(classNameList.get(Integer.parseInt(str)), EventTactics.class).eventManage(key,value);
if (!str.equals("5")){
applicationContext.getBean(classNameList.get(Integer.parseInt(str)), EventTactics.class).eventManage(key,value);
}
});
}
});
@ -135,13 +134,20 @@ public class EventInfoServiceImpl implements EventInfoService {
@Override
public void slidingWindow() {
List<String> strings = new ArrayList<>();
strings.add("a");
strings.add("b");
strings.add("c");
strings.add("d");
SlidingWindowEvent aaaa = new SlidingWindowEvent(new SlidingWindowInfo("AAAA", "20", "10", strings));
ArrayList<SlidingWindowInfo> slidingWindowInfos = new ArrayList<>();
slidingWindowInfos.add(new SlidingWindowInfo("vin1", 20, 10,"a",1,2));
slidingWindowInfos.add(new SlidingWindowInfo("vin2", 15, 5,"b",2,3));
slidingWindowInfos.add(new SlidingWindowInfo("vin3", 30, 15,"c",3,5));
// 创建一个单线程的ScheduledExecutorService
ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
slidingWindowInfos.forEach(slidingWindowInfo -> {
// 延迟5秒后执行的任务
Runnable task = () -> applicationEventPublisher.publishEvent(new SlidingWindowEvent(slidingWindowInfo));
// 使用schedule方法安排任务在5秒后执行
executor.schedule(task, 5, TimeUnit.SECONDS);
});
// 关闭executor不再接受新的任务已提交的任务会继续执行
executor.shutdown();
log.info("执行了指标预警事件");
applicationEventPublisher.publishEvent(aaaa);
}
}

View File

@ -1,7 +1,10 @@
package com.muyu.eventdriven.server.impl;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.muyu.eventdriven.cache.CaffeineCacheService;
import com.muyu.eventdriven.config.iotdb.IotDBSessionConfig;
import com.muyu.eventdriven.domain.AnalyzeConfigInfo;
import com.muyu.eventdriven.model.param.VehicleParam;
import com.muyu.eventdriven.model.result.VehicleResult;
import com.muyu.eventdriven.server.IotDbServer;
@ -11,10 +14,12 @@ import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.session.SessionDataSet;
import org.apache.iotdb.tsfile.read.common.Field;
import org.apache.iotdb.tsfile.read.common.RowRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.*;
import java.util.stream.Collectors;
/**
* @ClassName AsVehicleEvent
@ -29,6 +34,8 @@ public class IotDbServerImpl implements IotDbServer {
@Resource
private IotDBSessionConfig iotDBSessionConfig;
@Autowired
private CaffeineCacheService caffeineCacheService;
@Override
public void insertData(JSONObject jsonObject){
@ -50,16 +57,19 @@ public class IotDbServerImpl implements IotDbServer {
@Override
public void insertDatas(String vin,List<JSONObject> jsonObjects) {
try {
// String analyzeConfig = caffeineCacheService.getRedisAnalyzeConfig(vin);
List<AnalyzeConfigInfo> analyzeConfigInfos = JSON.parseArray("[{\"startBit\":1,\"stopBit\":17,\"keyCode\":\"vin\",\"label\":\"VIN\",\"type\":\"String\"},{\"startBit\":18,\"stopBit\":30,\"keyCode\":\"drivingRoute\",\"label\":\"路径\",\"type\":\"String\"},{\"startBit\":31,\"stopBit\":41,\"keyCode\":\"longitude\",\"label\":\"经度\",\"type\":\"String\"},{\"startBit\":42,\"stopBit\":51,\"keyCode\":\"latitude\",\"label\":\"维度\",\"type\":\"String\"},{\"startBit\":58,\"stopBit\":68,\"keyCode\":\"mileage\",\"label\":\"里程\",\"type\":\"String\"}]", AnalyzeConfigInfo.class);
List<String> collect = analyzeConfigInfos.stream().map(AnalyzeConfigInfo::getKeyCode).collect(Collectors.toList());
List<String> deviceIds =new ArrayList<>();
List<Long> timeList =new ArrayList<>();
List<List<String>> valusList =new ArrayList<>();
List<List<String>> measurementsList = new ArrayList<>();
jsonObjects.stream().forEach(jsonObject -> {
jsonObjects.forEach(jsonObject -> {
deviceIds.add("root.vehicle."+ jsonObject.getString("vin"));
timeList.add(Long.valueOf(jsonObject.getString("drivingRoute")));
valusList.add(Arrays.asList(jsonObject.toString()));
measurementsList.add(Arrays.asList("data"));
List<String> values = collect.stream().map(jsonObject::getString).collect(Collectors.toList());
valusList.add(values);
measurementsList.add(collect);
});
iotDBSessionConfig.insertRecords(deviceIds,timeList,measurementsList,valusList);
} catch (Exception e) {
@ -126,11 +136,13 @@ public class IotDbServerImpl implements IotDbServer {
for (int i = 0; i < fields.size(); i++) {
Field field = fields.get(i);
// 这里的需要按照类型获取
map.put(titleList.get(i), field.getObjectValue(field.getDataType()).toString());
if (null != field.getDataType()){
map.put(titleList.get(i), field.getObjectValue(field.getDataType()).toString());
}
}
vehicleResult.setTime(timeString);
vehicleResult.setVin(vehicleParam.getVin());
vehicleResult.setData(map.get("data"));
vehicleResult.setData(map.get(vehicleParam.getQueryAttribute()));
vehicleResultList.add(vehicleResult);
}
}

View File

@ -3,14 +3,13 @@ package com.muyu.eventdriven.tactics.system;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.github.benmanes.caffeine.cache.Cache;
import com.muyu.eventdriven.cache.CaffeineCacheService;
import com.muyu.eventdriven.constants.FaultCodeConstants;
import com.muyu.eventdriven.constants.RabbitConstants;
import com.muyu.eventdriven.constants.RedisConstants;
import com.muyu.eventdriven.domain.AnalyzeConfigInfo;
import com.muyu.eventdriven.domain.RabbitFalut;
import com.muyu.eventdriven.domain.VehicleData;
import com.muyu.eventdriven.domain.VehicleFaultStatus;
import com.muyu.eventdriven.tactics.EventTactics;
import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
@ -21,6 +20,7 @@ import java.util.Date;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
* @ClassName FaultAlarmEvent
@ -40,6 +40,8 @@ public class FaultAlarmEvent implements EventTactics {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private CaffeineCacheService caffeineCacheService;
@Override
@ -49,6 +51,17 @@ public class FaultAlarmEvent implements EventTactics {
@Override
public void eventManage(String vin, List<JSONObject> jsonObjects) {
String analyzeConfig = caffeineCacheService.getRedisAnalyzeConfig(vin);
List<AnalyzeConfigInfo> analyzeConfigInfos = JSON.parseArray(analyzeConfig, AnalyzeConfigInfo.class);
analyzeConfigInfos = analyzeConfigInfos.stream().filter(analyzeConfigInfo -> analyzeConfigInfo.getAttributeType() == 3).collect(Collectors.toList());
List<AnalyzeConfigInfo> finalAnalyzeConfigInfos = analyzeConfigInfos;
jsonObjects.stream().forEach(jsonObject -> {
finalAnalyzeConfigInfos.forEach(analyzeConfigInfo -> {
if (jsonObject.getString(analyzeConfigInfo.getKeyCode()).equals("0")){
hasLocalCache(jsonObject, FaultCodeConstants.faultCodeMap.get(analyzeConfigInfo.getKeyCode()));
}
});
});
// vehicleDataList.stream().forEach(vehicleData -> {
// //车辆状态
// if (vehicleData.getVehicleStatus() == 0){
@ -123,12 +136,12 @@ public class FaultAlarmEvent implements EventTactics {
log.info("车辆{}执行故障报警事件",vin);
}
public void hasLocalCache(VehicleData vehicleData,String faultCode){
Object o = caffeineCache.get(RedisConstants.VEHICLE_FAULT_KEY+vehicleData.getVin()+":" + faultCode, key -> vehicleData.getDrivingRoute());
if (o.toString().equals(vehicleData.getDrivingRoute())){
public void hasLocalCache(JSONObject jsonObject,String faultCode){
Object o = caffeineCache.get(RedisConstants.VEHICLE_FAULT_KEY+jsonObject.getString("vin")+":" + faultCode, key -> jsonObject.getString("drivingRoute"));
if (o.toString().equals(jsonObject.getString("drivingRoute"))){
RabbitFalut rabbitFalut = new RabbitFalut();
rabbitFalut.setStartTime(new Date());
rabbitFalut.setVin(vehicleData.getVin());
rabbitFalut.setVin(jsonObject.getString("vin"));
rabbitFalut.setFaultCode(faultCode);
rabbitTemplate.convertAndSend(RabbitConstants.EXCHANGE_STATUS,RabbitConstants.STATUS_ABNORMAL, JSON.toJSONString(rabbitFalut),message ->{
message.getMessageProperties().setMessageId(UUID.randomUUID().toString());
@ -138,7 +151,7 @@ public class FaultAlarmEvent implements EventTactics {
return message;
});
}
redisTemplate.opsForValue().set(RedisConstants.VEHICLE_FAULT_KEY+vehicleData.getVin()+":" + faultCode,vehicleData.getDrivingRoute(),10, TimeUnit.SECONDS);
redisTemplate.opsForValue().set(RedisConstants.VEHICLE_FAULT_KEY+jsonObject.getString("vin")+":" + faultCode,jsonObject.getString("drivingRoute"),10, TimeUnit.SECONDS);
}
}

View File

@ -1,12 +1,21 @@
package com.muyu.eventdriven.tactics.system;
import com.alibaba.fastjson.JSONObject;
import com.muyu.eventdriven.cache.CaffeineCacheService;
import com.muyu.eventdriven.domain.SlidingWindowInfo;
import com.muyu.eventdriven.domain.VehicleData;
import com.muyu.eventdriven.listener.event.SlidingWindowEvent;
import com.muyu.eventdriven.tactics.EventTactics;
import lombok.extern.log4j.Log4j2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
* @ClassName IndexWarningEvent
@ -17,6 +26,10 @@ import java.util.List;
@Service("IndexWarningEvent")
@Log4j2
public class IndexWarningEvent implements EventTactics {
@Autowired
private ApplicationEventPublisher applicationEventPublisher;
@Autowired
private CaffeineCacheService caffeineCacheService;
@Override
public void eventManage(JSONObject jsonObject) {
log.info("车辆{}执行指标预警事件",jsonObject.getString("vin"));
@ -24,7 +37,29 @@ public class IndexWarningEvent implements EventTactics {
@Override
public void eventManage(String vin, List<JSONObject> jsonObjects) {
log.info("车辆{}执行指标预警事件",vin);
List<SlidingWindowInfo> slidingWindowInfos = caffeineCacheService.getRedisSlidingWindow("swConfig:"+vin);
// 创建一个单线程的ScheduledExecutorService
ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
slidingWindowInfos.forEach(slidingWindowInfo -> {
// 延迟5秒后执行的任务
Runnable task = () -> applicationEventPublisher.publishEvent(new SlidingWindowEvent(slidingWindowInfo));
// 使用schedule方法安排任务在5秒后执行
executor.schedule(task, slidingWindowInfo.getOpportunity(), TimeUnit.SECONDS);
});
// 关闭executor不再接受新的任务已提交的任务会继续执行
executor.shutdown();
}
public void loopPutOffEvent(SlidingWindowInfo slidingWindowInfo){
ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
for (SlidingWindowInfo windowInfo : caffeineCacheService.getRedisSlidingWindow("swConfig:" + slidingWindowInfo.getVin())) {
if (windowInfo.getVin().equals(slidingWindowInfo.getVin()) && windowInfo.getKeyCode().equals(slidingWindowInfo.getKeyCode())){
// 延迟执行的任务
Runnable task = () -> applicationEventPublisher.publishEvent(new SlidingWindowEvent(windowInfo));
// 使用schedule方法安排任务在5秒后执行
scheduledExecutorService.schedule(task, windowInfo.getOpportunity(), TimeUnit.SECONDS);
}
}
}
}

View File

@ -23,21 +23,6 @@ spring:
host: 47.99.219.99
port: 6379
password: yx@123
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://127.0.0.1:3306/event-driven?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8
username: root
password: root
# mybatis配置
mybatis:
# 搜索指定包别名
typeAliasesPackage: com.muyu.eventdriven.domain
# 配置mapper的扫描找到所有的mapper.xml映射文件
mapperLocations: classpath:mapper/**/*.xml
configuration:
map-underscore-to-camel-case: true
# 将mapper接口所在包的日志级别改成debug可以在控制台打印sql
logging:
level:
com.bwie.**: debug

View File

@ -1,5 +0,0 @@
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="com.muyu.eventdriven.mapper.EventDrivenMapper">
</mapper>