feat()指标预警实现
parent
9520c94d15
commit
d17f6842d7
|
@ -37,10 +37,7 @@ public class HandleConnection {
|
|||
String s = new String(message.getBody());
|
||||
JSONObject jsonObject = JSONObject.parseObject(s);
|
||||
String carVin = jsonObject.getString("clientId");
|
||||
//
|
||||
// String s1 = redisTemplate.opsForValue().get(carVin + "_Index_Warn");
|
||||
//
|
||||
// indexWarn.put(carVin+"_Index_Warn", s1);
|
||||
|
||||
|
||||
log.error("链接事件得到的VIN:{}",carVin);
|
||||
}
|
||||
|
|
|
@ -28,10 +28,19 @@ public class RabbitmqConfig {
|
|||
* 修改指标预警配置队列
|
||||
*/
|
||||
public final static String UPDATE_INDEX_WARN = "update_index_warn";
|
||||
/**
|
||||
* 修改报文配置
|
||||
*/
|
||||
public final static String UPDATE_ANALYZE_RULE = "update_analyze_rule";
|
||||
|
||||
|
||||
public static final String CREATE_MQTT_CLIENT = "create_mqtt_client";
|
||||
|
||||
@Bean(UPDATE_ANALYZE_RULE)
|
||||
public Queue UPDATE_ANALYZE_RULE(){
|
||||
return new Queue(UPDATE_ANALYZE_RULE);
|
||||
}
|
||||
|
||||
@Bean(UPDATE_INDEX_WARN)
|
||||
public Queue UPDATE_INDEX_WARN(){
|
||||
return new Queue(UPDATE_INDEX_WARN);
|
||||
|
|
|
@ -62,10 +62,14 @@ public class SyncCacheRunner implements ApplicationRunner {
|
|||
|
||||
analyzeRuleCache.put(cacheCarEvent.getVin()+"_Analyze_Rule", JSON.toJSONString(analyzeRules));
|
||||
|
||||
IndexWarnToRedis cacheIndexWarn = new IndexWarnToRedis(cacheCarEvent.getVin(), "speed,voltage", 20, 10);
|
||||
|
||||
redisTemplate.opsForValue().set(cacheCarEvent.getVin()+"_Index_Warn", JSON.toJSONString(cacheIndexWarn));
|
||||
System.out.println(cacheIndexWarn.toString());
|
||||
//模拟指标配置
|
||||
ArrayList<IndexWarnToRedis> indexWarnToRedis = new ArrayList<>();
|
||||
indexWarnToRedis.add(new IndexWarnToRedis(cacheCarEvent.getVin(), "speed", 20, 10));
|
||||
indexWarnToRedis.add(new IndexWarnToRedis(cacheCarEvent.getVin(), "voltage", 20, 10));
|
||||
|
||||
redisTemplate.opsForValue().set(cacheCarEvent.getVin()+"_Index_Warn", JSON.toJSONString(indexWarnToRedis));
|
||||
|
||||
|
||||
|
||||
|
||||
|
|
|
@ -128,7 +128,7 @@ public class CreateMqttClientConsumer {
|
|||
try {
|
||||
String jsonData = objectMapper.writeValueAsString(linkedHashMap);
|
||||
log.error("json格式:{}", jsonData);
|
||||
String finalVin = vin;
|
||||
String finalVin = vin;//vin - iFeng
|
||||
transactionTemplate.execute(status -> {
|
||||
try {
|
||||
kafkaTemplate.send("topichyc", finalVin, jsonData);
|
||||
|
|
|
@ -2,6 +2,9 @@ package com.hyc.consumer;
|
|||
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import com.hyc.config.RabbitmqConfig;
|
||||
import com.hyc.domain.cache.CacheIndexWarn;
|
||||
import com.hyc.domain.cache.IndexWarnToRedis;
|
||||
import com.hyc.domain.req.HandleIndexWarnConfig;
|
||||
import com.rabbitmq.client.Channel;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.amqp.core.Message;
|
||||
|
@ -10,8 +13,12 @@ import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
|||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.cache.Cache;
|
||||
import org.springframework.cache.CacheManager;
|
||||
import org.springframework.context.ApplicationEventPublisher;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* 更新本地缓存配置消费者
|
||||
*
|
||||
|
@ -25,6 +32,8 @@ import org.springframework.stereotype.Component;
|
|||
public class UpdateConfig {
|
||||
@Autowired
|
||||
private RabbitTemplate rabbitTemplate;
|
||||
@Autowired
|
||||
private ApplicationEventPublisher applicationEventPublisher;
|
||||
|
||||
@Autowired
|
||||
private CacheManager cacheManager;
|
||||
|
@ -32,9 +41,68 @@ public class UpdateConfig {
|
|||
public void indexWarnEventMessage(String indexWarnToRedis, Channel channel, Message message){
|
||||
// List<IndexWarnEventResp> indexWarnEventResps = JSONObject.parseArray(indexWarnMessage, IndexWarnEventResp.class);
|
||||
String vin = JSONObject.parseObject(indexWarnToRedis).getString("vin");
|
||||
HandleIndexWarnConfig handleIndexWarnConfig = JSONObject.parseObject(indexWarnToRedis, HandleIndexWarnConfig.class);
|
||||
Cache indexWarn = cacheManager.getCache("indexWarn");
|
||||
indexWarn.put(vin + "_Index_Warn",indexWarnToRedis);
|
||||
Cache.ValueWrapper valueWrapper = indexWarn.get(vin + "_Index_Warn");
|
||||
log.warn("更新后的值是:{}",valueWrapper.get().toString());
|
||||
|
||||
if ("insert".equals(handleIndexWarnConfig.getHandleType())){
|
||||
if (valueWrapper != null){
|
||||
|
||||
String s = valueWrapper.get().toString();
|
||||
List<IndexWarnToRedis> indexWarnToRedisList = JSONObject.parseArray(s, IndexWarnToRedis.class);
|
||||
|
||||
indexWarnToRedisList.add(new IndexWarnToRedis(handleIndexWarnConfig.getVin(), handleIndexWarnConfig.getIndexNames(), handleIndexWarnConfig.getTotalLength(),handleIndexWarnConfig.getSlideLength()));
|
||||
|
||||
indexWarn.put(handleIndexWarnConfig.getVin() + "_Index_Warn",JSONObject.toJSONString(indexWarnToRedisList));
|
||||
|
||||
applicationEventPublisher.publishEvent(new CacheIndexWarn(this, handleIndexWarnConfig.getVin(), handleIndexWarnConfig.getIndexNames(), handleIndexWarnConfig.getTotalLength(), handleIndexWarnConfig.getSlideLength()));
|
||||
}else {
|
||||
|
||||
}
|
||||
} else if ("delete".equals(handleIndexWarnConfig.getHandleType())) {
|
||||
if (valueWrapper != null){
|
||||
String s = valueWrapper.get().toString();
|
||||
List<IndexWarnToRedis> indexWarnToRedisList = JSONObject.parseArray(s, IndexWarnToRedis.class);
|
||||
List<IndexWarnToRedis> collect = indexWarnToRedisList.stream().filter(indexWarnToRedis1 -> {
|
||||
return indexWarnToRedis1.getIndexName() .equals(handleIndexWarnConfig.getIndexNames()) ? false : true;
|
||||
}).collect(Collectors.toList());
|
||||
|
||||
indexWarn.put(handleIndexWarnConfig.getVin() + "_Index_Warn",JSONObject.toJSONString(collect));
|
||||
|
||||
|
||||
}
|
||||
} else if ("update".equals(handleIndexWarnConfig.getHandleType())) {
|
||||
if (valueWrapper != null){
|
||||
String s = valueWrapper.get().toString();
|
||||
List<IndexWarnToRedis> indexWarnToRedisList = JSONObject.parseArray(s, IndexWarnToRedis.class);
|
||||
List<IndexWarnToRedis> collect = indexWarnToRedisList.stream().map(indexWarnToRedis1 -> {
|
||||
if (indexWarnToRedis1.getIndexName().equals(handleIndexWarnConfig.getIndexNames())) {
|
||||
return new IndexWarnToRedis(handleIndexWarnConfig.getVin(), handleIndexWarnConfig.getIndexNames(), handleIndexWarnConfig.getTotalLength(), handleIndexWarnConfig.getSlideLength());
|
||||
}else {
|
||||
return indexWarnToRedis1;
|
||||
}
|
||||
|
||||
}).collect(Collectors.toList());
|
||||
indexWarn.put(handleIndexWarnConfig.getVin() + "_Index_Warn",JSONObject.toJSONString(collect));
|
||||
}
|
||||
}
|
||||
|
||||
if (valueWrapper != null){
|
||||
log.warn("更新后的值是:{}",valueWrapper.get().toString());
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
@RabbitListener(queues = RabbitmqConfig.UPDATE_ANALYZE_RULE)
|
||||
public void analyzeUpdateMessage(String analyzeUpdateMessage, Channel channel, Message message){
|
||||
JSONObject jsonObject = JSONObject.parseObject(analyzeUpdateMessage);
|
||||
String vin = jsonObject.getString("vin");
|
||||
log.warn("报文解析获取到的vin是:{}",vin);
|
||||
String analyzeRules = jsonObject.getString("analyzeRules");
|
||||
log.warn("传递的报文信息是:{}",analyzeRules);
|
||||
Cache analyzeRule = cacheManager.getCache("analyzeRule");
|
||||
analyzeRule.put(vin + "_Analyze_Rule",analyzeRules);
|
||||
log.info("vin为:{}的车辆报文解析配置更新成功!",vin);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -26,9 +26,9 @@ public class IndexWarnToRedis{
|
|||
*/
|
||||
private String vin;
|
||||
/**
|
||||
* 指标项名字,多项以逗号隔开
|
||||
* 指标项名字
|
||||
*/
|
||||
private String indexNames;
|
||||
private String indexName;
|
||||
/**
|
||||
* 滑窗长度 单位秒
|
||||
*/
|
||||
|
|
|
@ -0,0 +1,44 @@
|
|||
package com.hyc.domain.req;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
import lombok.ToString;
|
||||
|
||||
/**
|
||||
* 缓存指标预警信息参数类
|
||||
*
|
||||
* @author YouChe·He
|
||||
* @ClassName: CacheIndexWarn
|
||||
* @Description: 缓存指标预警信息参数类
|
||||
* @CreateTime: 2024/6/28 14:32
|
||||
*/
|
||||
@Data
|
||||
@ToString
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
public class HandleIndexWarnConfig {
|
||||
/**
|
||||
* 关联小车
|
||||
*/
|
||||
private String vin;
|
||||
/**
|
||||
* 指标项名字,多项以逗号隔开
|
||||
*/
|
||||
private String indexNames;
|
||||
/**
|
||||
* 滑窗长度 单位秒
|
||||
*/
|
||||
private Integer totalLength;
|
||||
/**
|
||||
* 滑动长度 单位秒
|
||||
*/
|
||||
private Integer slideLength;
|
||||
|
||||
/**
|
||||
* 操作类型 insert delete;
|
||||
*/
|
||||
private String handleType;
|
||||
|
||||
|
||||
}
|
|
@ -158,24 +158,25 @@ public enum EventStrategy {
|
|||
INDEX_WARNING {
|
||||
@Override
|
||||
public void exe(CacheManager cacheManager, RedisTemplate<String, String> redisTemplate, JSONObject jsonObject, RabbitTemplate rabbitTemplate, ApplicationEventPublisher applicationEventPublisher, ScheduledExecutorService executorService, IotDbServer iotDbServer) {
|
||||
|
||||
String vin = jsonObject.getString("vin");
|
||||
|
||||
Cache indexWarn = cacheManager.getCache("indexWarn");
|
||||
Cache.ValueWrapper valueWrapper = indexWarn.get(vin + "_Index_Warn");
|
||||
if (valueWrapper == null){
|
||||
if (valueWrapper == null) {
|
||||
String indexWarnJsonString = redisTemplate.opsForValue().get(vin + "_Index_Warn");
|
||||
|
||||
indexWarn.put(vin+"_Index_Warn", indexWarnJsonString);
|
||||
Cache indexWarnTwo = cacheManager.getCache("indexWarn");
|
||||
indexWarn.put(vin + "_Index_Warn", indexWarnJsonString);
|
||||
Cache.ValueWrapper valueWrapperTwo = indexWarn.get(vin + "_Index_Warn");
|
||||
String cacheIndexWarnJsonString = valueWrapperTwo.get().toString();
|
||||
IndexWarnToRedis cacheIndexWarn = JSONObject.parseObject(cacheIndexWarnJsonString, IndexWarnToRedis.class);
|
||||
List<IndexWarnToRedis> indexWarnToRedis = JSONObject.parseArray(cacheIndexWarnJsonString, IndexWarnToRedis.class);
|
||||
for (IndexWarnToRedis indexWarnToR : indexWarnToRedis) {
|
||||
applicationEventPublisher.publishEvent(new CacheIndexWarn(this, indexWarnToR.getVin(), indexWarnToR.getIndexName(), indexWarnToR.getTotalLength(), indexWarnToR.getSlideLength()));
|
||||
}
|
||||
log.info("指标预警Boot事件首次发布!");
|
||||
applicationEventPublisher.publishEvent(new CacheIndexWarn(this, cacheIndexWarn.getVin(), cacheIndexWarn.getIndexNames(), cacheIndexWarn.getTotalLength(), cacheIndexWarn.getSlideLength()));
|
||||
log.warn("进来了吗?");
|
||||
|
||||
}else {
|
||||
log.info("车辆:{}对应指标预警事件正在执行!",vin);
|
||||
} else {
|
||||
log.info("车辆:{}对应指标预警事件正在执行!", vin);
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -50,57 +50,70 @@ public class IndexWarnListener {
|
|||
|
||||
|
||||
@EventListener
|
||||
public void handleIndexWarnEvent(CacheIndexWarn cacheIndexWarn){
|
||||
public void handleIndexWarnEvent(CacheIndexWarn cacheIndexWarn) {
|
||||
|
||||
executorService.schedule(() -> {
|
||||
long time = new Date().getTime();
|
||||
|
||||
try {
|
||||
HashMap<String, Double> stringDoubleHashMap = new HashMap<>();
|
||||
|
||||
//根据VIN从iotDb获取数据
|
||||
List<String> carDataMapList = (List<String>) iotDbServer.queryDataFromIotDb(new IotDbParam(cacheIndexWarn.getVin(), String.valueOf(time - cacheIndexWarn.getSlideLength() * 1000), String.valueOf(time)));
|
||||
|
||||
|
||||
//获取该VIN对应的指标预警配置
|
||||
String[] split = cacheIndexWarn.getIndexNames().split(",");
|
||||
String indexNames = cacheIndexWarn.getIndexNames();
|
||||
Double sum = 0.0;
|
||||
for (String s : carDataMapList) {
|
||||
for (String s1 : split) {
|
||||
log.warn("s:{},s1:{}",s,s1);
|
||||
String string = JSONObject.parseObject(s).getString(s1);
|
||||
stringDoubleHashMap.put(s1,stringDoubleHashMap.get(s1)==null?Double.valueOf(string):Double.valueOf(string)+stringDoubleHashMap.get(s1));
|
||||
}
|
||||
}
|
||||
for (String s : split) {
|
||||
stringDoubleHashMap.put(s,stringDoubleHashMap.get(s) / split.length);
|
||||
}
|
||||
|
||||
ArrayList<IndexWarnEventResp> indexWarnEventRespList = new ArrayList<>();
|
||||
for (String s : stringDoubleHashMap.keySet()) {
|
||||
String qs = "";
|
||||
if (stringDoubleHashMap.get(s)<600){
|
||||
qs = "下降";
|
||||
} else if (stringDoubleHashMap.get(s) > 8000) {
|
||||
qs = "上升";
|
||||
}else {
|
||||
qs = "波动";
|
||||
}
|
||||
indexWarnEventRespList.add(new IndexWarnEventResp(cacheIndexWarn.getVin(),s,stringDoubleHashMap.get(s),qs));
|
||||
String string = JSONObject.parseObject(s).getString(indexNames);
|
||||
sum += Double.valueOf(string);
|
||||
|
||||
}
|
||||
Double avg = sum / carDataMapList.size();
|
||||
|
||||
|
||||
rabbitTemplate.convertAndSend(RabbitmqConfig.INDEX_WARN_QUEUE, JSONObject.toJSONString(indexWarnEventRespList));
|
||||
log.error("计算出来的值是:{}",stringDoubleHashMap);
|
||||
String qs = "";
|
||||
if (avg < 600) {
|
||||
qs = "下降";
|
||||
} else if (avg > 8000) {
|
||||
qs = "上升";
|
||||
} else {
|
||||
qs = "波动";
|
||||
}
|
||||
IndexWarnEventResp indexWarnEventResp = new IndexWarnEventResp(cacheIndexWarn.getVin(), qs, avg, cacheIndexWarn.getIndexNames());
|
||||
|
||||
|
||||
if (carDataMapList.size() > 0) {
|
||||
rabbitTemplate.convertAndSend(RabbitmqConfig.INDEX_WARN_QUEUE, JSONObject.toJSONString(indexWarnEventResp));
|
||||
}
|
||||
|
||||
log.error("计算出来的值是:{}", indexWarnEventResp);
|
||||
if (Double.isNaN(indexWarnEventResp.getAverage())) {
|
||||
log.error("一级警报");
|
||||
System.out.println("Average is NaN.");
|
||||
}
|
||||
|
||||
if (!(indexWarnEventResp.getAverage() instanceof Number || indexWarnEventResp.getAverage() == null)) {
|
||||
log.error("一级警报:{}", indexWarnEventResp);
|
||||
System.out.println("他不是一个数字");
|
||||
}
|
||||
|
||||
Cache indexWarn = cacheManager.getCache("indexWarn");
|
||||
Cache.ValueWrapper valueWrapper = indexWarn.get(cacheIndexWarn.getVin() + "_Index_Warn");
|
||||
if (valueWrapper != null){
|
||||
if (valueWrapper != null) {
|
||||
String cacheIndexWarnJsonStringTwo = valueWrapper.get().toString();
|
||||
IndexWarnToRedis cacheIndexWarnTwo = JSONObject.parseObject(cacheIndexWarnJsonStringTwo, IndexWarnToRedis.class);
|
||||
List<IndexWarnToRedis> indexWarnToRedis = JSONObject.parseArray(cacheIndexWarnJsonStringTwo, IndexWarnToRedis.class);
|
||||
|
||||
applicationEventPublisher.publishEvent(new CacheIndexWarn(this, cacheIndexWarnTwo.getVin(), cacheIndexWarnTwo.getIndexNames(), cacheIndexWarnTwo.getTotalLength(), cacheIndexWarnTwo.getSlideLength()));
|
||||
for (IndexWarnToRedis indexWarnToRedi : indexWarnToRedis) {
|
||||
if (indexWarnToRedi.getIndexName().equals(cacheIndexWarn.getIndexNames())) {
|
||||
applicationEventPublisher.publishEvent(new CacheIndexWarn(this, indexWarnToRedi.getVin(), indexWarnToRedi.getIndexName(), indexWarnToRedi.getTotalLength(), indexWarnToRedi.getSlideLength()));
|
||||
}
|
||||
}
|
||||
|
||||
}else {
|
||||
log.info("车辆:{}已下线,指标预警停止",cacheIndexWarn.getVin());
|
||||
|
||||
} else {
|
||||
log.info("车辆:{}已下线,指标预警停止", cacheIndexWarn.getVin());
|
||||
}
|
||||
|
||||
} catch (Exception e) {
|
||||
|
@ -109,12 +122,8 @@ public class IndexWarnListener {
|
|||
|
||||
}, cacheIndexWarn.getSlideLength(), TimeUnit.SECONDS);
|
||||
|
||||
log.error("几个意思,得到的值是啥:{}",cacheIndexWarn);
|
||||
log.error("几个意思,得到的值是啥:{}", cacheIndexWarn);
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue