diff --git a/.idea/misc.xml b/.idea/misc.xml
index c60a15a..89ab430 100644
--- a/.idea/misc.xml
+++ b/.idea/misc.xml
@@ -1,4 +1,3 @@
-
diff --git a/mobai-event-common/src/main/java/com/mobai/bugMalfuntion/BugMalfunctionInfo.java b/mobai-event-common/src/main/java/com/mobai/bugMalfuntion/BugMalfunctionInfo.java
new file mode 100644
index 0000000..09f3355
--- /dev/null
+++ b/mobai-event-common/src/main/java/com/mobai/bugMalfuntion/BugMalfunctionInfo.java
@@ -0,0 +1,23 @@
+package com.mobai.bugMalfuntion;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * @author Saisai
+ * @className BugMalfunctionInfo
+ * @description 描述
+ * @date 2024/6/20 16:55
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class BugMalfunctionInfo {
+
+ private String vinInfo; // vin + faultCode status
+
+ private Integer status;
+}
diff --git a/mobai-event-common/src/main/java/com/mobai/bugMalfuntion/BugMalfunctionResult.java b/mobai-event-common/src/main/java/com/mobai/bugMalfuntion/BugMalfunctionResult.java
new file mode 100644
index 0000000..bdb94f5
--- /dev/null
+++ b/mobai-event-common/src/main/java/com/mobai/bugMalfuntion/BugMalfunctionResult.java
@@ -0,0 +1,22 @@
+package com.mobai.bugMalfuntion;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * @author Saisai
+ * @className BugMalfunctionResult
+ * @description 描述
+ * @date 2024/6/20 17:18
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class BugMalfunctionResult {
+ private String vin;
+ private String faultCode;
+ private Long timestamp;
+}
diff --git a/mobai-event-common/src/main/java/com/mobai/bugMalfuntion/FaultCode.java b/mobai-event-common/src/main/java/com/mobai/bugMalfuntion/FaultCode.java
new file mode 100644
index 0000000..d83c163
--- /dev/null
+++ b/mobai-event-common/src/main/java/com/mobai/bugMalfuntion/FaultCode.java
@@ -0,0 +1,42 @@
+package com.mobai.bugMalfuntion;
+
+/**
+ * @ClassName FaultCode
+ * @Description
+ * @Author Saisai
+ * @Date 2024/6/20 17:20
+ */
+public enum FaultCode {
+ VEHICLE("GZ001"),
+ CHARGING("GZ002"),
+ OPERATING("GZ003"),
+ SOC("GZ004"),
+ CHARGING_ENERGY("GZ005"),
+ DRIVE_MOTOR("GZ006"),
+ POSITION("GZ007"),
+ EAS("GZ008"),
+ PTC("GZ009"),
+ EPS("GZ010"),
+ ABS("GZ011"),
+ MCU("GZ012"),
+ HEATING("GZ013"),
+ BATTERY("GZ014"),
+ BATTERY_INSULATION("GZ015"),
+ DCDC("GZ016"),
+ CHG("GZ017");
+
+ private String string;
+
+ FaultCode() {
+ }
+
+ FaultCode(String code) {
+ this.string = code;
+ }
+
+ //获取string的值
+ public String getString() {
+ return string;
+ }
+
+}
diff --git a/mobai-event-common/src/main/java/com/mobai/domain/Vehicle.java b/mobai-event-common/src/main/java/com/mobai/domain/Vehicle.java
index 758d5c4..7f0f891 100644
--- a/mobai-event-common/src/main/java/com/mobai/domain/Vehicle.java
+++ b/mobai-event-common/src/main/java/com/mobai/domain/Vehicle.java
@@ -153,11 +153,15 @@ public class Vehicle implements Serializable {
*/
private Integer operatingStatus;
/**
- * SOC
+ * SOC状态
+ */
+ private Integer socStatus;
+ /**
+ * 可充电储能装置工作状态
*/
private Integer chargingEnergyStorageStatus;
/**
- * 可充电储能装置工作状态
+ * 驱动电机状态
*/
private Integer driveMotorStatus;
/**
diff --git a/mobai-event-common/src/main/java/com/mobai/车辆参数 b/mobai-event-common/src/main/java/com/mobai/车辆参数
index a0c5855..d04e4e2 100644
--- a/mobai-event-common/src/main/java/com/mobai/车辆参数
+++ b/mobai-event-common/src/main/java/com/mobai/车辆参数
@@ -31,6 +31,7 @@ availableBatteryCapacity
vehicleStatus
chargingStatus
operatingStatus
+socStatus
chargingEnergyStorageStatus
driveMotorStatus
positionStatus
diff --git a/mobai-event-service/pom.xml b/mobai-event-service/pom.xml
index e399a87..a882e7f 100644
--- a/mobai-event-service/pom.xml
+++ b/mobai-event-service/pom.xml
@@ -95,6 +95,18 @@
8.0.33
+
+ org.springframework
+ spring-context-support
+
+
+
+
+ com.github.ben-manes.caffeine
+ caffeine
+ 2.5.5
+
+
org.springframework.boot
spring-boot-starter-test
@@ -106,6 +118,26 @@
+
+ org.apache.struts
+ struts-core
+ 1.3.8
+
+
+ org.springframework.boot
+ spring-boot-test
+ 3.2.4
+
+
+ junit
+ junit
+ 4.13.2
+
+
+ org.springframework
+ spring-test
+ 6.1.5
+
diff --git a/mobai-event-service/src/main/java/com/mobai/EventServiceApplication.java b/mobai-event-service/src/main/java/com/mobai/EventServiceApplication.java
index 1cc451b..3b63082 100644
--- a/mobai-event-service/src/main/java/com/mobai/EventServiceApplication.java
+++ b/mobai-event-service/src/main/java/com/mobai/EventServiceApplication.java
@@ -2,6 +2,8 @@ package com.mobai;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.cache.annotation.EnableCaching;
+import org.springframework.context.annotation.ComponentScan;
/**
* @author Mobai
@@ -9,9 +11,10 @@ import org.springframework.boot.autoconfigure.SpringBootApplication;
* @description 描述
* @date 2024/6/14 20:40
*/
+
@SpringBootApplication
public class EventServiceApplication {
public static void main(String[] args) {
- SpringApplication.run(EventServiceApplication.class);
+ SpringApplication.run(EventServiceApplication.class, args);
}
}
diff --git a/mobai-event-service/src/main/java/com/mobai/iotDB/config/IotDBSessionConfig.java b/mobai-event-service/src/main/java/com/mobai/iotDB/config/IotDBSessionConfig.java
index 35a4239..13718fb 100644
--- a/mobai-event-service/src/main/java/com/mobai/iotDB/config/IotDBSessionConfig.java
+++ b/mobai-event-service/src/main/java/com/mobai/iotDB/config/IotDBSessionConfig.java
@@ -82,6 +82,8 @@ public class IotDBSessionConfig {
public void insertRecord(String deviceId, Long time,List measurementsList, List valuesList) throws StatementExecutionException, IoTDBConnectionException, ServerException {
if (measurementsList.size() == valuesList.size()) {
session.insertRecord(deviceId, time, measurementsList, valuesList);
+ System.out.println(measurementsList);
+ System.out.println(valuesList);
} else {
log.error("measurementsList 与 valuesList 值不对应");
}
@@ -135,7 +137,7 @@ public class IotDBSessionConfig {
}
/**
- * description: 删除分组 如 root.a1eaKSRpRty
+ * description: 删除分组 root.vin.vin
* author: zhouhong
* @param groupName:分组名称
* @return
diff --git a/mobai-event-service/src/main/java/com/mobai/iotDB/controller/IotDbController.java b/mobai-event-service/src/main/java/com/mobai/iotDB/controller/IotDbController.java
index a7998f1..d1cf6aa 100644
--- a/mobai-event-service/src/main/java/com/mobai/iotDB/controller/IotDbController.java
+++ b/mobai-event-service/src/main/java/com/mobai/iotDB/controller/IotDbController.java
@@ -53,9 +53,8 @@ public class IotDbController {
* @return
*/
@PostMapping("/api/device/deleteGroup")
- public ResponseData deleteGroup() throws StatementExecutionException, IoTDBConnectionException {
- iotDBSessionConfig.deleteStorageGroup("root.a1eaKSRpRty");
- iotDBSessionConfig.deleteStorageGroup("root.smartretirement");
+ public ResponseData deleteGroup(String vin) throws StatementExecutionException, IoTDBConnectionException {
+ iotDBSessionConfig.deleteStorageGroup("root.vin."+vin);
return ResponseData.success();
}
diff --git a/mobai-event-service/src/main/java/com/mobai/iotDB/service/impl/IotDbServerImpl.java b/mobai-event-service/src/main/java/com/mobai/iotDB/service/impl/IotDbServerImpl.java
index 25e8855..96b3b2d 100644
--- a/mobai-event-service/src/main/java/com/mobai/iotDB/service/impl/IotDbServerImpl.java
+++ b/mobai-event-service/src/main/java/com/mobai/iotDB/service/impl/IotDbServerImpl.java
@@ -72,6 +72,7 @@ public class IotDbServerImpl implements IotDbServer {
add("availableBatteryCapacity");
add("vehicleStatus");
add("chargingStatus");
+ add("socStatus");
add("operatingStatus");
add("chargingEnergyStorageStatus");
add("driveMotorStatus");
@@ -120,6 +121,7 @@ public class IotDbServerImpl implements IotDbServer {
add(String.valueOf(vehicle.getAvailableBatteryCapacity()));
add(String.valueOf(vehicle.getVehicleStatus()));
add(String.valueOf(vehicle.getChargingStatus()));
+ add(String.valueOf(vehicle.getSocStatus()));
add(String.valueOf(vehicle.getOperatingStatus()));
add(String.valueOf(vehicle.getChargingEnergyStorageStatus()));
add(String.valueOf(vehicle.getDriveMotorStatus()));
@@ -135,7 +137,7 @@ public class IotDbServerImpl implements IotDbServer {
add(String.valueOf(vehicle.getDcdcStatus()));
add(String.valueOf(vehicle.getChgStatus()));
}};
-
+ System.out.println(vehicle);
iotDBSessionConfig.insertRecord(deviceId, vehicle.getStartTime(), measurementsList, valuesList);
}
@@ -147,12 +149,12 @@ public class IotDbServerImpl implements IotDbServer {
if (null != req.getVin()) {
String sql = "select * from " + "root.vin." + req.getVin();
// 开始时间
- if (req.getStartTime() != null) {
+ if (req.getStartTime() != null && req.getStartTime() != 0) {
sql += " where startTime >= " + req.getStartTime();
}
// 结束时间
- if (req.getStartTime() != null) {
- sql += " and startTime < " + req.getEndTime();
+ if (req.getEndTime() != null && req.getEndTime() != 0) {
+ sql += " and startTime <= " + req.getEndTime();
}
SessionDataSet sessionDataSet = iotDBSessionConfig.query(sql);
List columnNames = sessionDataSet.getColumnNames();
@@ -195,7 +197,8 @@ public class IotDbServerImpl implements IotDbServer {
Field field = fields.get(i);
// 这里的需要按照类型获取
- map.put(titleList.get(i), field.getObjectValue(field.getDataType()).toString());
+ Object obj = field.getObjectValue(field.getDataType());
+ map.put(titleList.get(i), obj==null? "null":obj.toString());
}
log.info(map);
// vehicle.setStartTime(Long.valueOf(map.get("startTime")));
diff --git a/mobai-event-service/src/main/java/com/mobai/kafka/listener/KafkaConsumerListenerExample.java b/mobai-event-service/src/main/java/com/mobai/kafka/listener/KafkaConsumerListenerExample.java
index 661384e..fa5642f 100644
--- a/mobai-event-service/src/main/java/com/mobai/kafka/listener/KafkaConsumerListenerExample.java
+++ b/mobai-event-service/src/main/java/com/mobai/kafka/listener/KafkaConsumerListenerExample.java
@@ -58,15 +58,13 @@ public class KafkaConsumerListenerExample {
*/
@KafkaListener(topics = {"topic0", "topic1"}, groupId = "topics")
public void consume(ConsumerRecord record) {
- System.out.println(record);
+ log.info("消费信息为:{}",record);
Vehicle vehicle = getVehicle(record.value());
- // 存入iotDB数据库
+ // 存入iotDB
try {
iotDbServer.insertData(vehicle);
log.info("添加成功");
-// if (redisService.hasKey(vehicle.getVin())) {
-// redisService.setList(vehicle.getVin(), JSON.toJSONString(vehicle));
-// }
+
} catch (StatementExecutionException e) {
throw new RuntimeException(e);
} catch (ServerException e) {
@@ -76,19 +74,6 @@ public class KafkaConsumerListenerExample {
}
}
-//// 批量消费
-// @KafkaListener( topics = {"topic0","topic1"}, groupId = "Topics")
-// public void onBatchMessage(List> records) {
-// System.out.println(">>> 批量消费一次,recoreds.size()=" + records.size());
-// List collect = records.stream().jsonObject(record -> {
-// Vehicle value = JSON.parseObject(record.value(), Vehicle.class);
-// log.info("车辆报文:{}", value);
-// return value;
-// }).toList();
-// log.warn("批量消费的数量为:{},结果为:{}", records.size(), collect);
-// System.out.println(records);
-// }
-
@Bean
public void partitionConsumer() {
ArrayList topicPartitions = new ArrayList<>();
@@ -99,64 +84,6 @@ public class KafkaConsumerListenerExample {
topicPartitions.add(topicPartition);
}
}
-//
-// new ConsumerRebalanceListener (){
-// @Override
-// public void onPartitionsRevoked(Collection partitions) {
-// // 可以在这里处理分区被撤销前的逻辑
-// System.out.println("Partitions revoked: " + partitions);
-// }
-//
-// @Override
-// public void onPartitionsAssigned(Collection partitions) {
-// // 可以在这里处理分区被分配后的逻辑
-// System.out.println("Partitions assigned: " + partitions);
-// }
-// };
-// //1.参数配置:不是每一非得配置
-// Properties props = new Properties();
-// props.put("bootstrap.servers", "localhost:9092");
-// props.put("auto.commit.interval.ms", "1000");
-// //因为每一个消费者必须属于某一个消费者组,所以必须还设置group.id
-// props.put("group.id", "test");
-// props.put("enable.auto.commit", "true");
-// props.put("session.timeout.ms", "30000");
-// props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
-// props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
-// KafkaConsumer consumer = new KafkaConsumer<>(props);
-// consumer.assign(topicPartitions);
-// try {
-//// while (true) {
-// ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); // 假设等待100毫秒获取消息
-// if (!records.isEmpty()) { // 检查是否有消息到来
-// // 创建线程异步执行
-// new Thread(() -> {
-// for (TopicPartition partition : records.partitions()) {
-// List> partitionRecords = records.records(partition);
-// for (ConsumerRecord record : partitionRecords) {
-// // 处理每条消息
-// log.info("Offset = {}, Key = {}, Value = {}", record.offset(), record.key(), record.value());
-// Vehicle vehicle = getVehicle(record.value());
-// if (redisService.hasKey(vehicle.getVin())){
-// redisService.setList(vehicle.getVin(),JSON.toJSONString(vehicle));
-// }
-// }
-// }
-// }).start();
-// } else {
-// // 当没有消息时,选择休眠一小段时间避免过度占用CPU,或者执行其他逻辑
-// Thread.sleep(10);
-// }
-//// }
-// } catch (InterruptedException e) {
-// // 处理解除阻塞时的中断异常,如Thread.sleep被中断
-// Thread.currentThread().interrupt(); // 重新设置中断标志
-// log.error("Consumer was interrupted.", e);
-// }
-// finally {
-// consumer.close();
-// }
-
}
// 自定义分区分配监听器
@@ -202,6 +129,7 @@ public class KafkaConsumerListenerExample {
setDcdcStatus(new BigDecimal(jsonObject.get("dcdcStatus").toString()).intValue());
setDriveMotorStatus(new BigDecimal(jsonObject.get("driveMotorStatus").toString()).intValue());
setPositionStatus(new BigDecimal(jsonObject.get("positionStatus").toString()).intValue());
+ setSocStatus(new BigDecimal(jsonObject.get("socStatus").toString()).intValue());
setPtcStatus(new BigDecimal(jsonObject.get("ptcStatus").toString()).intValue());
setEpsStatus(new BigDecimal(jsonObject.get("epsStatus").toString()).intValue());
setAbsStatus(new BigDecimal(jsonObject.get("absStatus").toString()).intValue());
@@ -232,11 +160,6 @@ public class KafkaConsumerListenerExample {
setAvailableBatteryCapacity(new BigDecimal(jsonObject.get("availableBatteryCapacity").toString()));
}};
-
-// if (redisService.hasKey(vehicle.getVin())) {
-// redisService.setList(vehicle.getVin(), JSON.toJSONString(vehicle));
-// }
- // 进行消息处理逻辑
log.info("车辆报文信息 : " + vehicle);
return vehicle;
}
diff --git a/mobai-event-service/src/main/java/com/mobai/kafka/listener/VinConsumer.java b/mobai-event-service/src/main/java/com/mobai/kafka/listener/VinConsumer.java
deleted file mode 100644
index 3b11c93..0000000
--- a/mobai-event-service/src/main/java/com/mobai/kafka/listener/VinConsumer.java
+++ /dev/null
@@ -1,115 +0,0 @@
-package com.mobai.kafka.listener;
-
-import com.alibaba.fastjson2.JSON;
-import com.mobai.domain.MqttServerModel;
-import com.mobai.domain.Vehicle;
-import com.mobai.forest.ForestGet;
-import com.mobai.utils.RedisService;
-import lombok.extern.log4j.Log4j2;
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.TopicPartition;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.ApplicationArguments;
-import org.springframework.boot.ApplicationRunner;
-import org.springframework.boot.autoconfigure.SpringBootApplication;
-import org.springframework.stereotype.Component;
-
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Properties;
-
-/**
- * @author Saisai
- * @className VinConsumer
- * @description 描述
- * @date 2024/6/18 17:18
- */
-
-@Log4j2
-@Component
-public class VinConsumer implements ApplicationRunner {
-
- @Autowired
- private ForestGet forestGet;
-
- @Autowired
- private KafkaConsumerListenerExample kafkaConsumerListenerExample;
-
- @Autowired
- private RedisService redisService;
-
- @Override
- public void run(ApplicationArguments args) throws Exception {
- ArrayList topicPartitions = new ArrayList<>();
- List topics = forestGet.getIps().getData().stream().map(MqttServerModel::getTopic).toList();
- for (String topic : topics) {
- for (int i = 0; i < 8; i++) {
- TopicPartition topicPartition = new TopicPartition(topic, i);
- topicPartitions.add(topicPartition);
- }
- }
-
-// new ConsumerRebalanceListener(){
-// @Override
-// public void onPartitionsRevoked(Collection partitions) {
-// // 可以在这里处理分区被撤销前的逻辑
-// System.out.println("Partitions revoked: " + partitions);
-// }
-//
-// @Override
-// public void onPartitionsAssigned(Collection partitions) {
-// // 可以在这里处理分区被分配后的逻辑
-// System.out.println("Partitions assigned: " + partitions);
-// }
-// };
-// 1.参数配置:不是每一非得配置
- Properties props = new Properties();
- props.put("bootstrap.servers", "localhost:9092");
- props.put("auto.commit.interval.ms", "1000");
- //因为每一个消费者必须属于某一个消费者组,所以必须还设置group.id
- props.put("group.id", "test");
- props.put("enable.auto.commit", "true");
- props.put("session.timeout.ms", "30000");
- props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- KafkaConsumer consumer = new KafkaConsumer<>(props);
- consumer.assign(topicPartitions);
- try {
- while (true) {
- ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); // 假设等待100毫秒获取消息
- if (!records.isEmpty()) { // 检查是否有消息到来
- // 创建线程异步执行
- new Thread(() -> {
- for (TopicPartition partition : records.partitions()) {
- List> partitionRecords = records.records(partition);
- for (ConsumerRecord record : partitionRecords) {
- // 处理每条消息
- log.info("Offset = {}, Key = {}, Value = {}", record.offset(), record.key(), record.value());
- Vehicle vehicle = kafkaConsumerListenerExample.getVehicle(record.value());
- if (redisService.hasKey(vehicle.getVin())){
- redisService.setList(vehicle.getVin(), JSON.toJSONString(vehicle));
- log.info("添加实时数据成功");
- }
- }
- }
- }).start();
- } else {
- // 当没有消息时,选择休眠一小段时间避免过度占用CPU,或者执行其他逻辑
- Thread.sleep(10);
- }
- }
- } catch (InterruptedException e) {
- // 处理解除阻塞时的中断异常,如Thread.sleep被中断
- Thread.currentThread().interrupt(); // 重新设置中断标志
- log.error("Consumer was interrupted.", e);
- }
- finally {
- consumer.close();
- }
- }
-}
diff --git a/mobai-event-service/src/main/java/com/mobai/kafka/listener/VinConsumerRunner.java b/mobai-event-service/src/main/java/com/mobai/kafka/listener/VinConsumerRunner.java
new file mode 100644
index 0000000..0b3028f
--- /dev/null
+++ b/mobai-event-service/src/main/java/com/mobai/kafka/listener/VinConsumerRunner.java
@@ -0,0 +1,116 @@
+package com.mobai.kafka.listener;
+
+import com.alibaba.fastjson2.JSON;
+import com.mobai.domain.MqttServerModel;
+import com.mobai.domain.Vehicle;
+import com.mobai.domain.VehicleEvent;
+import com.mobai.forest.ForestGet;
+import com.mobai.utils.RedisService;
+import com.mobai.vehicle.event.service.EventsService;
+import com.mobai.vehicle.HandlerHelper;
+import lombok.extern.log4j.Log4j2;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.ApplicationArguments;
+import org.springframework.boot.ApplicationRunner;
+import org.springframework.stereotype.Component;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * @author Saisai
+ * @className VinConsumerRunner
+ * @description 描述
+ * @date 2024/6/18 17:18
+ */
+
+@Log4j2
+@Component
+public class VinConsumerRunner implements ApplicationRunner {
+
+ @Autowired
+ private ForestGet forestGet;
+
+ @Autowired
+ private KafkaConsumerListenerExample kafkaConsumerListenerExample;
+
+ @Autowired
+ private RedisService redisService;
+
+ @Autowired
+ private EventsService eventsService;
+
+
+ private final AtomicInteger start = new AtomicInteger();
+ @Override
+ public void run(ApplicationArguments args) throws Exception {
+ if (start.get() != 0) {
+ return;
+ }
+ synchronized (this) {
+ if (start.get() != 0) {
+ return;
+ }
+ start.set(1);
+
+ // kafka分区监听器
+ new Thread(() -> {
+ ArrayList topicPartitions = new ArrayList<>();
+ List topics = forestGet.getIps().getData().stream().map(MqttServerModel::getTopic).toList();
+ for (String topic : topics) {
+ for (int i = 0; i < 8; i++) {
+ TopicPartition topicPartition = new TopicPartition(topic, i);
+ topicPartitions.add(topicPartition);
+ }
+ }
+ Properties props = new Properties(){{
+ put("bootstrap.servers", "localhost:9092");
+ put("auto.commit.interval.ms", "1000");
+ put("group.id", "test");
+ put("enable.auto.commit", "true");
+ put("session.timeout.ms", "30000");
+ put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+ put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+ }};
+ KafkaConsumer consumer = new KafkaConsumer<>(props);
+ consumer.assign(topicPartitions);
+ try {
+ while (true) {
+ ConsumerRecords records = consumer.poll(Duration.ofMillis(100));
+ if (!records.isEmpty()) {
+ new Thread(() -> {
+ for (TopicPartition partition : records.partitions()) {
+ List> partitionRecords = records.records(partition);
+ for (ConsumerRecord record : partitionRecords) {
+ log.info("Offset = {}, Key = {}, Value = {}", record.offset(), record.key(), record.value());
+ // 报文解析为对象
+// Vehicle vehicle = kafkaConsumerListenerExample.getVehicle(record.value());
+ Map map = JSON.parseObject(record.value(), Map.class);
+ // 获取对应的事件
+ VehicleEvent events = eventsService.getEvents(map.get("vin"));
+ HandlerHelper.doHandler(events, map, redisService);
+ }
+ }
+ }).start();
+ } else {
+ Thread.sleep(10);
+ }
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ log.error("Consumer was interrupted.", e);
+ } finally {
+ consumer.close();
+ }
+ }).start();
+ }
+ }
+}
diff --git a/mobai-event-service/src/main/java/com/mobai/vehicle/EventActive.java b/mobai-event-service/src/main/java/com/mobai/vehicle/EventActive.java
new file mode 100644
index 0000000..634a88a
--- /dev/null
+++ b/mobai-event-service/src/main/java/com/mobai/vehicle/EventActive.java
@@ -0,0 +1,18 @@
+package com.mobai.vehicle;
+
+import com.mobai.domain.EventReq;
+import com.mobai.domain.Vehicle;
+import com.mobai.utils.RedisService;
+
+import java.util.Map;
+
+/**
+ * 事件处理接口
+ * @ClassName EventActive
+ * @Description 描述
+ * @Author Saisai
+ * @Date 2024/6/20 14:33
+ */
+public interface EventActive {
+ void activeEvent(Map vehicle, RedisService redisService);
+}
diff --git a/mobai-event-service/src/main/java/com/mobai/vehicle/HandlerHelper.java b/mobai-event-service/src/main/java/com/mobai/vehicle/HandlerHelper.java
new file mode 100644
index 0000000..5d8921c
--- /dev/null
+++ b/mobai-event-service/src/main/java/com/mobai/vehicle/HandlerHelper.java
@@ -0,0 +1,29 @@
+package com.mobai.vehicle;
+
+import com.mobai.domain.Vehicle;
+import com.mobai.domain.VehicleEvent;
+import com.mobai.utils.RedisService;
+import com.mobai.vehicle.event.constants.EventHandler;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+public final class HandlerHelper {
+
+ private HandlerHelper() {
+ }
+
+ /**
+ * 调用方法执行相应的事件
+ *
+ * @param events 事件类型
+ * @param vehicle 请求体 vin+事件状态
+ */
+ public static void doHandler(VehicleEvent events, Map vehicle, RedisService redisService) {
+ List list = Arrays.stream(events.getEvents().split("-")).toList();
+ list.forEach(type -> {
+ EventHandler.getEvent(type).activeEvent(vehicle, redisService);
+ });
+ }
+}
diff --git a/mobai-event-service/src/main/java/com/mobai/vehicle/event/cache/caffeine/CaffeineCache.java b/mobai-event-service/src/main/java/com/mobai/vehicle/event/cache/caffeine/CaffeineCache.java
new file mode 100644
index 0000000..039c95c
--- /dev/null
+++ b/mobai-event-service/src/main/java/com/mobai/vehicle/event/cache/caffeine/CaffeineCache.java
@@ -0,0 +1,60 @@
+package com.mobai.vehicle.event.cache.caffeine;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import lombok.Data;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.stereotype.Component;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author Saisai
+ * @className CaffeineCache
+ * @description 描述
+ * @date 2024/6/21 16:15
+ */
+@Data
+@Configuration
+public class CaffeineCache {
+
+ /**
+ * 5秒钟缓存
+ * @return
+ */
+ @Bean(name = "createFiveCaffeine")
+ public Cache createFiveCaffeine() {
+ return Caffeine.newBuilder()
+ // 设置初始连接数
+ .initialCapacity(1000)
+ //5秒没有读写自动删除
+ .expireAfterAccess(5, TimeUnit.SECONDS)
+ //最大容量1024个,超过会自动清理空间
+ .maximumSize(1024)
+ .removalListener(((key, value, cause) -> {
+ //清理通知 key,value ==> 键值对 cause ==> 清理原因
+ }))
+ .build();
+ }
+
+ /**
+ * 半小时缓存
+ * @return cache缓存
+ */
+ @Bean(name = "createHalfCaffeine")
+ public Cache createHalfCaffeine() {
+ return Caffeine.newBuilder()
+ // 设置初始连接数
+ .initialCapacity(1000)
+ //半小时没有读自动删除 访问
+ .expireAfterAccess(30, TimeUnit.MINUTES)
+ //最大容量1024个,超过会自动清理空间
+ .maximumSize(1024)
+ .removalListener(((key, value, cause) -> {
+ //清理通知 key,value ==> 键值对 cause ==> 清理原因
+ }))
+ .build();
+ }
+
+}
diff --git a/mobai-event-service/src/main/java/com/mobai/vehicle/event/constants/EventHandler.java b/mobai-event-service/src/main/java/com/mobai/vehicle/event/constants/EventHandler.java
new file mode 100644
index 0000000..ff3ef9f
--- /dev/null
+++ b/mobai-event-service/src/main/java/com/mobai/vehicle/event/constants/EventHandler.java
@@ -0,0 +1,49 @@
+package com.mobai.vehicle.event.constants;
+
+import cn.hutool.extra.spring.SpringUtil;
+import com.mobai.vehicle.EventActive;
+import com.mobai.vehicle.event.factory.BugMalfunctionFactory;
+import com.mobai.vehicle.event.factory.ElectronicFenceFactory;
+import com.mobai.vehicle.event.factory.IndexWaringFactory;
+import com.mobai.vehicle.event.factory.RealTimeFactory;
+import lombok.extern.log4j.Log4j2;
+
+import java.util.Arrays;
+import java.util.Optional;
+
+/**
+ * @ClassName EventHandler
+ * @Description
+ * @Author Saisai
+ * @Date 2024/6/20 15:08
+ */
+
+@Log4j2
+public enum EventHandler {
+
+
+ REAL_TIME_DATA(RealTimeFactory.class),
+ INDEX_WARNING(IndexWaringFactory.class),
+ BUG_MALFUNCTION(BugMalfunctionFactory.class),
+ ELECTRONIC_FENCE(ElectronicFenceFactory.class);
+
+ private String code;
+ private Class extends EventActive> eventActive;
+
+ EventHandler(Class extends EventActive> eventActive){
+ this.eventActive = eventActive;
+ }
+
+ public static EventActive getEvent(String code){
+
+ Optional eHandlerOptional = Arrays.stream(values())
+ .filter(eventActive -> eventActive.name().equals(code))
+ .findFirst();
+ if (eHandlerOptional.isEmpty()){
+ throw new RuntimeException("没有该事件");
+ }
+ EventHandler eventHandler = valueOf(code);
+ Class extends EventActive> eventActive1 = eventHandler.eventActive;
+ return SpringUtil.getBean(eventActive1);
+ }
+}
diff --git a/mobai-event-service/src/main/java/com/mobai/vehicle/event/factory/BugMalfunctionFactory.java b/mobai-event-service/src/main/java/com/mobai/vehicle/event/factory/BugMalfunctionFactory.java
new file mode 100644
index 0000000..5ffd2a2
--- /dev/null
+++ b/mobai-event-service/src/main/java/com/mobai/vehicle/event/factory/BugMalfunctionFactory.java
@@ -0,0 +1,177 @@
+package com.mobai.vehicle.event.factory;
+
+import com.alibaba.fastjson2.JSON;
+import com.github.benmanes.caffeine.cache.Cache;
+import com.mobai.bugMalfuntion.BugMalfunctionResult;
+import com.mobai.bugMalfuntion.FaultCode;
+import com.mobai.domain.Vehicle;
+import com.mobai.utils.RedisService;
+import com.mobai.vehicle.EventActive;
+import lombok.extern.log4j.Log4j2;
+import org.springframework.amqp.rabbit.core.RabbitTemplate;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.stereotype.Service;
+
+import javax.annotation.Resource;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * 故障处理工厂
+ *
+ * @author Saisai
+ * @className BugMalfunctionFactory
+ * @description 描述
+ * @date 2024/6/20 14:42
+ */
+@Log4j2
+@Service
+public class BugMalfunctionFactory implements EventActive {
+ @Autowired
+ private RabbitTemplate rabbitTemplate;
+
+ @Autowired
+ private RedisService redisService;
+
+ @Autowired
+ private RedisTemplate redisTemplate;
+
+ @Resource(name = "createFiveCaffeine")
+ private Cache cacheFive;
+
+// @Autowired
+// private Cache getCacheFive(@Qualifier("createFiveCaffeine") Cache cacheFive){
+// if (cacheFive==null){
+// cacheFive = Caffeine.newBuilder()
+// // 设置初始连接数
+// .initialCapacity(1000)
+// //5秒没有读写自动删除
+// .expireAfterAccess(5, TimeUnit.SECONDS)
+// //最大容量1024个,超过会自动清理空间
+// .maximumSize(1024)
+// .removalListener(((key, value, cause) -> {
+// //清理通知 key,value ==> 键值对 cause ==> 清理原因
+// }))
+// .build();
+// }
+// return this.cacheFive = cacheFive;
+// }
+
+ private BugMalfunctionFactory() {
+ }
+
+ public static BugMalfunctionFactory getInstance() {
+ return new BugMalfunctionFactory();
+ }
+
+ @Override
+ public void activeEvent(Map vehicle, RedisService redisService) {
+ BugMalfunctionResult result = new BugMalfunctionResult();
+ result.setVin(vehicle.get("vin"));
+ result.setTimestamp(Long.valueOf(vehicle.get("startTime")));
+ if (vehicle.get("vehicleStatus").equals(0+"")) {
+ result.setFaultCode(FaultCode.VEHICLE.getString());
+ this.active(result);
+ }
+ if (vehicle.get("chargingStatus").equals(0+"")) {
+ result.setFaultCode(FaultCode.CHARGING.getString());
+ this.active(result);
+ }
+ if (vehicle.get("operatingStatus").equals(0+"")) {
+ result.setFaultCode(FaultCode.OPERATING.getString());
+ this.active(result);
+ }
+ if (vehicle.get("socStatus").equals(""+0)) {
+ result.setFaultCode(FaultCode.SOC.getString());
+ this.active(result);
+ }
+ if (vehicle.get("chargingEnergyStorageStatus").equals(0+"")) {
+ result.setFaultCode(FaultCode.CHARGING_ENERGY.getString());
+ this.active(result);
+ }
+ if (vehicle.get("driveMotorStatus").equals(0+"")) {
+ result.setFaultCode(FaultCode.DRIVE_MOTOR.getString());
+ this.active(result);
+ }
+ if (vehicle.get("positionStatus").equals(0+"")) {
+ result.setFaultCode(FaultCode.POSITION.getString());
+ this.active(result);
+ }
+ if (vehicle.get("EasStatus").equals(0+"")) {
+ result.setFaultCode(FaultCode.EAS.getString());
+ this.active(result);
+ }
+ if (vehicle.get("PtcStatus").equals(0+"")) {
+ result.setFaultCode(FaultCode.PTC.getString());
+ this.active(result);
+ }
+ if (vehicle.get("EpsStatus").equals(0+"")) {
+ result.setFaultCode(FaultCode.EPS.getString());
+ this.active(result);
+ }
+ if (vehicle.get("AbsStatus").equals(0+"")) {
+ result.setFaultCode(FaultCode.ABS.getString());
+ this.active(result);
+ }
+ if (vehicle.get("McuStatus").equals(0+"")) {
+ result.setFaultCode(FaultCode.MCU.getString());
+ this.active(result);
+ }
+ if (vehicle.get("HeatingStatus").equals(0+"")) {
+ result.setFaultCode(FaultCode.HEATING.getString());
+ this.active(result);
+ }
+ if (vehicle.get("BatteryStatus").equals(0+"")) {
+ result.setFaultCode(FaultCode.BATTERY.getString());
+ this.active(result);
+ }
+ if (vehicle.get("BatteryInsulationStatus").equals(0+"")) {
+ result.setFaultCode(FaultCode.BATTERY_INSULATION.getString());
+ this.active(result);
+ }
+ if (vehicle.get("DcdcStatus").equals(0+"")) {
+ result.setFaultCode(FaultCode.DCDC.getString());
+ this.active(result);
+ }
+ if (vehicle.get("ChgStatus").equals(0+"")) {
+ result.setFaultCode(FaultCode.CHG.getString());
+ this.active(result);
+ }
+
+ }
+
+
+ /**
+ * 发布异常开始事件
+ *
+ * @param result
+ */
+ public void sendBugMalfunction(BugMalfunctionResult result) {
+ log.info("发布异常事件");
+ rabbitTemplate.convertAndSend("zhiLian-vehicle-start", JSON.toJSONString(result));
+ }
+
+ /**
+ * 处理异常信息
+ */
+ public void active(BugMalfunctionResult result) {
+ // 5秒验证缓存
+ // 拼接异常信息
+ String vinInfo = result.getVin() + "-" + result.getFaultCode();
+ Object ifPresent = cacheFive.getIfPresent(vinInfo);
+ if (null == ifPresent) {
+ log.error("异常开始");
+ log.info(vinInfo);
+ redisTemplate.opsForValue().set(vinInfo, 0 + "", 10L, TimeUnit.SECONDS);
+ sendBugMalfunction(result);
+ }
+// else {
+ //存入缓存
+ cacheFive.put(vinInfo, 0 + "");
+ log.warn("异常中······· 原因:{}",result.getFaultCode());
+ redisTemplate.expire(vinInfo, 10, TimeUnit.SECONDS);
+// }
+
+ }
+}
diff --git a/mobai-event-service/src/main/java/com/mobai/vehicle/event/factory/ElectronicFenceFactory.java b/mobai-event-service/src/main/java/com/mobai/vehicle/event/factory/ElectronicFenceFactory.java
new file mode 100644
index 0000000..60af17e
--- /dev/null
+++ b/mobai-event-service/src/main/java/com/mobai/vehicle/event/factory/ElectronicFenceFactory.java
@@ -0,0 +1,29 @@
+package com.mobai.vehicle.event.factory;
+
+import com.mobai.domain.Vehicle;
+import com.mobai.utils.RedisService;
+import com.mobai.vehicle.EventActive;
+import org.springframework.stereotype.Component;
+
+import java.util.Map;
+
+/**
+ * 电子围栏工厂
+ * @author Saisai
+ * @className BugMalfunctionFactory
+ * @description 描述
+ * @date 2024/6/20 14:42
+ */
+@Component
+public class ElectronicFenceFactory implements EventActive {
+ private ElectronicFenceFactory(){}
+
+ public static ElectronicFenceFactory getInstance(){
+ return new ElectronicFenceFactory();
+ }
+
+ @Override
+ public void activeEvent(Map vehicle, RedisService redisService) {
+
+ }
+}
diff --git a/mobai-event-service/src/main/java/com/mobai/vehicle/event/factory/IndexWaringFactory.java b/mobai-event-service/src/main/java/com/mobai/vehicle/event/factory/IndexWaringFactory.java
new file mode 100644
index 0000000..813f44a
--- /dev/null
+++ b/mobai-event-service/src/main/java/com/mobai/vehicle/event/factory/IndexWaringFactory.java
@@ -0,0 +1,38 @@
+package com.mobai.vehicle.event.factory;
+
+import com.mobai.domain.Vehicle;
+import com.mobai.utils.RedisService;
+import com.mobai.vehicle.EventActive;
+import lombok.extern.log4j.Log4j2;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import java.util.Map;
+
+/**
+ * 实时数据工厂
+ * @author Saisai
+ * @className BugMalfunctionFactory
+ * @description 描述
+ * @date 2024/6/20 14:42
+ */
+
+@Log4j2
+@Component
+public class IndexWaringFactory implements EventActive {
+
+ @Autowired
+ private RedisService redisService;
+
+
+ private IndexWaringFactory(){}
+
+ public static IndexWaringFactory getInstance(){
+ return new IndexWaringFactory();
+ }
+
+ @Override
+ public void activeEvent(Map vehicle, RedisService redisService) {
+
+ }
+}
diff --git a/mobai-event-service/src/main/java/com/mobai/vehicle/event/factory/RealTimeFactory.java b/mobai-event-service/src/main/java/com/mobai/vehicle/event/factory/RealTimeFactory.java
new file mode 100644
index 0000000..b83ece9
--- /dev/null
+++ b/mobai-event-service/src/main/java/com/mobai/vehicle/event/factory/RealTimeFactory.java
@@ -0,0 +1,38 @@
+package com.mobai.vehicle.event.factory;
+
+import com.alibaba.fastjson2.JSON;
+import com.mobai.domain.Vehicle;
+import com.mobai.utils.RedisService;
+import com.mobai.vehicle.EventActive;
+import lombok.extern.log4j.Log4j2;
+import org.springframework.stereotype.Component;
+
+import java.util.Map;
+
+/**
+ * 实时数据工厂
+ * @author Saisai
+ * @className BugMalfunctionFactory
+ * @description 描述
+ * @date 2024/6/20 14:42
+ */
+
+@Log4j2
+@Component
+public class RealTimeFactory implements EventActive {
+
+
+ private RealTimeFactory(){}
+
+ public static RealTimeFactory getInstance(){
+ return new RealTimeFactory();
+ }
+
+ @Override
+ public void activeEvent(Map vehicle, RedisService redisService) {
+ if (redisService.hasKey(vehicle.get("vin"))) {
+ redisService.setList(vehicle.get("vin"), JSON.toJSONString(vehicle));
+ log.info("添加实时数据成功");
+ }
+ }
+}
diff --git a/mobai-event-service/src/main/java/com/mobai/vehicle/event/redis/RedisKeyExpirationListener.java b/mobai-event-service/src/main/java/com/mobai/vehicle/event/redis/RedisKeyExpirationListener.java
new file mode 100644
index 0000000..d5b92b2
--- /dev/null
+++ b/mobai-event-service/src/main/java/com/mobai/vehicle/event/redis/RedisKeyExpirationListener.java
@@ -0,0 +1,39 @@
+package com.mobai.vehicle.event.redis;
+
+import com.alibaba.fastjson2.JSON;
+
+import com.mobai.bugMalfuntion.BugMalfunctionResult;
+import lombok.extern.log4j.Log4j2;
+import org.springframework.amqp.rabbit.core.RabbitTemplate;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.data.redis.connection.Message;
+import org.springframework.data.redis.listener.KeyExpirationEventMessageListener;
+import org.springframework.data.redis.listener.RedisMessageListenerContainer;
+import org.springframework.stereotype.Component;
+
+import java.util.Date;
+@Log4j2
+@Component
+public class RedisKeyExpirationListener extends KeyExpirationEventMessageListener {
+ public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) {
+ super(listenerContainer);
+ }
+
+ @Autowired
+ private RabbitTemplate rabbitTemplate;
+
+ //拿到过期key的信息并做处理
+ @Override
+ public void onMessage(Message message, byte[] pattern) {
+ String key = message.toString();
+ log.warn("过期的KEY是: {}" , key);
+ BugMalfunctionResult result = new BugMalfunctionResult();
+ String[] split = key.split("-");
+ result.setVin(split[0]);
+ result.setFaultCode(split[1]);
+ result.setTimestamp(new Date().getTime());
+ log.info(result);
+ rabbitTemplate.convertAndSend("zhiLian-vehicle-end", JSON.toJSONString(result));
+
+ }
+}
diff --git a/mobai-event-service/src/main/java/com/mobai/vehicle/event/redis/RedisListenerConfig.java b/mobai-event-service/src/main/java/com/mobai/vehicle/event/redis/RedisListenerConfig.java
new file mode 100644
index 0000000..50899ff
--- /dev/null
+++ b/mobai-event-service/src/main/java/com/mobai/vehicle/event/redis/RedisListenerConfig.java
@@ -0,0 +1,16 @@
+package com.mobai.vehicle.event.redis;
+
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.data.redis.connection.RedisConnectionFactory;
+import org.springframework.data.redis.listener.RedisMessageListenerContainer;
+
+@Configuration
+public class RedisListenerConfig {
+ @Bean
+ public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {
+ RedisMessageListenerContainer container = new RedisMessageListenerContainer();
+ container.setConnectionFactory(connectionFactory);
+ return container;
+ }
+}
diff --git a/mobai-event-service/src/main/java/com/mobai/vehicle/event/service/EventsService.java b/mobai-event-service/src/main/java/com/mobai/vehicle/event/service/EventsService.java
index a49150f..1095bbd 100644
--- a/mobai-event-service/src/main/java/com/mobai/vehicle/event/service/EventsService.java
+++ b/mobai-event-service/src/main/java/com/mobai/vehicle/event/service/EventsService.java
@@ -1,6 +1,7 @@
package com.mobai.vehicle.event.service;
import com.baomidou.mybatisplus.extension.service.IService;
+import com.mobai.bugMalfuntion.BugMalfunctionInfo;
import com.mobai.domain.EventReq;
import com.mobai.domain.Result;
import com.mobai.domain.VehicleEvent;
@@ -13,5 +14,19 @@ import com.mobai.domain.VehicleEvent;
*/
public interface EventsService extends IService {
+ // 处理事件
Result activeEvent(EventReq req);
+
+ // 更新缓存中的事件
+// public VehicleEvent updEvents(VehicleEvent event);
+
+
+ // 查询缓存中的事件
+ public VehicleEvent getEvents(String vin);
+
+
+ // 查询缓存中的bug事件
+// public BugMalfunctionInfo getBug(BugMalfunctionInfo bugInfo);
+
+
}
diff --git a/mobai-event-service/src/main/java/com/mobai/vehicle/event/service/impl/EventActiveServiceImpl.java b/mobai-event-service/src/main/java/com/mobai/vehicle/event/service/impl/EventActiveServiceImpl.java
index b8a2085..6d2df9a 100644
--- a/mobai-event-service/src/main/java/com/mobai/vehicle/event/service/impl/EventActiveServiceImpl.java
+++ b/mobai-event-service/src/main/java/com/mobai/vehicle/event/service/impl/EventActiveServiceImpl.java
@@ -12,6 +12,7 @@ import org.springframework.stereotype.Service;
import java.util.Properties;
/**
+ * 事件配置中心
* @author Saisai
* @className EventActiveServiceImpl
* @description 描述
@@ -27,7 +28,6 @@ public class EventActiveServiceImpl implements EventActiveService {
@Override
public Result realTimeData(EventReq req) {
-
if (req.getEventState()==0){
log.info("开始实时数据事件");
redisService.setCacheObject(req.getVin(),null);
@@ -35,6 +35,6 @@ public class EventActiveServiceImpl implements EventActiveService {
log.info("关闭实时数据事件");
redisService.deleteObject(req.getVin());
}
- return null;
+ return Result.success();
}
}
diff --git a/mobai-event-service/src/main/java/com/mobai/vehicle/event/service/impl/EventsActiveServiceImpl.java b/mobai-event-service/src/main/java/com/mobai/vehicle/event/service/impl/EventsActiveServiceImpl.java
new file mode 100644
index 0000000..0fabe44
--- /dev/null
+++ b/mobai-event-service/src/main/java/com/mobai/vehicle/event/service/impl/EventsActiveServiceImpl.java
@@ -0,0 +1,72 @@
+package com.mobai.vehicle.event.service.impl;
+
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
+import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
+import com.github.benmanes.caffeine.cache.Cache;
+import com.mobai.domain.EventReq;
+import com.mobai.domain.Result;
+import com.mobai.domain.VehicleEvent;
+import com.mobai.vehicle.event.mapper.VehicleEventMapper;
+import com.mobai.vehicle.event.service.EventActiveService;
+import com.mobai.vehicle.event.service.EventsService;
+import lombok.extern.log4j.Log4j2;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import javax.annotation.Resource;
+
+/**
+ * 事件处理
+ *
+ * @author Saisai
+ * @className EventsServiceImpl
+ * @description 描述
+ * @date 2024/6/18 11:39
+ */
+
+@Log4j2
+@Service
+public class EventsActiveServiceImpl extends ServiceImpl implements EventsService {
+
+ @Autowired
+ private EventActiveService eventActiveService;
+
+ @Resource(name = "createHalfCaffeine")
+ private Cache cacheHalf;
+
+ @Override
+ public Result activeEvent(EventReq req) {
+ switch (req.getEventType()) {
+ case 0:
+ return eventActiveService.realTimeData(req);
+ }
+ return null;
+ }
+
+
+ /**
+ * 通过车辆vin获取故障事件信息及缓存数据
+ *
+ * @param vin
+ * @return vin + events
+ */
+ public VehicleEvent getEvents(String vin) {
+ // 使用caffeine缓存
+ Object vehicleEvent = cacheHalf.getIfPresent(vin + "-events");
+ if (vehicleEvent != null) {
+ // 若有访问更新缓存时间
+ log.info("缓存");
+ cacheHalf.put(vin + "-events", vehicleEvent);
+ return (VehicleEvent) vehicleEvent;
+ }
+ vehicleEvent = this.getOne(new LambdaQueryWrapper<>() {{
+ eq(VehicleEvent::getVin, vin);
+ }});
+ cacheHalf.put(vin + "-events", vehicleEvent);
+ log.info("数据库");
+ Object ifPresent = cacheHalf.getIfPresent(vin + "-events");
+ log.info("缓存信息为:{}", ifPresent);
+ return (VehicleEvent) vehicleEvent;
+ }
+
+}
diff --git a/mobai-event-service/src/main/java/com/mobai/vehicle/event/service/impl/EventsServiceImpl.java b/mobai-event-service/src/main/java/com/mobai/vehicle/event/service/impl/EventsServiceImpl.java
deleted file mode 100644
index 9ac495f..0000000
--- a/mobai-event-service/src/main/java/com/mobai/vehicle/event/service/impl/EventsServiceImpl.java
+++ /dev/null
@@ -1,34 +0,0 @@
-package com.mobai.vehicle.event.service.impl;
-
-import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
-import com.mobai.domain.EventReq;
-import com.mobai.domain.Result;
-import com.mobai.domain.VehicleEvent;
-import com.mobai.vehicle.event.mapper.VehicleEventMapper;
-import com.mobai.vehicle.event.service.EventActiveService;
-import com.mobai.vehicle.event.service.EventsService;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Service;
-
-/**
- * @author Saisai
- * @className EventsServiceImpl
- * @description 描述
- * @date 2024/6/18 11:39
- */
-
-@Service
-public class EventsServiceImpl extends ServiceImpl implements EventsService {
-
- @Autowired
- private EventActiveService eventActiveService;
-
- @Override
- public Result activeEvent(EventReq req) {
- switch (req.getEventType()){
- case 0:
- return eventActiveService.realTimeData(req);
- }
- return null;
- }
-}
diff --git a/mobai-event-service/src/main/java/com/mobai/vehicle/event/standard/StandardEvent.java b/mobai-event-service/src/main/java/com/mobai/vehicle/event/standard/StandardEvent.java
new file mode 100644
index 0000000..1bb1bb0
--- /dev/null
+++ b/mobai-event-service/src/main/java/com/mobai/vehicle/event/standard/StandardEvent.java
@@ -0,0 +1,30 @@
+package com.mobai.vehicle.event.standard;
+
+import com.rabbitmq.client.AMQP;
+import org.springframework.amqp.core.Message;
+import org.springframework.amqp.rabbit.annotation.RabbitListener;
+import org.springframework.stereotype.Component;
+
+/**
+ * @author Saisai
+ * @className StandardEvent
+ * @description 描述
+ * @date 2024/6/26 16:32
+ */
+@Component
+public class StandardEvent {
+
+
+ @RabbitListener(queues = {"standard_Warn_Event_Start"})
+ public void eventStart(String msg, Message message, AMQP.Channel channel) {
+
+ }
+
+
+ @RabbitListener(queues = {"standard_Warn_Event_End"})
+ public void eventEnd(String msg, Message message, AMQP.Channel channel) {
+
+ }
+
+
+}
diff --git a/mobai-event-service/src/main/resources/application.yml b/mobai-event-service/src/main/resources/application.yml
index 65ecb22..acf54a8 100644
--- a/mobai-event-service/src/main/resources/application.yml
+++ b/mobai-event-service/src/main/resources/application.yml
@@ -112,12 +112,13 @@ spring:
# 两次poll之间的最大间隔,默认值为5分钟。如果超过这个间隔会触发reBalance
poll-timeout: 600000
+
# mybatis配置
mybatis:
# 搜索指定包别名
- typeAliasesPackage: com.mobai
+ typeAliasesPackage: com.mobai.*.*.mapper
# 配置mapper的扫描,找到所有的mapper.xml映射文件
mapperLocations: classpath:mapper/**/*.xml
logging:
level:
- com.mobai.mapper: DEBUG
+ com.mobai.*.*.mapper: DEBUG
diff --git a/mobai-event-service/src/main/resources/mapper/StayTimeMapper.xml b/mobai-event-service/src/main/resources/mapper/VehicleEventMapper.xml
similarity index 100%
rename from mobai-event-service/src/main/resources/mapper/StayTimeMapper.xml
rename to mobai-event-service/src/main/resources/mapper/VehicleEventMapper.xml