feat 指标预警功能

master
rouchen 2024-06-30 18:37:52 +08:00
parent e4bcdeefe2
commit db634c4e30
26 changed files with 707 additions and 229 deletions

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@ -10,6 +10,7 @@ public class IotDbApplication {
public static void main(String[] args) { public static void main(String[] args) {
SpringApplication.run(IotDbApplication.class, args); SpringApplication.run(IotDbApplication.class, args);
} }
} }

View File

@ -16,17 +16,17 @@ import java.util.List;
* Date 2024/6/20 16:47 * Date 2024/6/20 16:47
*/ */
@Component //@Component
public class EventRunner implements Runnable { //public class EventRunner implements Runnable {
@Autowired // @Autowired
private EventService eventService; // private EventService eventService;
@PostConstruct // @PostConstruct
public void run() { // public void run() {
List<VehicleEvent> vehicleEvents = eventService.selectVehicleEvent(); // List<VehicleEvent> vehicleEvents = eventService.selectVehicleEvent();
vehicleEvents.forEach( // vehicleEvents.forEach(
vehicleEvent -> { // vehicleEvent -> {
eventService.selectEvent(vehicleEvent); // eventService.selectEvent(vehicleEvent);
} // }
); // );
} // }
} //}

View File

@ -1,35 +0,0 @@
package com.muyu.event;
import com.muyu.kafka.SimpleKafkaConsumer;
import lombok.extern.log4j.Log4j2;
import org.checkerframework.checker.units.qual.C;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
import lombok.extern.log4j.Log4j2;
import org.springframework.stereotype.Component;
/**
* kafka KafkaRunner
*
* @author Yangle
* Date 2024/6/20 21:42
*/
@Component
@Log4j2
public class KafkaRunner implements ApplicationRunner {
@Override
public void run(ApplicationArguments args) throws Exception {
try {
log.info("项目启动开始动态启动kafka消费者");
}catch (Exception e){
log.error("启动kafka消费者失败异常信息:{}",e);
}
}
}

View File

@ -7,6 +7,7 @@ import com.muyu.mqtt.dao.MessageData;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController; import org.springframework.web.bind.annotation.RestController;
import java.util.List; import java.util.List;
@ -50,4 +51,9 @@ public class EventController {
return eventService.getMaximumCoordinate(vehicleEvent.getVin()); return eventService.getMaximumCoordinate(vehicleEvent.getVin());
} }
@PostMapping("/selectCarVin")
public VehicleEvent selectCarVin(@RequestParam String vin) {
return eventService.selectCarVin(vin);
}
} }

View File

@ -19,4 +19,6 @@ public interface EventMapper {
List<VehicleEvent> selectVehicleEvent(); List<VehicleEvent> selectVehicleEvent();
VehicleEvent selectCarVin(String vin);
} }

View File

@ -26,4 +26,5 @@ public interface EventService {
List<VehicleEvent> selectVehicleEvent(); List<VehicleEvent> selectVehicleEvent();
VehicleEvent selectCarVin(String vin);
} }

View File

@ -56,110 +56,112 @@ public class EventServiceImpl implements EventService {
@Override @Override
public Result<MessageData> selectEvent(VehicleEvent vehicleEvent) { public Result<MessageData> selectEvent(VehicleEvent vehicleEvent) {
if (vehicleEvent.getEvent() != null) { // if (vehicleEvent.getEvent() != null) {
//
if (vehicleEvent.getEvent().contains("1")) { // if (vehicleEvent.getEvent().contains("1")) {
KafkaConsumer<String, String> consumer = getStringStringKafkaConsumer(); // KafkaConsumer<String, String> consumer = getStringStringKafkaConsumer();
// 订阅主题 // // 订阅主题
consumer.subscribe(Collections.singletonList("test1")); // consumer.subscribe(Collections.singletonList("test1"));
// 创建新线程来处理消息 // // 创建新线程来处理消息
new Thread(() -> { // new Thread(() -> {
while (true) { // while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); // ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) { // for (ConsumerRecord<String, String> record : records) {
// 处理消息 // // 处理消息
String message = record.value(); // String message = record.value();
String jsonString = JSON.toJSONString(message); // String jsonString = JSON.toJSONString(message);
log.info("接收到消息:" + jsonString); // log.info("接收到消息:" + jsonString);
MessageData messageData = JSON.parseObject(message, MessageData.class); // MessageData messageData = JSON.parseObject(message, MessageData.class);
// 将毫秒级时间戳转换为LocalDateTime // // 将毫秒级时间戳转换为LocalDateTime
LocalDateTime dateTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(Long.parseLong(messageData.getTimestamp())), ZoneId.systemDefault()); // LocalDateTime dateTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(Long.parseLong(messageData.getTimestamp())), ZoneId.systemDefault());
//
// 格式化输出日期时间 // // 格式化输出日期时间
Date formattedTime = Date.from(dateTime.atZone(ZoneId.systemDefault()).toInstant()); // Date formattedTime = Date.from(dateTime.atZone(ZoneId.systemDefault()).toInstant());
//判断车辆状态 // //判断车辆状态
vehicleStatus(messageData, formattedTime); // vehicleStatus(messageData, formattedTime);
//
//判断充电状态 // //判断充电状态
chargeStatus(messageData, formattedTime); // chargeStatus(messageData, formattedTime);
//
//判断运行状态 // //判断运行状态
runStatus(messageData, formattedTime); // runStatus(messageData, formattedTime);
//
//判断SOC // //判断SOC
soc(messageData, formattedTime); // soc(messageData, formattedTime);
//
//判断充电工作状态 // //判断充电工作状态
chargeWorkStatus(messageData, formattedTime); // chargeWorkStatus(messageData, formattedTime);
//
//判断驱动电机状态 // //判断驱动电机状态
driveMotorStatus(messageData, formattedTime); // driveMotorStatus(messageData, formattedTime);
//
//判断位置 // //判断位置
location(messageData, formattedTime); // location(messageData, formattedTime);
//
//判断EAS // //判断EAS
eas(messageData, formattedTime); // eas(messageData, formattedTime);
//
//判断PTC // //判断PTC
ptc(messageData, formattedTime); // ptc(messageData, formattedTime);
//
//判断EPS // //判断EPS
eps(messageData, formattedTime); // eps(messageData, formattedTime);
//
//判断ABS // //判断ABS
abs(messageData, formattedTime); // abs(messageData, formattedTime);
//
//判断MCU // //判断MCU
mcu(messageData, formattedTime); // mcu(messageData, formattedTime);
//
//判断动力电池加热 // //判断动力电池加热
powerBatteryHeating(messageData, formattedTime); // powerBatteryHeating(messageData, formattedTime);
//
//判断动力电池电流 // //判断动力电池电流
powerBatteryCurrentStatus(messageData, formattedTime); // powerBatteryCurrentStatus(messageData, formattedTime);
//
//判断动力电池保温 // //判断动力电池保温
powerBatteryHeat(messageData, formattedTime); // powerBatteryHeat(messageData, formattedTime);
//
//判断DCDC // //判断DCDC
dcdc(messageData, formattedTime); // dcdc(messageData, formattedTime);
//
//判断CHG // //判断CHG
chg(messageData, formattedTime); // chg(messageData, formattedTime);
//
} // }
} // }
}).start(); // }).start();
} // }
if (vehicleEvent.getEvent().contains("2")) { // if (vehicleEvent.getEvent().contains("2")) {
if(vehicleEvent.getStates()){ // if(vehicleEvent.getStates()){
if (redisTemplate.hasKey(vehicleEvent.getVin())){ // if (redisTemplate.hasKey(vehicleEvent.getVin())){
KafkaConsumer<String, String> consumer = getStringStringKafkaConsumer(); // KafkaConsumer<String, String> consumer = getStringStringKafkaConsumer();
// 订阅主题 // // 订阅主题
consumer.subscribe(Collections.singletonList("test1")); // consumer.subscribe(Collections.singletonList("test1"));
// 创建新线程来处理消息 // // 创建新线程来处理消息
new Thread(() -> { // new Thread(() -> {
while (true) { // while (true) {
// 发送拉取请求 // // 发送拉取请求
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); // 设置拉取超时时间为100毫秒 // ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); // 设置拉取超时时间为100毫秒
// 处理消息 // // 处理消息
for (ConsumerRecord<String, String> record : records) { // for (ConsumerRecord<String, String> record : records) {
redisTemplate.opsForValue().set(vehicleEvent.getVin(),record.value()); // redisTemplate.opsForValue().set(vehicleEvent.getVin(),record.value());
System.out.println("Received message: " + record.value()); // System.out.println("Received message: " + record.value());
// 在这里添加消息处理逻辑 // // 在这里添加消息处理逻辑
} // }
} // }
}).start(); // }).start();
} // }
//
} // }
return Result.success(); // return Result.success();
} // }
if (vehicleEvent.getEvent().contains("3")) { // if (vehicleEvent.getEvent().contains("3")) {
// new Thread(() -> {
} //
} // }).start();
// }
// }
return Result.error("车辆没有绑定事件"); return Result.error("车辆没有绑定事件");
@ -207,6 +209,11 @@ public class EventServiceImpl implements EventService {
return eventMapper.selectVehicleEvent(); return eventMapper.selectVehicleEvent();
} }
@Override
public VehicleEvent selectCarVin(String vin) {
return eventMapper.selectCarVin(vin);
}
// //创建分区 // //创建分区
// NewTopic newTopic = new NewTopic(topic, 8, (short) 1); // NewTopic newTopic = new NewTopic(topic, 8, (short) 1);

View File

@ -29,7 +29,7 @@ import java.util.List;
public class IotDBSessionConfig { public class IotDBSessionConfig {
private static Session session; private static Session session;
private static final String LOCAL_HOST = "47.93.162.81"; private static final String LOCAL_HOST = "127.0.0.1";
@Bean @Bean
public Session getSession() throws IoTDBConnectionException, StatementExecutionException { public Session getSession() throws IoTDBConnectionException, StatementExecutionException {
@ -78,6 +78,12 @@ public class IotDBSessionConfig {
public void insertRecord(String deviceId, Long time, List<String> measurementsList, List<String> valuesList) throws StatementExecutionException, IoTDBConnectionException, ServerException { public void insertRecord(String deviceId, Long time, List<String> measurementsList, List<String> valuesList) throws StatementExecutionException, IoTDBConnectionException, ServerException {
session.insertRecord(deviceId, time, measurementsList, valuesList); session.insertRecord(deviceId, time, measurementsList, valuesList);
} }
/**
* description: SQL
* author: zhouhong
*/
public SessionDataSet query(String sql) throws StatementExecutionException, IoTDBConnectionException {
return session.executeQueryStatement(sql);
}
} }

View File

@ -0,0 +1,42 @@
package com.muyu.iotdb.controller;
import com.muyu.iotdb.config.IotDBSessionConfig;
import com.muyu.iotdb.model.VehicleData;
import com.muyu.iotdb.model.param.IotDbParam;
import com.muyu.iotdb.model.result.IotDbResult;
import com.muyu.iotdb.respone.ResponseData;
import com.muyu.iotdb.server.IotDbServer;
import lombok.extern.log4j.Log4j2;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.springframework.web.bind.annotation.*;
import javax.annotation.Resource;
import java.rmi.ServerException;
import java.util.List;
/**
* description: iotdb
* date: 2022/8/15 21:50
* author: zhouhong
*/
@Log4j2
@RestController
public class IotDbController {
@Resource
private IotDbServer iotDbServer;
@Resource
private IotDBSessionConfig iotDBSessionConfig;
/**
*
* @param
*/
@PostMapping("/api/device/queryData")
public ResponseData queryDataFromIotDb(@RequestBody IotDbParam iotDbParam) throws Exception {
return ResponseData.success(iotDbServer.queryDataFromIotDb (iotDbParam));
}
}

View File

@ -1,17 +1,28 @@
package com.muyu.iotdb.model.param; package com.muyu.iotdb.model.param;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data; import lombok.Data;
import lombok.NoArgsConstructor;
/** /**
* description: * description:
* date: 2022/8/15 21:53 * date: 2022/8/15 21:53
* author: YangLe * author: YangLe
*/ */
@Data @Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class IotDbParam { public class IotDbParam {
/*
* vin
*/
private String vin; private String vin;
/***
*
*/
private String queryAttribute;
/*** /***
* *
*/ */
@ -21,4 +32,6 @@ public class IotDbParam {
*/ */
private String endTime; private String endTime;
} }

View File

@ -0,0 +1,26 @@
package com.muyu.iotdb.model.result;
import lombok.Data;
/**
* VehicleDataResult
*
* @author Yangle
* Date 2024/6/29 11:49
*/
@Data
public class VehicleResult {
/***
*
*/
private String time;
/***
* vin
*/
private String vin;
/***
*
*/
private String data;
}

View File

@ -3,12 +3,14 @@ package com.muyu.iotdb.server;
import com.alibaba.fastjson2.JSONObject; import com.alibaba.fastjson2.JSONObject;
import com.muyu.iotdb.model.VehicleData; import com.muyu.iotdb.model.param.IotDbParam;
import com.muyu.iotdb.model.result.VehicleResult;
import org.apache.iotdb.rpc.IoTDBConnectionException; import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException; import org.apache.iotdb.rpc.StatementExecutionException;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import java.rmi.ServerException; import java.rmi.ServerException;
import java.util.List;
/** /**
* description: iot * description: iot
@ -22,4 +24,10 @@ public interface IotDbServer {
*/ */
void insertData(JSONObject vehicleData) throws StatementExecutionException, ServerException, IoTDBConnectionException; void insertData(JSONObject vehicleData) throws StatementExecutionException, ServerException, IoTDBConnectionException;
/**
*
*/
List<VehicleResult> queryDataFromIotDb(IotDbParam iotDbParam) throws Exception;
} }

View File

@ -4,7 +4,8 @@ package com.muyu.iotdb.server.impl;
import com.alibaba.fastjson2.JSONObject; import com.alibaba.fastjson2.JSONObject;
import com.muyu.iotdb.config.IotDBSessionConfig; import com.muyu.iotdb.config.IotDBSessionConfig;
import com.muyu.iotdb.model.VehicleData; import com.muyu.iotdb.model.param.IotDbParam;
import com.muyu.iotdb.model.result.VehicleResult;
import com.muyu.iotdb.server.IotDbServer; import com.muyu.iotdb.server.IotDbServer;
import lombok.extern.log4j.Log4j2; import lombok.extern.log4j.Log4j2;
import org.apache.iotdb.rpc.IoTDBConnectionException; import org.apache.iotdb.rpc.IoTDBConnectionException;
@ -38,16 +39,85 @@ public class IotDbServerImpl implements IotDbServer {
public void insertData(JSONObject vehicleData) throws StatementExecutionException, ServerException, IoTDBConnectionException { public void insertData(JSONObject vehicleData) throws StatementExecutionException, ServerException, IoTDBConnectionException {
// iotDbParam: 模拟设备上报消息 // iotDbParam: 模拟设备上报消息
// bizkey: 业务唯一key PK :产品唯一编码 SN:设备唯一编码 // bizkey: 业务唯一key PK :产品唯一编码 SN:设备唯一编码
String deviceId = "root.test." + vehicleData.getString("vin"); Object o = vehicleData.get("vinCode");
String deviceId = "root.ln.bizkey." + o;
// 将设备上报的数据存入数据库(时序数据库) // 将设备上报的数据存入数据库(时序数据库)
List<String> measurementsList = new ArrayList<>(); List<String> measurementsList = new ArrayList<>();
measurementsList.add("vehicleStatusMsg");
List<String> valuesList = new ArrayList<>(); List<String> valuesList = new ArrayList<>();
valuesList.add(vehicleData.toString()); vehicleData.forEach(
log.info("vehicleData:{}", vehicleData); (k, v) -> {
measurementsList.add(k);
valuesList.add(v.toString());
}
);
log.info("measurementsList:{}", measurementsList);
log.info("valuesList:{}",valuesList); log.info("valuesList:{}",valuesList);
iotDBSessionConfig.insertRecord(deviceId, vehicleData.getLong("timestamp"), measurementsList, valuesList); iotDBSessionConfig.insertRecord(deviceId, vehicleData.getLong("timestamp1"), measurementsList, valuesList);
}
@Override
public List<VehicleResult> queryDataFromIotDb(IotDbParam iotDbParam) throws Exception {
List<VehicleResult> vehicleResultList = new ArrayList<>();
if (null != iotDbParam.getVin()) {
String sql = "select * from root.ln.bizkey."+ iotDbParam.getVin();
if (null != iotDbParam.getStartTime() && !"".equals(iotDbParam.getStartTime())){
if (sql.contains("where")){
sql += " and time >= " + iotDbParam.getStartTime();
}else{
sql += " where time >= " + iotDbParam.getStartTime();
}
}
if (null != iotDbParam.getEndTime() && !"".equals(iotDbParam.getEndTime())){
if (sql.contains("where")){
sql += " and time <= " + iotDbParam.getEndTime();
}else{
sql += " where time <= " + iotDbParam.getEndTime();
}
}
SessionDataSet sessionDataSet = iotDBSessionConfig.query(sql);
List<String> columnNames = sessionDataSet.getColumnNames();
List<String> titleList = new ArrayList<>();
// 排除Time字段 -- 方便后面后面拼装数据
for (int i = 1; i < columnNames.size(); i++) {
String[] temp = columnNames.get(i).split("\\.");
titleList.add(temp[temp.length - 1]);
}
// 封装处理数据
packagingData(iotDbParam, vehicleResultList, sessionDataSet, titleList);
} else {
log.info("PK或者SN不能为空");
}
return vehicleResultList;
} }
/**
*
*/
private void packagingData(IotDbParam iotDbParam, List<VehicleResult> vehicleResultList, SessionDataSet sessionDataSet, List<String> titleList)
throws StatementExecutionException, IoTDBConnectionException {
int fetchSize = sessionDataSet.getFetchSize();
if (fetchSize > 0) {
while (sessionDataSet.hasNext()) {
VehicleResult vehicleResult = new VehicleResult();
RowRecord next = sessionDataSet.next();
List<Field> fields = next.getFields();
String timeString = new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(next.getTimestamp());
Map<String, String> map = new HashMap<>();
for (int i = 0; i < fields.size(); i++) {
Field field = fields.get(i);
// 这里的需要按照类型获取
if (null != field.getDataType()){
map.put(titleList.get(i), field.getObjectValue(field.getDataType()).toString());
}
}
vehicleResult.setTime(timeString);
vehicleResult.setVin(iotDbParam.getVin());
vehicleResult.setData(map.get(iotDbParam.getQueryAttribute()));
vehicleResultList.add(vehicleResult);
}
}
}
} }

View File

@ -6,6 +6,8 @@ import com.alibaba.fastjson2.JSONObject;
import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine; import com.github.benmanes.caffeine.cache.Caffeine;
import com.muyu.mqtt.dao.MessageDa; import com.muyu.mqtt.dao.MessageDa;
import com.muyu.redis.demo.Sliding;
import com.muyu.springListen.SlidingWindow;
import com.muyu.utils.ConversionUtil; import com.muyu.utils.ConversionUtil;
import lombok.extern.log4j.Log4j2; import lombok.extern.log4j.Log4j2;
import org.apache.iotdb.rpc.IoTDBConnectionException; import org.apache.iotdb.rpc.IoTDBConnectionException;
@ -17,11 +19,13 @@ import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.amqp.core.Message; import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.kafka.core.KafkaAdmin; import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.rmi.ServerException; import java.rmi.ServerException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
@ -65,57 +69,121 @@ public class MessageCallbackService implements MqttCallback {
String s1 = ConversionUtil.hexStringToString(s); String s1 = ConversionUtil.hexStringToString(s);
JSONObject object = getObject(s1); JSONObject object = getObject(s1);
log.error("object:{}", object); log.error("object:{}", object);
String vin ="";
String key = vin; // 使用vin作为key如果适用
String value = com.alibaba.fastjson.JSON.toJSONString(object); String value = com.alibaba.fastjson.JSON.toJSONString(object);
JSONObject jsonObject = JSONObject.parseObject(value);
String vin = jsonObject.getString("vinCode");
String key = vin; // 使用vin作为key如果适用
log.error("value:{}", value); log.error("value:{}", value);
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, key, value); ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, key, value);
kafkaTemplate.send(producerRecord); kafkaTemplate.send(producerRecord);
String s2 = redisTemplate.opsForValue().get(key + "-length");
log.error("s2:{}",JSON.toJSONString(s2));
publishEvent(s2);
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
} }
} }
@Resource
ApplicationEventPublisher applicationEventPublisher;
public void publishEvent(String message) {
Sliding sliding = JSONObject.parseObject(message, Sliding.class);
log.error("slidingWindow:{}", sliding);
System.out.println("开始发布自定义事件");
SlidingWindow customApplicationEvent = new SlidingWindow(this,sliding.getAnalyzeKey(),sliding.getVehicleVin(),sliding.getSlideLengthId(),sliding.getWindowRadiusId(),sliding.getMessageLabel(),sliding.getStandard());
// 发布事件
applicationEventPublisher.publishEvent(customApplicationEvent);
System.out.println("发布自定义事件结束");
}
@Override @Override
public void deliveryComplete(IMqttDeliveryToken token) { public void deliveryComplete(IMqttDeliveryToken token) {
System.out.println("deliveryComplete---------" + token.isComplete()); System.out.println("deliveryComplete---------" + token.isComplete());
} }
public com.alibaba.fastjson2.JSONObject getObject(String message) { // public com.alibaba.fastjson2.JSONObject getObject(String message) {
// log.info("原始数据:{}", message);
// // 移除字符串message的首尾字符为后续处理准备
// message = message.substring(1, message.length() - 2);
// // 初始化StringBuilder用于构建最终的JSON字符串
// StringBuffer buffer = new StringBuffer();
//
// // 从Redis中获取指定key的值这里的key是message的前17个字符
// Object o = redisTemplate.opsForHash().get("VIN", message.substring(0, 17));
// log.error("o:{}", o);
// // 将获取到的值转换为VehicleDateModel的列表
// List<MessageDa> messageDas = new ArrayList<>();
// messageDas = JSON.parseObject(o.toString(), ArrayList.class).stream()
// .map(obj -> JSON.parseObject(obj.toString(), MessageDa.class)).toList();
// // 用于构建最终JSON字符串的变量
// String finalMessage = message;
// // 遍历车辆数据模型列表构建JSON字符串的键值对
// messageDas.forEach(messageDa -> {
// // 根据车辆数据模型的起始和结束位置以及特定的处理逻辑构建JSON字符串的键值对并追加到buffer中
// buffer.append(",\"" + messageDa.getAnalyzeKey() + "\":" +
// "\"" + remove0(finalMessage.substring(messageDa.getAnalyzeStart() , messageDa.getEnt())) + "\"");
// });
// // 构建最终的JSON字符串
// String jsonString = "{" + buffer.substring(1) + "}";
// com.alibaba.fastjson2.JSONObject jsonObject = JSON.parseObject(jsonString);
//
// // 日志记录解析后的数据
// log.info("解析后的数据:{}", jsonString);
// // 返回构建好的JSONObject
// return new com.alibaba.fastjson2.JSONObject(jsonObject);
// }
public JSONObject getObject(String message) {
log.info("原始数据:{}", message); log.info("原始数据:{}", message);
// 移除字符串message的首尾字符为后续处理准备 // 移除字符串message的首尾字符为后续处理准备
message = message.substring(1, message.length() - 2); message = message.substring(1, message.length() - 2);
// 初始化StringBuilder用于构建最终的JSON字符串
// 从本地缓存中获取数据
String cacheKey = message.substring(0, 17); // 构造一个缓存键
String cachedData = localCache.getIfPresent(cacheKey);
if (cachedData == null) {
// 本地缓存中没有找到从Redis中获取
String dataFromRedis = redisTemplate.opsForHash().get("VIN", message.substring(0, 17)).toString();
log.error("Data from Redis: {}", dataFromRedis);
if (dataFromRedis != null) {
// 将Redis中的数据放入本地缓存
localCache.put(cacheKey, dataFromRedis);
log.info("数据放入本地缓存:{}", dataFromRedis);
cachedData = dataFromRedis;
}
} else {
log.info("数据在本地缓存中找到:{}", cachedData);
}
StringBuffer buffer = new StringBuffer(); StringBuffer buffer = new StringBuffer();
// 从Redis中获取指定key的值这里的key是message的前17个字符
Object o = redisTemplate.opsForHash().get("VIN", message.substring(0, 17));
// 将获取到的值转换为VehicleDateModel的列表
List<MessageDa> messageDas = new ArrayList<>(); List<MessageDa> messageDas = new ArrayList<>();
messageDas = JSON.parseObject(o.toString(), ArrayList.class).stream() if (cachedData != null) {
.map(obj -> JSON.parseObject(obj.toString(), MessageDa.class)).toList(); // 将获取到的值转换为VehicleDateModel的列表
// 用于构建最终JSON字符串的变量 messageDas = JSON.parseArray(cachedData, MessageDa.class);
String finalMessage = message; // 用于构建最终JSON字符串的变量
// 遍历车辆数据模型列表构建JSON字符串的键值对 String finalMessage = message;
messageDas.forEach(messageDa -> { // 遍历车辆数据模型列表构建JSON字符串的键值对
// 根据车辆数据模型的起始和结束位置以及特定的处理逻辑构建JSON字符串的键值对并追加到buffer中 messageDas.forEach(messageDa -> {
buffer.append(",\"" + messageDa.getAnalyzeKey() + "\":" + buffer.append(",\"" + messageDa.getAnalyzeKey() + "\":" +
"\"" + remove0(finalMessage.substring(messageDa.getAnalyzeStart() - 1, messageDa.getEnt())) + "\""); "\"" + remove0(finalMessage.substring(messageDa.getAnalyzeStart(), messageDa.getEnt())) + "\"");
}); });
}
// 构建最终的JSON字符串 // 构建最终的JSON字符串
String jsonString = "{" + buffer.substring(1) + "}"; String jsonString = "{" + buffer.substring(1) + "}";
com.alibaba.fastjson2.JSONObject jsonObject = JSON.parseObject(jsonString); JSONObject jsonObject = JSON.parseObject(jsonString);
try {
} catch (Exception e) {
throw new RuntimeException(e);
}
// 日志记录解析后的数据 // 日志记录解析后的数据
log.info("解析后的数据:{}", jsonString); log.info("解析后的数据:{}", jsonString);
// 返回构建好的JSONObject // 返回构建好的JSONObject
return new com.alibaba.fastjson2.JSONObject(jsonObject); return jsonObject;
} }
// 移除字符串中的前导零 // 移除字符串中的前导零
public String remove0(String str) { public String remove0(String str) {
if (str.length() > 1) { if (str.length() > 1) {
@ -129,12 +197,18 @@ public class MessageCallbackService implements MqttCallback {
} }
} }
@RabbitListener(queues = "subscription") @RabbitListener(queues ="renew")
public void receiveSms(Message message) { public void receiveSms(Message message) {
// 拿到消息中的VIN // 拿到消息中的VIN
String vin = message.getBody().toString(); byte[] body = message.getBody();
String vin = new String(body);
log.error("接收到消息",body);
log.error("vin:{}",vin);
// redisTemplate.delete(vin);
// 删除本地缓存 // 删除本地缓存
localCache.invalidate(vin); //// redisTemplate.delete(vin);
localCache.invalidate(vin);
log.error("删除本地缓存"+vin);
} }
/** /**

View File

@ -0,0 +1,27 @@
package com.muyu.redis.demo;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
/**
* Sliding
*
* @author Yangle
* Date 2024/6/29 9:39
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
@SuperBuilder
public class Sliding {
private String analyzeKey;
private String vehicleVin;
private String slideLengthId;
private String windowRadiusId;
private String messageLabel;
private String standard;
}

View File

@ -7,7 +7,9 @@ import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine; import com.github.benmanes.caffeine.cache.Caffeine;
import com.muyu.mqtt.dao.MessageDa; import com.muyu.mqtt.dao.MessageDa;
import com.muyu.redis.demo.RedisData; import com.muyu.redis.demo.RedisData;
import com.muyu.redis.demo.Sliding;
import com.muyu.redis.service.RedisService; import com.muyu.redis.service.RedisService;
import com.muyu.springListen.SlidingWindow;
import lombok.extern.log4j.Log4j2; import lombok.extern.log4j.Log4j2;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.RedisTemplate;
@ -30,49 +32,22 @@ public class RedisServiceImpl implements RedisService {
@Override @Override
public void set(RedisData redisData) { public void set(RedisData redisData) {
// Sliding sliding = new Sliding();
// sliding.setVin("KVMVI1BD0Z3X5QKVJ");
// sliding.setCar("20");
// sliding.setIndex("20");
// sliding.setLengthInterval("3/2");
// sliding.setLengthTime(5); // 假设lengthTime是一个整数
// redisTemplate.opsForValue().set(redisData.getVin()+"-length", JSON.toJSONString(sliding));
List<MessageDa> messageDas = new ArrayList<>();
messageDas.add(
new MessageDa(
1,
"vin",
0,
17,
"vIN",
"基础信息"
)
);
messageDas.add(
new MessageDa(
2,
"timestamp",
17,
30,
"时间戳",
"基础信息"
)
);
messageDas.add(
new MessageDa(
3,
"longitude",
38,
41,
"经度",
"基础信息"
)
);
redisTemplate.opsForList().set(redisData.getVin(), 0, JSON.toJSONString(messageDas));
} }
@Override @Override
public String get(RedisData redisData) { public String get(RedisData redisData) {
String string = redisTemplate.opsForValue().get(redisData.getVin()); String string = redisTemplate.opsForValue().get(redisData.getVin()+"-length");
log.error("string:{}", string); log.error("string:{}", string);
return null; return string;
// String index = redisTemplate.opsForList().index(redisData.getVin(), 0); // String index = redisTemplate.opsForList().index(redisData.getVin(), 0);
// log.error("index:{}",index); // log.error("index:{}",index);
// // 解析JSON字符串 // // 解析JSON字符串

View File

@ -0,0 +1,130 @@
package com.muyu.springListen;
import cn.hutool.core.date.DateTime;
import com.alibaba.fastjson2.JSON;
import com.muyu.event.common.VehicleEvent;
import com.muyu.event.service.EventService;
import com.muyu.iotdb.model.param.IotDbParam;
import com.muyu.iotdb.model.result.VehicleResult;
import com.muyu.iotdb.server.IotDbServer;
import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import java.util.Date;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import static java.lang.Thread.sleep;
@Log4j2
@Component
public class AsynCustomApplicationListener {
@Autowired
private EventService eventService;
@Autowired
private ScheduledExecutorService sharedExecutorService; // 使用共享的ScheduledExecutorService
@Autowired
private IotDbServer iotDbServer;
@Autowired
private RabbitTemplate rabbitTemplate;
@Async
@EventListener(SlidingWindow.class)
public void asyncListener(SlidingWindow slidingWindow) {
log.info("异步线程监听到事件:{}", slidingWindow);
List<VehicleEvent> vehicleEvents = eventService.selectVehicleEvent();
for (VehicleEvent vehicleEvent : vehicleEvents) {
if (vehicleEvent.getVin().equals(slidingWindow.vehicleVin())) {
vehicleEvent.setStates(true);
// 使用共享的ScheduledExecutorService调度任务
Float lengthTime = Float.valueOf(slidingWindow.slideLengthId());
int lengthInterval = Integer.parseInt(slidingWindow.windowRadiusId());
//每隔多久查一次 reslut秒
int result = (int) (lengthTime * lengthInterval);
sharedExecutorService.schedule(() -> {
log.error("走到这里啦");
Date date = new Date();
long time = date.getTime();
time = time - (time % 1000);
long currentTimeMillis = System.currentTimeMillis();
// 将毫秒转换为秒
long currentTimeInSeconds = currentTimeMillis / 1000;
//开始时间
long l = currentTimeInSeconds - Long.valueOf(result);
//转成毫秒
l = l * 1000;
Long s = Long.valueOf(slidingWindow.windowRadiusId());
//现在时间的毫秒
long currentTimeInMillis = System.currentTimeMillis();
// if (currentTimeInMillis-l==s){
// try {
// sleep(s);
// } catch (InterruptedException e) {
// throw new RuntimeException(e);
// }
// }else {
//
// }
try {
List<VehicleResult> vehicleResults = iotDbServer.queryDataFromIotDb(
IotDbParam.builder()
.endTime(String.valueOf(currentTimeInMillis))
.vin(vehicleEvent.getVin())
.queryAttribute(slidingWindow.analyzeKey())
.startTime( String.valueOf(l))
.build()
);
float sum=0;
for (VehicleResult vehicleResult : vehicleResults) {
String data = vehicleResult.getData();
sum+=Float.valueOf(data);
}
sum=sum/vehicleResults.size();
if (sum>Float.valueOf(slidingWindow.standard())){
String string = new StringBuilder().append(slidingWindow.vehicleVin()).append("上升").append(new Date().getTime()).toString();
rabbitTemplate.convertAndSend("SlidingWindow",string,message -> {
message.getMessageProperties().setMessageId(UUID.randomUUID().toString());
return message;
});
log.error(slidingWindow.vehicleVin()+"上升");
}
if (sum<Float.valueOf(slidingWindow.standard())){
log.error(slidingWindow.vehicleVin()+"下降");
String string = new StringBuilder().append(slidingWindow.vehicleVin()).append("下降").append(new Date().getTime()).toString();
rabbitTemplate.convertAndSend("SlidingWindow",string,message -> {
message.getMessageProperties().setMessageId(UUID.randomUUID().toString());
return message;
});
}
log.error("vehicleResults:{}",vehicleResults);
// if ( sum<Float.valueOf(slidingWindow.standard()) && sum-Float.valueOf(slidingWindow.standard()) <0 ){
// log.error(slidingWindow.vehicleVin()+"波动");
// }
// if ( sum>Float.valueOf(slidingWindow.standard()) && sum-Float.valueOf(slidingWindow.standard()) >0 ){
// log.error(slidingWindow.vehicleVin()+"波动");
// }
} catch (Exception e) {
throw new RuntimeException(e);
}
// iotDbServer.queryDataFromIotDb(new )
}, Long.parseLong(slidingWindow.windowRadiusId()), TimeUnit.SECONDS);
}
}
}
}

View File

@ -0,0 +1,37 @@
package com.muyu.springListen;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.ThreadPoolExecutor;
/**
* 线 AsyncTaskExecutorConfig
*
* @author Yangle
* Date 2024/6/25 19:59
*/
@EnableAsync
@Configuration
public class AsyncTaskExecutorConfig {
@Bean
public AsyncTaskExecutor taskExecutor(){
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setMaxPoolSize(20);
executor.setQueueCapacity(200);
executor.setKeepAliveSeconds(60);
executor.setThreadNamePrefix("taskExecutor-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setAwaitTerminationSeconds(60);
executor.initialize();
return executor;
}
}

View File

@ -0,0 +1,14 @@
package com.muyu.springListen;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;
@Component
public class SlidingApplicationListener implements ApplicationListener<SlidingWindow> {
@Override
public void onApplicationEvent(SlidingWindow event) {
System.out.println("onApplicationEvent方法接收到的消息:{} "+event.messageLabel()+event.vehicleVin()+event.slideLengthId()+event.windowRadiusId()+event.analyzeKey()+event.getSource()+event.getTimestamp());
}
}

View File

@ -0,0 +1,55 @@
package com.muyu.springListen;
import org.checkerframework.checker.units.qual.A;
import org.springframework.context.ApplicationEvent;
/**
* SlidingWindow
*
* @author Yangle
* Date 2024/6/28 22:33
*/
public class SlidingWindow extends ApplicationEvent {
private String analyzeKey;
private String vehicleVin;
private String slideLengthId;
private String windowRadiusId;
private String messageLabel;
private String standard;
public SlidingWindow(Object source,String analyzeKey,String vehicleVin,String slideLengthId,String windowRadiusId,String messageLabel,String standard) {
super(source);
this.analyzeKey=analyzeKey;
this.vehicleVin=vehicleVin;
this.slideLengthId=slideLengthId;
this.windowRadiusId=windowRadiusId;
this.messageLabel=messageLabel;
this.standard=standard;
}
public String messageLabel() {
return messageLabel;
}
public String standard() {
return standard;
}
public String analyzeKey() {
return analyzeKey;
}
public String vehicleVin() {
return vehicleVin;
}
public String slideLengthId() {
return slideLengthId;
}
public String windowRadiusId() {
return windowRadiusId;
}
}

View File

@ -0,0 +1,16 @@
package com.muyu.springListen;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@Configuration
public class ThreadPoolConfig {
@Bean
public ScheduledExecutorService scheduledExecutorService() {
return Executors.newScheduledThreadPool(5); // 根据实际情况调整线程数
}
}

View File

@ -9,5 +9,8 @@
<select id="selectVehicleEvent" resultType="com.muyu.event.common.VehicleEvent"> <select id="selectVehicleEvent" resultType="com.muyu.event.common.VehicleEvent">
select * from vehicle_event select * from vehicle_event
</select> </select>
<select id="selectCarVin" resultType="com.muyu.event.common.VehicleEvent">
select * from vehicle_event where vin = #{vin}
</select>
</mapper> </mapper>