diff --git a/logs/vehicle.log b/logs/vehicle.log index 02c0453..bd80edd 100644 Binary files a/logs/vehicle.log and b/logs/vehicle.log differ diff --git a/logs/vehicle.log.2024-06-21.0.gz b/logs/vehicle.log.2024-06-21.0.gz deleted file mode 100644 index 0188ef2..0000000 Binary files a/logs/vehicle.log.2024-06-21.0.gz and /dev/null differ diff --git a/logs/vehicle.log.2024-06-28.1.gz b/logs/vehicle.log.2024-06-28.1.gz new file mode 100644 index 0000000..ca793a1 Binary files /dev/null and b/logs/vehicle.log.2024-06-28.1.gz differ diff --git a/logs/vehicle.log.2024-06-29.0.gz b/logs/vehicle.log.2024-06-29.0.gz new file mode 100644 index 0000000..ee8808b Binary files /dev/null and b/logs/vehicle.log.2024-06-29.0.gz differ diff --git a/src/main/java/com/muyu/IotDbApplication.java b/src/main/java/com/muyu/IotDbApplication.java index 86a3731..23fdea3 100644 --- a/src/main/java/com/muyu/IotDbApplication.java +++ b/src/main/java/com/muyu/IotDbApplication.java @@ -10,6 +10,7 @@ public class IotDbApplication { public static void main(String[] args) { SpringApplication.run(IotDbApplication.class, args); + } } diff --git a/src/main/java/com/muyu/event/EventRunner.java b/src/main/java/com/muyu/event/EventRunner.java index 0b6f7fa..461a081 100644 --- a/src/main/java/com/muyu/event/EventRunner.java +++ b/src/main/java/com/muyu/event/EventRunner.java @@ -16,17 +16,17 @@ import java.util.List; * Date 2024/6/20 16:47 */ -@Component -public class EventRunner implements Runnable { - @Autowired - private EventService eventService; - @PostConstruct - public void run() { - List vehicleEvents = eventService.selectVehicleEvent(); - vehicleEvents.forEach( - vehicleEvent -> { - eventService.selectEvent(vehicleEvent); - } - ); - } -} +//@Component +//public class EventRunner implements Runnable { +// @Autowired +// private EventService eventService; +// @PostConstruct +// public void run() { +// List vehicleEvents = eventService.selectVehicleEvent(); +// vehicleEvents.forEach( +// vehicleEvent -> { +// eventService.selectEvent(vehicleEvent); +// } +// ); +// } +//} diff --git a/src/main/java/com/muyu/event/KafkaRunner.java b/src/main/java/com/muyu/event/KafkaRunner.java deleted file mode 100644 index 637faf6..0000000 --- a/src/main/java/com/muyu/event/KafkaRunner.java +++ /dev/null @@ -1,35 +0,0 @@ -package com.muyu.event; - -import com.muyu.kafka.SimpleKafkaConsumer; -import lombok.extern.log4j.Log4j2; -import org.checkerframework.checker.units.qual.C; -import org.springframework.boot.ApplicationArguments; -import org.springframework.boot.ApplicationRunner; -import org.springframework.stereotype.Component; - -import lombok.extern.log4j.Log4j2; -import org.springframework.stereotype.Component; - -/** - * kafka消费者启动类 KafkaRunner - * - * @author Yangle - * Date 2024/6/20 21:42 - */ -@Component -@Log4j2 -public class KafkaRunner implements ApplicationRunner { - - @Override - public void run(ApplicationArguments args) throws Exception { - - try { - log.info("项目启动,开始动态启动kafka消费者"); - - }catch (Exception e){ - log.error("启动kafka消费者失败,异常信息:{}",e); - } - - } - -} diff --git a/src/main/java/com/muyu/event/controller/EventController.java b/src/main/java/com/muyu/event/controller/EventController.java index 68d64a4..b1c9a27 100644 --- a/src/main/java/com/muyu/event/controller/EventController.java +++ b/src/main/java/com/muyu/event/controller/EventController.java @@ -7,6 +7,7 @@ import com.muyu.mqtt.dao.MessageData; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; import java.util.List; @@ -50,4 +51,9 @@ public class EventController { return eventService.getMaximumCoordinate(vehicleEvent.getVin()); } + + @PostMapping("/selectCarVin") + public VehicleEvent selectCarVin(@RequestParam String vin) { + return eventService.selectCarVin(vin); + } } diff --git a/src/main/java/com/muyu/event/mapper/EventMapper.java b/src/main/java/com/muyu/event/mapper/EventMapper.java index 4230b61..a633f78 100644 --- a/src/main/java/com/muyu/event/mapper/EventMapper.java +++ b/src/main/java/com/muyu/event/mapper/EventMapper.java @@ -19,4 +19,6 @@ public interface EventMapper { List selectVehicleEvent(); + + VehicleEvent selectCarVin(String vin); } diff --git a/src/main/java/com/muyu/event/service/EventService.java b/src/main/java/com/muyu/event/service/EventService.java index edaa999..b14c73a 100644 --- a/src/main/java/com/muyu/event/service/EventService.java +++ b/src/main/java/com/muyu/event/service/EventService.java @@ -26,4 +26,5 @@ public interface EventService { List selectVehicleEvent(); + VehicleEvent selectCarVin(String vin); } diff --git a/src/main/java/com/muyu/event/service/impl/EventServiceImpl.java b/src/main/java/com/muyu/event/service/impl/EventServiceImpl.java index 552f2a5..163a65d 100644 --- a/src/main/java/com/muyu/event/service/impl/EventServiceImpl.java +++ b/src/main/java/com/muyu/event/service/impl/EventServiceImpl.java @@ -56,110 +56,112 @@ public class EventServiceImpl implements EventService { @Override public Result selectEvent(VehicleEvent vehicleEvent) { - if (vehicleEvent.getEvent() != null) { - - if (vehicleEvent.getEvent().contains("1")) { - KafkaConsumer consumer = getStringStringKafkaConsumer(); - // 订阅主题 - consumer.subscribe(Collections.singletonList("test1")); - // 创建新线程来处理消息 - new Thread(() -> { - while (true) { - ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); - for (ConsumerRecord record : records) { - // 处理消息 - String message = record.value(); - String jsonString = JSON.toJSONString(message); - log.info("接收到消息:" + jsonString); - MessageData messageData = JSON.parseObject(message, MessageData.class); - // 将毫秒级时间戳转换为LocalDateTime - LocalDateTime dateTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(Long.parseLong(messageData.getTimestamp())), ZoneId.systemDefault()); - - // 格式化输出日期时间 - Date formattedTime = Date.from(dateTime.atZone(ZoneId.systemDefault()).toInstant()); - //判断车辆状态 - vehicleStatus(messageData, formattedTime); - - //判断充电状态 - chargeStatus(messageData, formattedTime); - - //判断运行状态 - runStatus(messageData, formattedTime); - - //判断SOC - soc(messageData, formattedTime); - - //判断充电工作状态 - chargeWorkStatus(messageData, formattedTime); - - //判断驱动电机状态 - driveMotorStatus(messageData, formattedTime); - - //判断位置 - location(messageData, formattedTime); - - //判断EAS - eas(messageData, formattedTime); - - //判断PTC - ptc(messageData, formattedTime); - - //判断EPS - eps(messageData, formattedTime); - - //判断ABS - abs(messageData, formattedTime); - - //判断MCU - mcu(messageData, formattedTime); - - //判断动力电池加热 - powerBatteryHeating(messageData, formattedTime); - - //判断动力电池电流 - powerBatteryCurrentStatus(messageData, formattedTime); - - //判断动力电池保温 - powerBatteryHeat(messageData, formattedTime); - - //判断DCDC - dcdc(messageData, formattedTime); - - //判断CHG - chg(messageData, formattedTime); - - } - } - }).start(); - } - if (vehicleEvent.getEvent().contains("2")) { - if(vehicleEvent.getStates()){ - if (redisTemplate.hasKey(vehicleEvent.getVin())){ - KafkaConsumer consumer = getStringStringKafkaConsumer(); - // 订阅主题 - consumer.subscribe(Collections.singletonList("test1")); - // 创建新线程来处理消息 - new Thread(() -> { - while (true) { - // 发送拉取请求 - ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); // 设置拉取超时时间为100毫秒 - // 处理消息 - for (ConsumerRecord record : records) { - redisTemplate.opsForValue().set(vehicleEvent.getVin(),record.value()); - System.out.println("Received message: " + record.value()); - // 在这里添加消息处理逻辑 - } - } - }).start(); - } - - } - return Result.success(); - } - if (vehicleEvent.getEvent().contains("3")) { - - } - } +// if (vehicleEvent.getEvent() != null) { +// +// if (vehicleEvent.getEvent().contains("1")) { +// KafkaConsumer consumer = getStringStringKafkaConsumer(); +// // 订阅主题 +// consumer.subscribe(Collections.singletonList("test1")); +// // 创建新线程来处理消息 +// new Thread(() -> { +// while (true) { +// ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); +// for (ConsumerRecord record : records) { +// // 处理消息 +// String message = record.value(); +// String jsonString = JSON.toJSONString(message); +// log.info("接收到消息:" + jsonString); +// MessageData messageData = JSON.parseObject(message, MessageData.class); +// // 将毫秒级时间戳转换为LocalDateTime +// LocalDateTime dateTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(Long.parseLong(messageData.getTimestamp())), ZoneId.systemDefault()); +// +// // 格式化输出日期时间 +// Date formattedTime = Date.from(dateTime.atZone(ZoneId.systemDefault()).toInstant()); +// //判断车辆状态 +// vehicleStatus(messageData, formattedTime); +// +// //判断充电状态 +// chargeStatus(messageData, formattedTime); +// +// //判断运行状态 +// runStatus(messageData, formattedTime); +// +// //判断SOC +// soc(messageData, formattedTime); +// +// //判断充电工作状态 +// chargeWorkStatus(messageData, formattedTime); +// +// //判断驱动电机状态 +// driveMotorStatus(messageData, formattedTime); +// +// //判断位置 +// location(messageData, formattedTime); +// +// //判断EAS +// eas(messageData, formattedTime); +// +// //判断PTC +// ptc(messageData, formattedTime); +// +// //判断EPS +// eps(messageData, formattedTime); +// +// //判断ABS +// abs(messageData, formattedTime); +// +// //判断MCU +// mcu(messageData, formattedTime); +// +// //判断动力电池加热 +// powerBatteryHeating(messageData, formattedTime); +// +// //判断动力电池电流 +// powerBatteryCurrentStatus(messageData, formattedTime); +// +// //判断动力电池保温 +// powerBatteryHeat(messageData, formattedTime); +// +// //判断DCDC +// dcdc(messageData, formattedTime); +// +// //判断CHG +// chg(messageData, formattedTime); +// +// } +// } +// }).start(); +// } +// if (vehicleEvent.getEvent().contains("2")) { +// if(vehicleEvent.getStates()){ +// if (redisTemplate.hasKey(vehicleEvent.getVin())){ +// KafkaConsumer consumer = getStringStringKafkaConsumer(); +// // 订阅主题 +// consumer.subscribe(Collections.singletonList("test1")); +// // 创建新线程来处理消息 +// new Thread(() -> { +// while (true) { +// // 发送拉取请求 +// ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); // 设置拉取超时时间为100毫秒 +// // 处理消息 +// for (ConsumerRecord record : records) { +// redisTemplate.opsForValue().set(vehicleEvent.getVin(),record.value()); +// System.out.println("Received message: " + record.value()); +// // 在这里添加消息处理逻辑 +// } +// } +// }).start(); +// } +// +// } +// return Result.success(); +// } +// if (vehicleEvent.getEvent().contains("3")) { +// new Thread(() -> { +// +// }).start(); +// } +// } return Result.error("车辆没有绑定事件"); @@ -207,6 +209,11 @@ public class EventServiceImpl implements EventService { return eventMapper.selectVehicleEvent(); } + @Override + public VehicleEvent selectCarVin(String vin) { + return eventMapper.selectCarVin(vin); + } + // //创建分区 // NewTopic newTopic = new NewTopic(topic, 8, (short) 1); diff --git a/src/main/java/com/muyu/iotdb/config/IotDBSessionConfig.java b/src/main/java/com/muyu/iotdb/config/IotDBSessionConfig.java index 870503c..bfcfde2 100644 --- a/src/main/java/com/muyu/iotdb/config/IotDBSessionConfig.java +++ b/src/main/java/com/muyu/iotdb/config/IotDBSessionConfig.java @@ -29,7 +29,7 @@ import java.util.List; public class IotDBSessionConfig { private static Session session; - private static final String LOCAL_HOST = "47.93.162.81"; + private static final String LOCAL_HOST = "127.0.0.1"; @Bean public Session getSession() throws IoTDBConnectionException, StatementExecutionException { @@ -78,6 +78,12 @@ public class IotDBSessionConfig { public void insertRecord(String deviceId, Long time, List measurementsList, List valuesList) throws StatementExecutionException, IoTDBConnectionException, ServerException { session.insertRecord(deviceId, time, measurementsList, valuesList); } - + /** + * description: 根据SQL查询 + * author: zhouhong + */ + public SessionDataSet query(String sql) throws StatementExecutionException, IoTDBConnectionException { + return session.executeQueryStatement(sql); + } } diff --git a/src/main/java/com/muyu/iotdb/controller/IotDbController.java b/src/main/java/com/muyu/iotdb/controller/IotDbController.java new file mode 100644 index 0000000..a24362f --- /dev/null +++ b/src/main/java/com/muyu/iotdb/controller/IotDbController.java @@ -0,0 +1,42 @@ +package com.muyu.iotdb.controller; + + +import com.muyu.iotdb.config.IotDBSessionConfig; +import com.muyu.iotdb.model.VehicleData; +import com.muyu.iotdb.model.param.IotDbParam; +import com.muyu.iotdb.model.result.IotDbResult; +import com.muyu.iotdb.respone.ResponseData; +import com.muyu.iotdb.server.IotDbServer; +import lombok.extern.log4j.Log4j2; +import org.apache.iotdb.rpc.IoTDBConnectionException; +import org.apache.iotdb.rpc.StatementExecutionException; +import org.springframework.web.bind.annotation.*; + +import javax.annotation.Resource; +import java.rmi.ServerException; +import java.util.List; + +/** + * description: iotdb 控制层 + * date: 2022/8/15 21:50 + * author: zhouhong + */ +@Log4j2 +@RestController +public class IotDbController { + + @Resource + private IotDbServer iotDbServer; + @Resource + private IotDBSessionConfig iotDBSessionConfig; + + + /** + * 查询数据 + * @param + */ + @PostMapping("/api/device/queryData") + public ResponseData queryDataFromIotDb(@RequestBody IotDbParam iotDbParam) throws Exception { + return ResponseData.success(iotDbServer.queryDataFromIotDb (iotDbParam)); + } +} \ No newline at end of file diff --git a/src/main/java/com/muyu/iotdb/model/param/IotDbParam.java b/src/main/java/com/muyu/iotdb/model/param/IotDbParam.java index 2f846a9..54bf837 100644 --- a/src/main/java/com/muyu/iotdb/model/param/IotDbParam.java +++ b/src/main/java/com/muyu/iotdb/model/param/IotDbParam.java @@ -1,17 +1,28 @@ package com.muyu.iotdb.model.param; +import lombok.AllArgsConstructor; +import lombok.Builder; import lombok.Data; +import lombok.NoArgsConstructor; + /** * description: 入参 * date: 2022/8/15 21:53 * author: YangLe */ @Data +@Builder +@NoArgsConstructor +@AllArgsConstructor public class IotDbParam { - - - - private String vin; + /* + * 车辆vin + */ + private String vin; + /*** + * 查询属性 + */ + private String queryAttribute; /*** * 查询开始时间 */ @@ -21,4 +32,6 @@ public class IotDbParam { */ private String endTime; + + } diff --git a/src/main/java/com/muyu/iotdb/model/result/VehicleResult.java b/src/main/java/com/muyu/iotdb/model/result/VehicleResult.java new file mode 100644 index 0000000..661fdd6 --- /dev/null +++ b/src/main/java/com/muyu/iotdb/model/result/VehicleResult.java @@ -0,0 +1,26 @@ +package com.muyu.iotdb.model.result; + +import lombok.Data; + +/** + * VehicleDataResult + * + * @author Yangle + * Date 2024/6/29 11:49 + */ +@Data +public class VehicleResult { + + /*** + * 时间 + */ + private String time; + /*** + * 车辆vin + */ + private String vin; + /*** + * 实时数据 + */ + private String data; +} diff --git a/src/main/java/com/muyu/iotdb/server/IotDbServer.java b/src/main/java/com/muyu/iotdb/server/IotDbServer.java index e0e43fb..2a9bc00 100644 --- a/src/main/java/com/muyu/iotdb/server/IotDbServer.java +++ b/src/main/java/com/muyu/iotdb/server/IotDbServer.java @@ -3,12 +3,14 @@ package com.muyu.iotdb.server; import com.alibaba.fastjson2.JSONObject; -import com.muyu.iotdb.model.VehicleData; +import com.muyu.iotdb.model.param.IotDbParam; +import com.muyu.iotdb.model.result.VehicleResult; import org.apache.iotdb.rpc.IoTDBConnectionException; import org.apache.iotdb.rpc.StatementExecutionException; import org.springframework.stereotype.Service; import java.rmi.ServerException; +import java.util.List; /** * description: iot服务类 @@ -22,4 +24,10 @@ public interface IotDbServer { */ void insertData(JSONObject vehicleData) throws StatementExecutionException, ServerException, IoTDBConnectionException; + /** + * 查询数据 + */ + List queryDataFromIotDb(IotDbParam iotDbParam) throws Exception; + + } diff --git a/src/main/java/com/muyu/iotdb/server/impl/IotDbServerImpl.java b/src/main/java/com/muyu/iotdb/server/impl/IotDbServerImpl.java index 633d05e..8776607 100644 --- a/src/main/java/com/muyu/iotdb/server/impl/IotDbServerImpl.java +++ b/src/main/java/com/muyu/iotdb/server/impl/IotDbServerImpl.java @@ -4,7 +4,8 @@ package com.muyu.iotdb.server.impl; import com.alibaba.fastjson2.JSONObject; import com.muyu.iotdb.config.IotDBSessionConfig; -import com.muyu.iotdb.model.VehicleData; +import com.muyu.iotdb.model.param.IotDbParam; +import com.muyu.iotdb.model.result.VehicleResult; import com.muyu.iotdb.server.IotDbServer; import lombok.extern.log4j.Log4j2; import org.apache.iotdb.rpc.IoTDBConnectionException; @@ -38,16 +39,85 @@ public class IotDbServerImpl implements IotDbServer { public void insertData(JSONObject vehicleData) throws StatementExecutionException, ServerException, IoTDBConnectionException { // iotDbParam: 模拟设备上报消息 // bizkey: 业务唯一key PK :产品唯一编码 SN:设备唯一编码 - String deviceId = "root.test." + vehicleData.getString("vin"); + Object o = vehicleData.get("vinCode"); + String deviceId = "root.ln.bizkey." + o; // 将设备上报的数据存入数据库(时序数据库) List measurementsList = new ArrayList<>(); - measurementsList.add("vehicleStatusMsg"); List valuesList = new ArrayList<>(); - valuesList.add(vehicleData.toString()); - log.info("vehicleData:{}", vehicleData); + vehicleData.forEach( + (k, v) -> { + measurementsList.add(k); + valuesList.add(v.toString()); + } + ); + log.info("measurementsList:{}", measurementsList); log.info("valuesList:{}",valuesList); - iotDBSessionConfig.insertRecord(deviceId, vehicleData.getLong("timestamp"), measurementsList, valuesList); + iotDBSessionConfig.insertRecord(deviceId, vehicleData.getLong("timestamp1"), measurementsList, valuesList); + } + + @Override + public List queryDataFromIotDb(IotDbParam iotDbParam) throws Exception { + List vehicleResultList = new ArrayList<>(); + if (null != iotDbParam.getVin()) { + String sql = "select * from root.ln.bizkey."+ iotDbParam.getVin(); + if (null != iotDbParam.getStartTime() && !"".equals(iotDbParam.getStartTime())){ + if (sql.contains("where")){ + sql += " and time >= " + iotDbParam.getStartTime(); + }else{ + sql += " where time >= " + iotDbParam.getStartTime(); + } + } + if (null != iotDbParam.getEndTime() && !"".equals(iotDbParam.getEndTime())){ + if (sql.contains("where")){ + sql += " and time <= " + iotDbParam.getEndTime(); + }else{ + sql += " where time <= " + iotDbParam.getEndTime(); + } + } + SessionDataSet sessionDataSet = iotDBSessionConfig.query(sql); + List columnNames = sessionDataSet.getColumnNames(); + List titleList = new ArrayList<>(); + // 排除Time字段 -- 方便后面后面拼装数据 + for (int i = 1; i < columnNames.size(); i++) { + String[] temp = columnNames.get(i).split("\\."); + titleList.add(temp[temp.length - 1]); + } + // 封装处理数据 + packagingData(iotDbParam, vehicleResultList, sessionDataSet, titleList); + } else { + log.info("PK或者SN不能为空!!"); + } + return vehicleResultList; } + /** + * 封装处理数据 + */ + private void packagingData(IotDbParam iotDbParam, List vehicleResultList, SessionDataSet sessionDataSet, List titleList) + throws StatementExecutionException, IoTDBConnectionException { + int fetchSize = sessionDataSet.getFetchSize(); + if (fetchSize > 0) { + while (sessionDataSet.hasNext()) { + VehicleResult vehicleResult = new VehicleResult(); + RowRecord next = sessionDataSet.next(); + List fields = next.getFields(); + String timeString = new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(next.getTimestamp()); + Map map = new HashMap<>(); + + for (int i = 0; i < fields.size(); i++) { + Field field = fields.get(i); + // 这里的需要按照类型获取 + if (null != field.getDataType()){ + map.put(titleList.get(i), field.getObjectValue(field.getDataType()).toString()); + } + } + vehicleResult.setTime(timeString); + vehicleResult.setVin(iotDbParam.getVin()); + vehicleResult.setData(map.get(iotDbParam.getQueryAttribute())); + vehicleResultList.add(vehicleResult); + } + } + } + } diff --git a/src/main/java/com/muyu/mqtt/MessageCallbackService.java b/src/main/java/com/muyu/mqtt/MessageCallbackService.java index 4d1a7f4..d9cef6e 100644 --- a/src/main/java/com/muyu/mqtt/MessageCallbackService.java +++ b/src/main/java/com/muyu/mqtt/MessageCallbackService.java @@ -6,6 +6,8 @@ import com.alibaba.fastjson2.JSONObject; import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Caffeine; import com.muyu.mqtt.dao.MessageDa; +import com.muyu.redis.demo.Sliding; +import com.muyu.springListen.SlidingWindow; import com.muyu.utils.ConversionUtil; import lombok.extern.log4j.Log4j2; import org.apache.iotdb.rpc.IoTDBConnectionException; @@ -17,11 +19,13 @@ import org.eclipse.paho.client.mqttv3.MqttMessage; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.ApplicationEventPublisher; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.kafka.core.KafkaAdmin; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Service; +import javax.annotation.Resource; import java.rmi.ServerException; import java.util.ArrayList; import java.util.List; @@ -65,57 +69,121 @@ public class MessageCallbackService implements MqttCallback { String s1 = ConversionUtil.hexStringToString(s); JSONObject object = getObject(s1); log.error("object:{}", object); - String vin =""; - String key = vin; // 使用vin作为key,如果适用 + String value = com.alibaba.fastjson.JSON.toJSONString(object); + JSONObject jsonObject = JSONObject.parseObject(value); + String vin = jsonObject.getString("vinCode"); + String key = vin; // 使用vin作为key,如果适用 + log.error("value:{}", value); ProducerRecord producerRecord = new ProducerRecord<>(topic, key, value); kafkaTemplate.send(producerRecord); + + String s2 = redisTemplate.opsForValue().get(key + "-length"); + log.error("s2:{}",JSON.toJSONString(s2)); + publishEvent(s2); } catch (Exception e) { e.printStackTrace(); } } + @Resource + ApplicationEventPublisher applicationEventPublisher; + public void publishEvent(String message) { + Sliding sliding = JSONObject.parseObject(message, Sliding.class); + log.error("slidingWindow:{}", sliding); + System.out.println("开始发布自定义事件"); + SlidingWindow customApplicationEvent = new SlidingWindow(this,sliding.getAnalyzeKey(),sliding.getVehicleVin(),sliding.getSlideLengthId(),sliding.getWindowRadiusId(),sliding.getMessageLabel(),sliding.getStandard()); + // 发布事件 + applicationEventPublisher.publishEvent(customApplicationEvent); + + System.out.println("发布自定义事件结束"); + } @Override public void deliveryComplete(IMqttDeliveryToken token) { System.out.println("deliveryComplete---------" + token.isComplete()); } - public com.alibaba.fastjson2.JSONObject getObject(String message) { +// public com.alibaba.fastjson2.JSONObject getObject(String message) { +// log.info("原始数据:{}", message); +// // 移除字符串message的首尾字符,为后续处理准备 +// message = message.substring(1, message.length() - 2); +// // 初始化StringBuilder用于构建最终的JSON字符串 +// StringBuffer buffer = new StringBuffer(); +// +// // 从Redis中获取指定key的值,这里的key是message的前17个字符 +// Object o = redisTemplate.opsForHash().get("VIN", message.substring(0, 17)); +// log.error("o:{}", o); +// // 将获取到的值转换为VehicleDateModel的列表 +// List messageDas = new ArrayList<>(); +// messageDas = JSON.parseObject(o.toString(), ArrayList.class).stream() +// .map(obj -> JSON.parseObject(obj.toString(), MessageDa.class)).toList(); +// // 用于构建最终JSON字符串的变量 +// String finalMessage = message; +// // 遍历车辆数据模型列表,构建JSON字符串的键值对 +// messageDas.forEach(messageDa -> { +// // 根据车辆数据模型的起始和结束位置,以及特定的处理逻辑,构建JSON字符串的键值对并追加到buffer中 +// buffer.append(",\"" + messageDa.getAnalyzeKey() + "\":" + +// "\"" + remove0(finalMessage.substring(messageDa.getAnalyzeStart() , messageDa.getEnt())) + "\""); +// }); +// // 构建最终的JSON字符串 +// String jsonString = "{" + buffer.substring(1) + "}"; +// com.alibaba.fastjson2.JSONObject jsonObject = JSON.parseObject(jsonString); +// +// // 日志记录解析后的数据 +// log.info("解析后的数据:{}", jsonString); +// // 返回构建好的JSONObject +// return new com.alibaba.fastjson2.JSONObject(jsonObject); +// } + + + public JSONObject getObject(String message) { log.info("原始数据:{}", message); // 移除字符串message的首尾字符,为后续处理准备 message = message.substring(1, message.length() - 2); - // 初始化StringBuilder用于构建最终的JSON字符串 + + // 从本地缓存中获取数据 + String cacheKey = message.substring(0, 17); // 构造一个缓存键 + String cachedData = localCache.getIfPresent(cacheKey); + + if (cachedData == null) { + // 本地缓存中没有找到,从Redis中获取 + String dataFromRedis = redisTemplate.opsForHash().get("VIN", message.substring(0, 17)).toString(); + log.error("Data from Redis: {}", dataFromRedis); + if (dataFromRedis != null) { + // 将Redis中的数据放入本地缓存 + localCache.put(cacheKey, dataFromRedis); + log.info("数据放入本地缓存:{}", dataFromRedis); + cachedData = dataFromRedis; + } + } else { + log.info("数据在本地缓存中找到:{}", cachedData); + } + StringBuffer buffer = new StringBuffer(); - // 从Redis中获取指定key的值,这里的key是message的前17个字符 - Object o = redisTemplate.opsForHash().get("VIN", message.substring(0, 17)); - // 将获取到的值转换为VehicleDateModel的列表 List messageDas = new ArrayList<>(); - messageDas = JSON.parseObject(o.toString(), ArrayList.class).stream() - .map(obj -> JSON.parseObject(obj.toString(), MessageDa.class)).toList(); - // 用于构建最终JSON字符串的变量 - String finalMessage = message; - // 遍历车辆数据模型列表,构建JSON字符串的键值对 - messageDas.forEach(messageDa -> { - // 根据车辆数据模型的起始和结束位置,以及特定的处理逻辑,构建JSON字符串的键值对并追加到buffer中 - buffer.append(",\"" + messageDa.getAnalyzeKey() + "\":" + - "\"" + remove0(finalMessage.substring(messageDa.getAnalyzeStart() - 1, messageDa.getEnt())) + "\""); - }); + if (cachedData != null) { + // 将获取到的值转换为VehicleDateModel的列表 + messageDas = JSON.parseArray(cachedData, MessageDa.class); + // 用于构建最终JSON字符串的变量 + String finalMessage = message; + // 遍历车辆数据模型列表,构建JSON字符串的键值对 + messageDas.forEach(messageDa -> { + buffer.append(",\"" + messageDa.getAnalyzeKey() + "\":" + + "\"" + remove0(finalMessage.substring(messageDa.getAnalyzeStart(), messageDa.getEnt())) + "\""); + }); + } + // 构建最终的JSON字符串 String jsonString = "{" + buffer.substring(1) + "}"; - com.alibaba.fastjson2.JSONObject jsonObject = JSON.parseObject(jsonString); - try { + JSONObject jsonObject = JSON.parseObject(jsonString); - } catch (Exception e) { - throw new RuntimeException(e); - } // 日志记录解析后的数据 log.info("解析后的数据:{}", jsonString); // 返回构建好的JSONObject - return new com.alibaba.fastjson2.JSONObject(jsonObject); + return jsonObject; } - // 移除字符串中的前导零 public String remove0(String str) { if (str.length() > 1) { @@ -129,12 +197,18 @@ public class MessageCallbackService implements MqttCallback { } } - @RabbitListener(queues = "subscription") + @RabbitListener(queues ="renew") public void receiveSms(Message message) { // 拿到消息中的VIN - String vin = message.getBody().toString(); + byte[] body = message.getBody(); + String vin = new String(body); + log.error("接收到消息",body); + log.error("vin:{}",vin); +// redisTemplate.delete(vin); // 删除本地缓存 - localCache.invalidate(vin); +//// redisTemplate.delete(vin); + localCache.invalidate(vin); + log.error("删除本地缓存"+vin); } /** diff --git a/src/main/java/com/muyu/redis/demo/Sliding.java b/src/main/java/com/muyu/redis/demo/Sliding.java new file mode 100644 index 0000000..183239a --- /dev/null +++ b/src/main/java/com/muyu/redis/demo/Sliding.java @@ -0,0 +1,27 @@ +package com.muyu.redis.demo; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.experimental.SuperBuilder; + +/** + * Sliding + * + * @author Yangle + * Date 2024/6/29 9:39 + */ +@Data +@AllArgsConstructor +@NoArgsConstructor +@SuperBuilder +public class Sliding { + + private String analyzeKey; + private String vehicleVin; + private String slideLengthId; + private String windowRadiusId; + private String messageLabel; + private String standard; + +} diff --git a/src/main/java/com/muyu/redis/service/impl/RedisServiceImpl.java b/src/main/java/com/muyu/redis/service/impl/RedisServiceImpl.java index 29bda26..b5c4551 100644 --- a/src/main/java/com/muyu/redis/service/impl/RedisServiceImpl.java +++ b/src/main/java/com/muyu/redis/service/impl/RedisServiceImpl.java @@ -7,7 +7,9 @@ import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Caffeine; import com.muyu.mqtt.dao.MessageDa; import com.muyu.redis.demo.RedisData; +import com.muyu.redis.demo.Sliding; import com.muyu.redis.service.RedisService; +import com.muyu.springListen.SlidingWindow; import lombok.extern.log4j.Log4j2; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; @@ -30,49 +32,22 @@ public class RedisServiceImpl implements RedisService { @Override public void set(RedisData redisData) { +// Sliding sliding = new Sliding(); +// sliding.setVin("KVMVI1BD0Z3X5QKVJ"); +// sliding.setCar("20"); +// sliding.setIndex("20"); +// sliding.setLengthInterval("3/2"); +// sliding.setLengthTime(5); // 假设lengthTime是一个整数 - - List messageDas = new ArrayList<>(); - messageDas.add( - new MessageDa( - 1, - "vin", - 0, - 17, - "vIN", - "基础信息" - ) - ); - - messageDas.add( - new MessageDa( - 2, - "timestamp", - 17, - 30, - "时间戳", - "基础信息" - ) - ); - messageDas.add( - new MessageDa( - 3, - "longitude", - 38, - 41, - "经度", - "基础信息" - ) - ); - redisTemplate.opsForList().set(redisData.getVin(), 0, JSON.toJSONString(messageDas)); +// redisTemplate.opsForValue().set(redisData.getVin()+"-length", JSON.toJSONString(sliding)); } @Override public String get(RedisData redisData) { - String string = redisTemplate.opsForValue().get(redisData.getVin()); + String string = redisTemplate.opsForValue().get(redisData.getVin()+"-length"); log.error("string:{}", string); - return null; + return string; // String index = redisTemplate.opsForList().index(redisData.getVin(), 0); // log.error("index:{}",index); // // 解析JSON字符串 diff --git a/src/main/java/com/muyu/springListen/AsynCustomApplicationListener.java b/src/main/java/com/muyu/springListen/AsynCustomApplicationListener.java new file mode 100644 index 0000000..f3b06e9 --- /dev/null +++ b/src/main/java/com/muyu/springListen/AsynCustomApplicationListener.java @@ -0,0 +1,130 @@ +package com.muyu.springListen; + +import cn.hutool.core.date.DateTime; +import com.alibaba.fastjson2.JSON; +import com.muyu.event.common.VehicleEvent; +import com.muyu.event.service.EventService; +import com.muyu.iotdb.model.param.IotDbParam; +import com.muyu.iotdb.model.result.VehicleResult; +import com.muyu.iotdb.server.IotDbServer; +import lombok.extern.log4j.Log4j2; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.event.EventListener; +import org.springframework.scheduling.annotation.Async; +import org.springframework.stereotype.Component; + +import java.util.Date; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import static java.lang.Thread.sleep; + +@Log4j2 +@Component +public class AsynCustomApplicationListener { + + @Autowired + private EventService eventService; + + @Autowired + private ScheduledExecutorService sharedExecutorService; // 使用共享的ScheduledExecutorService + + @Autowired + private IotDbServer iotDbServer; + @Autowired + private RabbitTemplate rabbitTemplate; + @Async + @EventListener(SlidingWindow.class) + public void asyncListener(SlidingWindow slidingWindow) { + log.info("异步线程监听到事件:{}", slidingWindow); + + List vehicleEvents = eventService.selectVehicleEvent(); + for (VehicleEvent vehicleEvent : vehicleEvents) { + if (vehicleEvent.getVin().equals(slidingWindow.vehicleVin())) { + vehicleEvent.setStates(true); + // 使用共享的ScheduledExecutorService调度任务 + Float lengthTime = Float.valueOf(slidingWindow.slideLengthId()); + int lengthInterval = Integer.parseInt(slidingWindow.windowRadiusId()); + //每隔多久查一次 reslut秒 + int result = (int) (lengthTime * lengthInterval); + sharedExecutorService.schedule(() -> { + log.error("走到这里啦"); + Date date = new Date(); + long time = date.getTime(); + time = time - (time % 1000); + long currentTimeMillis = System.currentTimeMillis(); + + // 将毫秒转换为秒 + long currentTimeInSeconds = currentTimeMillis / 1000; + //开始时间 + long l = currentTimeInSeconds - Long.valueOf(result); + //转成毫秒 + l = l * 1000; + Long s = Long.valueOf(slidingWindow.windowRadiusId()); + + //现在时间的毫秒 + long currentTimeInMillis = System.currentTimeMillis(); +// if (currentTimeInMillis-l==s){ +// try { +// sleep(s); +// } catch (InterruptedException e) { +// throw new RuntimeException(e); +// } +// }else { +// +// } + try { + List vehicleResults = iotDbServer.queryDataFromIotDb( + IotDbParam.builder() + .endTime(String.valueOf(currentTimeInMillis)) + .vin(vehicleEvent.getVin()) + .queryAttribute(slidingWindow.analyzeKey()) + .startTime( String.valueOf(l)) + .build() + ); + float sum=0; + for (VehicleResult vehicleResult : vehicleResults) { + String data = vehicleResult.getData(); + sum+=Float.valueOf(data); + } + sum=sum/vehicleResults.size(); + if (sum>Float.valueOf(slidingWindow.standard())){ + String string = new StringBuilder().append(slidingWindow.vehicleVin()).append("上升").append(new Date().getTime()).toString(); + rabbitTemplate.convertAndSend("SlidingWindow",string,message -> { + message.getMessageProperties().setMessageId(UUID.randomUUID().toString()); + return message; + }); + log.error(slidingWindow.vehicleVin()+"上升"); + } + if (sum { + message.getMessageProperties().setMessageId(UUID.randomUUID().toString()); + return message; + }); + } + log.error("vehicleResults:{}",vehicleResults); + + +// if ( sumFloat.valueOf(slidingWindow.standard()) && sum-Float.valueOf(slidingWindow.standard()) >0 ){ +// log.error(slidingWindow.vehicleVin()+"波动"); +// } + + } catch (Exception e) { + throw new RuntimeException(e); + } +// iotDbServer.queryDataFromIotDb(new ) + }, Long.parseLong(slidingWindow.windowRadiusId()), TimeUnit.SECONDS); + } + } + } +} diff --git a/src/main/java/com/muyu/springListen/AsyncTaskExecutorConfig.java b/src/main/java/com/muyu/springListen/AsyncTaskExecutorConfig.java new file mode 100644 index 0000000..f8ed7ba --- /dev/null +++ b/src/main/java/com/muyu/springListen/AsyncTaskExecutorConfig.java @@ -0,0 +1,37 @@ +package com.muyu.springListen; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.core.task.AsyncTaskExecutor; +import org.springframework.scheduling.annotation.EnableAsync; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; + +import java.util.concurrent.ThreadPoolExecutor; + +/** + * 线程池配置 AsyncTaskExecutorConfig + * + * @author Yangle + * Date 2024/6/25 19:59 + */ +@EnableAsync +@Configuration +public class AsyncTaskExecutorConfig { + + @Bean + public AsyncTaskExecutor taskExecutor(){ + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + executor.setCorePoolSize(10); + executor.setMaxPoolSize(20); + executor.setQueueCapacity(200); + executor.setKeepAliveSeconds(60); + executor.setThreadNamePrefix("taskExecutor-"); + executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); + executor.setWaitForTasksToCompleteOnShutdown(true); + executor.setAwaitTerminationSeconds(60); + executor.initialize(); + return executor; + } + + +} diff --git a/src/main/java/com/muyu/springListen/SlidingApplicationListener.java b/src/main/java/com/muyu/springListen/SlidingApplicationListener.java new file mode 100644 index 0000000..d9a4d47 --- /dev/null +++ b/src/main/java/com/muyu/springListen/SlidingApplicationListener.java @@ -0,0 +1,14 @@ +package com.muyu.springListen; + + +import org.springframework.context.ApplicationListener; +import org.springframework.stereotype.Component; + +@Component +public class SlidingApplicationListener implements ApplicationListener { + + @Override + public void onApplicationEvent(SlidingWindow event) { + System.out.println("onApplicationEvent方法接收到的消息:{} "+event.messageLabel()+event.vehicleVin()+event.slideLengthId()+event.windowRadiusId()+event.analyzeKey()+event.getSource()+event.getTimestamp()); + } +} diff --git a/src/main/java/com/muyu/springListen/SlidingWindow.java b/src/main/java/com/muyu/springListen/SlidingWindow.java new file mode 100644 index 0000000..dfa65a2 --- /dev/null +++ b/src/main/java/com/muyu/springListen/SlidingWindow.java @@ -0,0 +1,55 @@ +package com.muyu.springListen; + +import org.checkerframework.checker.units.qual.A; +import org.springframework.context.ApplicationEvent; + +/** + * 滑窗 SlidingWindow + * + * @author Yangle + * Date 2024/6/28 22:33 + */ + +public class SlidingWindow extends ApplicationEvent { + + private String analyzeKey; + private String vehicleVin; + private String slideLengthId; + private String windowRadiusId; + private String messageLabel; + private String standard; + + + public SlidingWindow(Object source,String analyzeKey,String vehicleVin,String slideLengthId,String windowRadiusId,String messageLabel,String standard) { + super(source); + this.analyzeKey=analyzeKey; + this.vehicleVin=vehicleVin; + this.slideLengthId=slideLengthId; + this.windowRadiusId=windowRadiusId; + this.messageLabel=messageLabel; + this.standard=standard; + } + + public String messageLabel() { + return messageLabel; + } + public String standard() { + return standard; + } + public String analyzeKey() { + return analyzeKey; + } + + public String vehicleVin() { + return vehicleVin; + } + + public String slideLengthId() { + return slideLengthId; + } + + public String windowRadiusId() { + return windowRadiusId; + } +} + diff --git a/src/main/java/com/muyu/springListen/ThreadPoolConfig.java b/src/main/java/com/muyu/springListen/ThreadPoolConfig.java new file mode 100644 index 0000000..457e8e9 --- /dev/null +++ b/src/main/java/com/muyu/springListen/ThreadPoolConfig.java @@ -0,0 +1,16 @@ +package com.muyu.springListen; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; + +@Configuration +public class ThreadPoolConfig { + + @Bean + public ScheduledExecutorService scheduledExecutorService() { + return Executors.newScheduledThreadPool(5); // 根据实际情况调整线程数 + } +} diff --git a/src/main/resources/mapper/EventMapper.xml b/src/main/resources/mapper/EventMapper.xml index bf94539..480f5a0 100644 --- a/src/main/resources/mapper/EventMapper.xml +++ b/src/main/resources/mapper/EventMapper.xml @@ -9,5 +9,8 @@ +