parent
a9793136da
commit
ca38a075b7
|
@ -1,4 +1,3 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project version="4">
|
||||
<component name="ExternalStorageConfigurationManager" enabled="true" />
|
||||
<component name="MavenProjectsManager">
|
||||
|
|
|
@ -0,0 +1,23 @@
|
|||
package com.mobai.bugMalfuntion;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
/**
|
||||
* @author Saisai
|
||||
* @className BugMalfunctionInfo
|
||||
* @description 描述
|
||||
* @date 2024/6/20 16:55
|
||||
*/
|
||||
@Data
|
||||
@Builder
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
public class BugMalfunctionInfo {
|
||||
|
||||
private String vinInfo; // vin + faultCode status
|
||||
|
||||
private Integer status;
|
||||
}
|
|
@ -0,0 +1,22 @@
|
|||
package com.mobai.bugMalfuntion;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
/**
|
||||
* @author Saisai
|
||||
* @className BugMalfunctionResult
|
||||
* @description 描述
|
||||
* @date 2024/6/20 17:18
|
||||
*/
|
||||
@Data
|
||||
@Builder
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
public class BugMalfunctionResult {
|
||||
private String vin;
|
||||
private String faultCode;
|
||||
private Long timestamp;
|
||||
}
|
|
@ -0,0 +1,42 @@
|
|||
package com.mobai.bugMalfuntion;
|
||||
|
||||
/**
|
||||
* @ClassName FaultCode
|
||||
* @Description
|
||||
* @Author Saisai
|
||||
* @Date 2024/6/20 17:20
|
||||
*/
|
||||
public enum FaultCode {
|
||||
VEHICLE("GZ001"),
|
||||
CHARGING("GZ002"),
|
||||
OPERATING("GZ003"),
|
||||
SOC("GZ004"),
|
||||
CHARGING_ENERGY("GZ005"),
|
||||
DRIVE_MOTOR("GZ006"),
|
||||
POSITION("GZ007"),
|
||||
EAS("GZ008"),
|
||||
PTC("GZ009"),
|
||||
EPS("GZ010"),
|
||||
ABS("GZ011"),
|
||||
MCU("GZ012"),
|
||||
HEATING("GZ013"),
|
||||
BATTERY("GZ014"),
|
||||
BATTERY_INSULATION("GZ015"),
|
||||
DCDC("GZ016"),
|
||||
CHG("GZ017");
|
||||
|
||||
private String string;
|
||||
|
||||
FaultCode() {
|
||||
}
|
||||
|
||||
FaultCode(String code) {
|
||||
this.string = code;
|
||||
}
|
||||
|
||||
//获取string的值
|
||||
public String getString() {
|
||||
return string;
|
||||
}
|
||||
|
||||
}
|
|
@ -153,11 +153,15 @@ public class Vehicle implements Serializable {
|
|||
*/
|
||||
private Integer operatingStatus;
|
||||
/**
|
||||
* SOC
|
||||
* SOC状态
|
||||
*/
|
||||
private Integer socStatus;
|
||||
/**
|
||||
* 可充电储能装置工作状态
|
||||
*/
|
||||
private Integer chargingEnergyStorageStatus;
|
||||
/**
|
||||
* 可充电储能装置工作状态
|
||||
* 驱动电机状态
|
||||
*/
|
||||
private Integer driveMotorStatus;
|
||||
/**
|
||||
|
|
|
@ -31,6 +31,7 @@ availableBatteryCapacity
|
|||
vehicleStatus
|
||||
chargingStatus
|
||||
operatingStatus
|
||||
socStatus
|
||||
chargingEnergyStorageStatus
|
||||
driveMotorStatus
|
||||
positionStatus
|
||||
|
|
|
@ -95,6 +95,18 @@
|
|||
<version>8.0.33</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.springframework</groupId>
|
||||
<artifactId>spring-context-support</artifactId>
|
||||
</dependency>
|
||||
|
||||
<!-- Caffeine 本地缓存 -->
|
||||
<dependency>
|
||||
<groupId>com.github.ben-manes.caffeine</groupId>
|
||||
<artifactId>caffeine</artifactId>
|
||||
<version>2.5.5</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-test</artifactId>
|
||||
|
@ -106,6 +118,26 @@
|
|||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.struts</groupId>
|
||||
<artifactId>struts-core</artifactId>
|
||||
<version>1.3.8</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-test</artifactId>
|
||||
<version>3.2.4</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>junit</groupId>
|
||||
<artifactId>junit</artifactId>
|
||||
<version>4.13.2</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework</groupId>
|
||||
<artifactId>spring-test</artifactId>
|
||||
<version>6.1.5</version>
|
||||
</dependency>
|
||||
|
||||
</dependencies>
|
||||
</project>
|
||||
|
|
|
@ -2,6 +2,8 @@ package com.mobai;
|
|||
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
import org.springframework.cache.annotation.EnableCaching;
|
||||
import org.springframework.context.annotation.ComponentScan;
|
||||
|
||||
/**
|
||||
* @author Mobai
|
||||
|
@ -9,9 +11,10 @@ import org.springframework.boot.autoconfigure.SpringBootApplication;
|
|||
* @description 描述
|
||||
* @date 2024/6/14 20:40
|
||||
*/
|
||||
|
||||
@SpringBootApplication
|
||||
public class EventServiceApplication {
|
||||
public static void main(String[] args) {
|
||||
SpringApplication.run(EventServiceApplication.class);
|
||||
SpringApplication.run(EventServiceApplication.class, args);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -82,6 +82,8 @@ public class IotDBSessionConfig {
|
|||
public void insertRecord(String deviceId, Long time,List<String> measurementsList, List<String> valuesList) throws StatementExecutionException, IoTDBConnectionException, ServerException {
|
||||
if (measurementsList.size() == valuesList.size()) {
|
||||
session.insertRecord(deviceId, time, measurementsList, valuesList);
|
||||
System.out.println(measurementsList);
|
||||
System.out.println(valuesList);
|
||||
} else {
|
||||
log.error("measurementsList 与 valuesList 值不对应");
|
||||
}
|
||||
|
@ -135,7 +137,7 @@ public class IotDBSessionConfig {
|
|||
}
|
||||
|
||||
/**
|
||||
* description: 删除分组 如 root.a1eaKSRpRty
|
||||
* description: 删除分组 root.vin.vin
|
||||
* author: zhouhong
|
||||
* @param groupName:分组名称
|
||||
* @return
|
||||
|
|
|
@ -53,9 +53,8 @@ public class IotDbController {
|
|||
* @return
|
||||
*/
|
||||
@PostMapping("/api/device/deleteGroup")
|
||||
public ResponseData deleteGroup() throws StatementExecutionException, IoTDBConnectionException {
|
||||
iotDBSessionConfig.deleteStorageGroup("root.a1eaKSRpRty");
|
||||
iotDBSessionConfig.deleteStorageGroup("root.smartretirement");
|
||||
public ResponseData deleteGroup(String vin) throws StatementExecutionException, IoTDBConnectionException {
|
||||
iotDBSessionConfig.deleteStorageGroup("root.vin."+vin);
|
||||
return ResponseData.success();
|
||||
}
|
||||
|
||||
|
|
|
@ -72,6 +72,7 @@ public class IotDbServerImpl implements IotDbServer {
|
|||
add("availableBatteryCapacity");
|
||||
add("vehicleStatus");
|
||||
add("chargingStatus");
|
||||
add("socStatus");
|
||||
add("operatingStatus");
|
||||
add("chargingEnergyStorageStatus");
|
||||
add("driveMotorStatus");
|
||||
|
@ -120,6 +121,7 @@ public class IotDbServerImpl implements IotDbServer {
|
|||
add(String.valueOf(vehicle.getAvailableBatteryCapacity()));
|
||||
add(String.valueOf(vehicle.getVehicleStatus()));
|
||||
add(String.valueOf(vehicle.getChargingStatus()));
|
||||
add(String.valueOf(vehicle.getSocStatus()));
|
||||
add(String.valueOf(vehicle.getOperatingStatus()));
|
||||
add(String.valueOf(vehicle.getChargingEnergyStorageStatus()));
|
||||
add(String.valueOf(vehicle.getDriveMotorStatus()));
|
||||
|
@ -135,7 +137,7 @@ public class IotDbServerImpl implements IotDbServer {
|
|||
add(String.valueOf(vehicle.getDcdcStatus()));
|
||||
add(String.valueOf(vehicle.getChgStatus()));
|
||||
}};
|
||||
|
||||
System.out.println(vehicle);
|
||||
iotDBSessionConfig.insertRecord(deviceId, vehicle.getStartTime(), measurementsList, valuesList);
|
||||
}
|
||||
|
||||
|
@ -147,12 +149,12 @@ public class IotDbServerImpl implements IotDbServer {
|
|||
if (null != req.getVin()) {
|
||||
String sql = "select * from " + "root.vin." + req.getVin();
|
||||
// 开始时间
|
||||
if (req.getStartTime() != null) {
|
||||
if (req.getStartTime() != null && req.getStartTime() != 0) {
|
||||
sql += " where startTime >= " + req.getStartTime();
|
||||
}
|
||||
// 结束时间
|
||||
if (req.getStartTime() != null) {
|
||||
sql += " and startTime < " + req.getEndTime();
|
||||
if (req.getEndTime() != null && req.getEndTime() != 0) {
|
||||
sql += " and startTime <= " + req.getEndTime();
|
||||
}
|
||||
SessionDataSet sessionDataSet = iotDBSessionConfig.query(sql);
|
||||
List<String> columnNames = sessionDataSet.getColumnNames();
|
||||
|
@ -195,7 +197,8 @@ public class IotDbServerImpl implements IotDbServer {
|
|||
|
||||
Field field = fields.get(i);
|
||||
// 这里的需要按照类型获取
|
||||
map.put(titleList.get(i), field.getObjectValue(field.getDataType()).toString());
|
||||
Object obj = field.getObjectValue(field.getDataType());
|
||||
map.put(titleList.get(i), obj==null? "null":obj.toString());
|
||||
}
|
||||
log.info(map);
|
||||
// vehicle.setStartTime(Long.valueOf(map.get("startTime")));
|
||||
|
|
|
@ -58,15 +58,13 @@ public class KafkaConsumerListenerExample {
|
|||
*/
|
||||
@KafkaListener(topics = {"topic0", "topic1"}, groupId = "topics")
|
||||
public void consume(ConsumerRecord<String, String> record) {
|
||||
System.out.println(record);
|
||||
log.info("消费信息为:{}",record);
|
||||
Vehicle vehicle = getVehicle(record.value());
|
||||
// 存入iotDB数据库
|
||||
// 存入iotDB
|
||||
try {
|
||||
iotDbServer.insertData(vehicle);
|
||||
log.info("添加成功");
|
||||
// if (redisService.hasKey(vehicle.getVin())) {
|
||||
// redisService.setList(vehicle.getVin(), JSON.toJSONString(vehicle));
|
||||
// }
|
||||
|
||||
} catch (StatementExecutionException e) {
|
||||
throw new RuntimeException(e);
|
||||
} catch (ServerException e) {
|
||||
|
@ -76,19 +74,6 @@ public class KafkaConsumerListenerExample {
|
|||
}
|
||||
}
|
||||
|
||||
//// 批量消费
|
||||
// @KafkaListener( topics = {"topic0","topic1"}, groupId = "Topics")
|
||||
// public void onBatchMessage(List<ConsumerRecord<String, String>> records) {
|
||||
// System.out.println(">>> 批量消费一次,recoreds.size()=" + records.size());
|
||||
// List<Vehicle> collect = records.stream().jsonObject(record -> {
|
||||
// Vehicle value = JSON.parseObject(record.value(), Vehicle.class);
|
||||
// log.info("车辆报文:{}", value);
|
||||
// return value;
|
||||
// }).toList();
|
||||
// log.warn("批量消费的数量为:{},结果为:{}", records.size(), collect);
|
||||
// System.out.println(records);
|
||||
// }
|
||||
|
||||
@Bean
|
||||
public void partitionConsumer() {
|
||||
ArrayList<TopicPartition> topicPartitions = new ArrayList<>();
|
||||
|
@ -99,64 +84,6 @@ public class KafkaConsumerListenerExample {
|
|||
topicPartitions.add(topicPartition);
|
||||
}
|
||||
}
|
||||
//
|
||||
// new ConsumerRebalanceListener (){
|
||||
// @Override
|
||||
// public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
|
||||
// // 可以在这里处理分区被撤销前的逻辑
|
||||
// System.out.println("Partitions revoked: " + partitions);
|
||||
// }
|
||||
//
|
||||
// @Override
|
||||
// public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
|
||||
// // 可以在这里处理分区被分配后的逻辑
|
||||
// System.out.println("Partitions assigned: " + partitions);
|
||||
// }
|
||||
// };
|
||||
// //1.参数配置:不是每一非得配置
|
||||
// Properties props = new Properties();
|
||||
// props.put("bootstrap.servers", "localhost:9092");
|
||||
// props.put("auto.commit.interval.ms", "1000");
|
||||
// //因为每一个消费者必须属于某一个消费者组,所以必须还设置group.id
|
||||
// props.put("group.id", "test");
|
||||
// props.put("enable.auto.commit", "true");
|
||||
// props.put("session.timeout.ms", "30000");
|
||||
// props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
|
||||
// props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
|
||||
// KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
|
||||
// consumer.assign(topicPartitions);
|
||||
// try {
|
||||
//// while (true) {
|
||||
// ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); // 假设等待100毫秒获取消息
|
||||
// if (!records.isEmpty()) { // 检查是否有消息到来
|
||||
// // 创建线程异步执行
|
||||
// new Thread(() -> {
|
||||
// for (TopicPartition partition : records.partitions()) {
|
||||
// List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
|
||||
// for (ConsumerRecord<String, String> record : partitionRecords) {
|
||||
// // 处理每条消息
|
||||
// log.info("Offset = {}, Key = {}, Value = {}", record.offset(), record.key(), record.value());
|
||||
// Vehicle vehicle = getVehicle(record.value());
|
||||
// if (redisService.hasKey(vehicle.getVin())){
|
||||
// redisService.setList(vehicle.getVin(),JSON.toJSONString(vehicle));
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
// }).start();
|
||||
// } else {
|
||||
// // 当没有消息时,选择休眠一小段时间避免过度占用CPU,或者执行其他逻辑
|
||||
// Thread.sleep(10);
|
||||
// }
|
||||
//// }
|
||||
// } catch (InterruptedException e) {
|
||||
// // 处理解除阻塞时的中断异常,如Thread.sleep被中断
|
||||
// Thread.currentThread().interrupt(); // 重新设置中断标志
|
||||
// log.error("Consumer was interrupted.", e);
|
||||
// }
|
||||
// finally {
|
||||
// consumer.close();
|
||||
// }
|
||||
|
||||
}
|
||||
|
||||
// 自定义分区分配监听器
|
||||
|
@ -202,6 +129,7 @@ public class KafkaConsumerListenerExample {
|
|||
setDcdcStatus(new BigDecimal(jsonObject.get("dcdcStatus").toString()).intValue());
|
||||
setDriveMotorStatus(new BigDecimal(jsonObject.get("driveMotorStatus").toString()).intValue());
|
||||
setPositionStatus(new BigDecimal(jsonObject.get("positionStatus").toString()).intValue());
|
||||
setSocStatus(new BigDecimal(jsonObject.get("socStatus").toString()).intValue());
|
||||
setPtcStatus(new BigDecimal(jsonObject.get("ptcStatus").toString()).intValue());
|
||||
setEpsStatus(new BigDecimal(jsonObject.get("epsStatus").toString()).intValue());
|
||||
setAbsStatus(new BigDecimal(jsonObject.get("absStatus").toString()).intValue());
|
||||
|
@ -232,11 +160,6 @@ public class KafkaConsumerListenerExample {
|
|||
setAvailableBatteryCapacity(new BigDecimal(jsonObject.get("availableBatteryCapacity").toString()));
|
||||
|
||||
}};
|
||||
|
||||
// if (redisService.hasKey(vehicle.getVin())) {
|
||||
// redisService.setList(vehicle.getVin(), JSON.toJSONString(vehicle));
|
||||
// }
|
||||
// 进行消息处理逻辑
|
||||
log.info("车辆报文信息 : " + vehicle);
|
||||
return vehicle;
|
||||
}
|
||||
|
|
|
@ -1,115 +0,0 @@
|
|||
package com.mobai.kafka.listener;
|
||||
|
||||
import com.alibaba.fastjson2.JSON;
|
||||
import com.mobai.domain.MqttServerModel;
|
||||
import com.mobai.domain.Vehicle;
|
||||
import com.mobai.forest.ForestGet;
|
||||
import com.mobai.utils.RedisService;
|
||||
import lombok.extern.log4j.Log4j2;
|
||||
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecords;
|
||||
import org.apache.kafka.clients.consumer.KafkaConsumer;
|
||||
import org.apache.kafka.common.TopicPartition;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.ApplicationArguments;
|
||||
import org.springframework.boot.ApplicationRunner;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Properties;
|
||||
|
||||
/**
|
||||
* @author Saisai
|
||||
* @className VinConsumer
|
||||
* @description 描述
|
||||
* @date 2024/6/18 17:18
|
||||
*/
|
||||
|
||||
@Log4j2
|
||||
@Component
|
||||
public class VinConsumer implements ApplicationRunner {
|
||||
|
||||
@Autowired
|
||||
private ForestGet forestGet;
|
||||
|
||||
@Autowired
|
||||
private KafkaConsumerListenerExample kafkaConsumerListenerExample;
|
||||
|
||||
@Autowired
|
||||
private RedisService redisService;
|
||||
|
||||
@Override
|
||||
public void run(ApplicationArguments args) throws Exception {
|
||||
ArrayList<TopicPartition> topicPartitions = new ArrayList<>();
|
||||
List<String> topics = forestGet.getIps().getData().stream().map(MqttServerModel::getTopic).toList();
|
||||
for (String topic : topics) {
|
||||
for (int i = 0; i < 8; i++) {
|
||||
TopicPartition topicPartition = new TopicPartition(topic, i);
|
||||
topicPartitions.add(topicPartition);
|
||||
}
|
||||
}
|
||||
|
||||
// new ConsumerRebalanceListener(){
|
||||
// @Override
|
||||
// public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
|
||||
// // 可以在这里处理分区被撤销前的逻辑
|
||||
// System.out.println("Partitions revoked: " + partitions);
|
||||
// }
|
||||
//
|
||||
// @Override
|
||||
// public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
|
||||
// // 可以在这里处理分区被分配后的逻辑
|
||||
// System.out.println("Partitions assigned: " + partitions);
|
||||
// }
|
||||
// };
|
||||
// 1.参数配置:不是每一非得配置
|
||||
Properties props = new Properties();
|
||||
props.put("bootstrap.servers", "localhost:9092");
|
||||
props.put("auto.commit.interval.ms", "1000");
|
||||
//因为每一个消费者必须属于某一个消费者组,所以必须还设置group.id
|
||||
props.put("group.id", "test");
|
||||
props.put("enable.auto.commit", "true");
|
||||
props.put("session.timeout.ms", "30000");
|
||||
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
|
||||
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
|
||||
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
|
||||
consumer.assign(topicPartitions);
|
||||
try {
|
||||
while (true) {
|
||||
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); // 假设等待100毫秒获取消息
|
||||
if (!records.isEmpty()) { // 检查是否有消息到来
|
||||
// 创建线程异步执行
|
||||
new Thread(() -> {
|
||||
for (TopicPartition partition : records.partitions()) {
|
||||
List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
|
||||
for (ConsumerRecord<String, String> record : partitionRecords) {
|
||||
// 处理每条消息
|
||||
log.info("Offset = {}, Key = {}, Value = {}", record.offset(), record.key(), record.value());
|
||||
Vehicle vehicle = kafkaConsumerListenerExample.getVehicle(record.value());
|
||||
if (redisService.hasKey(vehicle.getVin())){
|
||||
redisService.setList(vehicle.getVin(), JSON.toJSONString(vehicle));
|
||||
log.info("添加实时数据成功");
|
||||
}
|
||||
}
|
||||
}
|
||||
}).start();
|
||||
} else {
|
||||
// 当没有消息时,选择休眠一小段时间避免过度占用CPU,或者执行其他逻辑
|
||||
Thread.sleep(10);
|
||||
}
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
// 处理解除阻塞时的中断异常,如Thread.sleep被中断
|
||||
Thread.currentThread().interrupt(); // 重新设置中断标志
|
||||
log.error("Consumer was interrupted.", e);
|
||||
}
|
||||
finally {
|
||||
consumer.close();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,116 @@
|
|||
package com.mobai.kafka.listener;
|
||||
|
||||
import com.alibaba.fastjson2.JSON;
|
||||
import com.mobai.domain.MqttServerModel;
|
||||
import com.mobai.domain.Vehicle;
|
||||
import com.mobai.domain.VehicleEvent;
|
||||
import com.mobai.forest.ForestGet;
|
||||
import com.mobai.utils.RedisService;
|
||||
import com.mobai.vehicle.event.service.EventsService;
|
||||
import com.mobai.vehicle.HandlerHelper;
|
||||
import lombok.extern.log4j.Log4j2;
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecords;
|
||||
import org.apache.kafka.clients.consumer.KafkaConsumer;
|
||||
import org.apache.kafka.common.TopicPartition;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.ApplicationArguments;
|
||||
import org.springframework.boot.ApplicationRunner;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
/**
|
||||
* @author Saisai
|
||||
* @className VinConsumerRunner
|
||||
* @description 描述
|
||||
* @date 2024/6/18 17:18
|
||||
*/
|
||||
|
||||
@Log4j2
|
||||
@Component
|
||||
public class VinConsumerRunner implements ApplicationRunner {
|
||||
|
||||
@Autowired
|
||||
private ForestGet forestGet;
|
||||
|
||||
@Autowired
|
||||
private KafkaConsumerListenerExample kafkaConsumerListenerExample;
|
||||
|
||||
@Autowired
|
||||
private RedisService redisService;
|
||||
|
||||
@Autowired
|
||||
private EventsService eventsService;
|
||||
|
||||
|
||||
private final AtomicInteger start = new AtomicInteger();
|
||||
@Override
|
||||
public void run(ApplicationArguments args) throws Exception {
|
||||
if (start.get() != 0) {
|
||||
return;
|
||||
}
|
||||
synchronized (this) {
|
||||
if (start.get() != 0) {
|
||||
return;
|
||||
}
|
||||
start.set(1);
|
||||
|
||||
// kafka分区监听器
|
||||
new Thread(() -> {
|
||||
ArrayList<TopicPartition> topicPartitions = new ArrayList<>();
|
||||
List<String> topics = forestGet.getIps().getData().stream().map(MqttServerModel::getTopic).toList();
|
||||
for (String topic : topics) {
|
||||
for (int i = 0; i < 8; i++) {
|
||||
TopicPartition topicPartition = new TopicPartition(topic, i);
|
||||
topicPartitions.add(topicPartition);
|
||||
}
|
||||
}
|
||||
Properties props = new Properties(){{
|
||||
put("bootstrap.servers", "localhost:9092");
|
||||
put("auto.commit.interval.ms", "1000");
|
||||
put("group.id", "test");
|
||||
put("enable.auto.commit", "true");
|
||||
put("session.timeout.ms", "30000");
|
||||
put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
|
||||
put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
|
||||
}};
|
||||
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
|
||||
consumer.assign(topicPartitions);
|
||||
try {
|
||||
while (true) {
|
||||
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
|
||||
if (!records.isEmpty()) {
|
||||
new Thread(() -> {
|
||||
for (TopicPartition partition : records.partitions()) {
|
||||
List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
|
||||
for (ConsumerRecord<String, String> record : partitionRecords) {
|
||||
log.info("Offset = {}, Key = {}, Value = {}", record.offset(), record.key(), record.value());
|
||||
// 报文解析为对象
|
||||
// Vehicle vehicle = kafkaConsumerListenerExample.getVehicle(record.value());
|
||||
Map<String,String> map = JSON.parseObject(record.value(), Map.class);
|
||||
// 获取对应的事件
|
||||
VehicleEvent events = eventsService.getEvents(map.get("vin"));
|
||||
HandlerHelper.doHandler(events, map, redisService);
|
||||
}
|
||||
}
|
||||
}).start();
|
||||
} else {
|
||||
Thread.sleep(10);
|
||||
}
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
log.error("Consumer was interrupted.", e);
|
||||
} finally {
|
||||
consumer.close();
|
||||
}
|
||||
}).start();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,18 @@
|
|||
package com.mobai.vehicle;
|
||||
|
||||
import com.mobai.domain.EventReq;
|
||||
import com.mobai.domain.Vehicle;
|
||||
import com.mobai.utils.RedisService;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* 事件处理接口
|
||||
* @ClassName EventActive
|
||||
* @Description 描述
|
||||
* @Author Saisai
|
||||
* @Date 2024/6/20 14:33
|
||||
*/
|
||||
public interface EventActive {
|
||||
void activeEvent(Map<String,String> vehicle, RedisService redisService);
|
||||
}
|
|
@ -0,0 +1,29 @@
|
|||
package com.mobai.vehicle;
|
||||
|
||||
import com.mobai.domain.Vehicle;
|
||||
import com.mobai.domain.VehicleEvent;
|
||||
import com.mobai.utils.RedisService;
|
||||
import com.mobai.vehicle.event.constants.EventHandler;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
public final class HandlerHelper {
|
||||
|
||||
private HandlerHelper() {
|
||||
}
|
||||
|
||||
/**
|
||||
* 调用方法执行相应的事件
|
||||
*
|
||||
* @param events 事件类型
|
||||
* @param vehicle 请求体 vin+事件状态
|
||||
*/
|
||||
public static void doHandler(VehicleEvent events, Map<String,String> vehicle, RedisService redisService) {
|
||||
List<String> list = Arrays.stream(events.getEvents().split("-")).toList();
|
||||
list.forEach(type -> {
|
||||
EventHandler.getEvent(type).activeEvent(vehicle, redisService);
|
||||
});
|
||||
}
|
||||
}
|
|
@ -0,0 +1,60 @@
|
|||
package com.mobai.vehicle.event.cache.caffeine;
|
||||
|
||||
import com.github.benmanes.caffeine.cache.Cache;
|
||||
import com.github.benmanes.caffeine.cache.Caffeine;
|
||||
import lombok.Data;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* @author Saisai
|
||||
* @className CaffeineCache
|
||||
* @description 描述
|
||||
* @date 2024/6/21 16:15
|
||||
*/
|
||||
@Data
|
||||
@Configuration
|
||||
public class CaffeineCache {
|
||||
|
||||
/**
|
||||
* 5秒钟缓存
|
||||
* @return
|
||||
*/
|
||||
@Bean(name = "createFiveCaffeine")
|
||||
public Cache createFiveCaffeine() {
|
||||
return Caffeine.newBuilder()
|
||||
// 设置初始连接数
|
||||
.initialCapacity(1000)
|
||||
//5秒没有读写自动删除
|
||||
.expireAfterAccess(5, TimeUnit.SECONDS)
|
||||
//最大容量1024个,超过会自动清理空间
|
||||
.maximumSize(1024)
|
||||
.removalListener(((key, value, cause) -> {
|
||||
//清理通知 key,value ==> 键值对 cause ==> 清理原因
|
||||
}))
|
||||
.build();
|
||||
}
|
||||
|
||||
/**
|
||||
* 半小时缓存
|
||||
* @return cache缓存
|
||||
*/
|
||||
@Bean(name = "createHalfCaffeine")
|
||||
public Cache createHalfCaffeine() {
|
||||
return Caffeine.newBuilder()
|
||||
// 设置初始连接数
|
||||
.initialCapacity(1000)
|
||||
//半小时没有读自动删除 访问
|
||||
.expireAfterAccess(30, TimeUnit.MINUTES)
|
||||
//最大容量1024个,超过会自动清理空间
|
||||
.maximumSize(1024)
|
||||
.removalListener(((key, value, cause) -> {
|
||||
//清理通知 key,value ==> 键值对 cause ==> 清理原因
|
||||
}))
|
||||
.build();
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,49 @@
|
|||
package com.mobai.vehicle.event.constants;
|
||||
|
||||
import cn.hutool.extra.spring.SpringUtil;
|
||||
import com.mobai.vehicle.EventActive;
|
||||
import com.mobai.vehicle.event.factory.BugMalfunctionFactory;
|
||||
import com.mobai.vehicle.event.factory.ElectronicFenceFactory;
|
||||
import com.mobai.vehicle.event.factory.IndexWaringFactory;
|
||||
import com.mobai.vehicle.event.factory.RealTimeFactory;
|
||||
import lombok.extern.log4j.Log4j2;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Optional;
|
||||
|
||||
/**
|
||||
* @ClassName EventHandler
|
||||
* @Description
|
||||
* @Author Saisai
|
||||
* @Date 2024/6/20 15:08
|
||||
*/
|
||||
|
||||
@Log4j2
|
||||
public enum EventHandler {
|
||||
|
||||
|
||||
REAL_TIME_DATA(RealTimeFactory.class),
|
||||
INDEX_WARNING(IndexWaringFactory.class),
|
||||
BUG_MALFUNCTION(BugMalfunctionFactory.class),
|
||||
ELECTRONIC_FENCE(ElectronicFenceFactory.class);
|
||||
|
||||
private String code;
|
||||
private Class<? extends EventActive> eventActive;
|
||||
|
||||
EventHandler(Class<? extends EventActive> eventActive){
|
||||
this.eventActive = eventActive;
|
||||
}
|
||||
|
||||
public static EventActive getEvent(String code){
|
||||
|
||||
Optional<EventHandler> eHandlerOptional = Arrays.stream(values())
|
||||
.filter(eventActive -> eventActive.name().equals(code))
|
||||
.findFirst();
|
||||
if (eHandlerOptional.isEmpty()){
|
||||
throw new RuntimeException("没有该事件");
|
||||
}
|
||||
EventHandler eventHandler = valueOf(code);
|
||||
Class<? extends EventActive> eventActive1 = eventHandler.eventActive;
|
||||
return SpringUtil.getBean(eventActive1);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,177 @@
|
|||
package com.mobai.vehicle.event.factory;
|
||||
|
||||
import com.alibaba.fastjson2.JSON;
|
||||
import com.github.benmanes.caffeine.cache.Cache;
|
||||
import com.mobai.bugMalfuntion.BugMalfunctionResult;
|
||||
import com.mobai.bugMalfuntion.FaultCode;
|
||||
import com.mobai.domain.Vehicle;
|
||||
import com.mobai.utils.RedisService;
|
||||
import com.mobai.vehicle.EventActive;
|
||||
import lombok.extern.log4j.Log4j2;
|
||||
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.data.redis.core.RedisTemplate;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* 故障处理工厂
|
||||
*
|
||||
* @author Saisai
|
||||
* @className BugMalfunctionFactory
|
||||
* @description 描述
|
||||
* @date 2024/6/20 14:42
|
||||
*/
|
||||
@Log4j2
|
||||
@Service
|
||||
public class BugMalfunctionFactory implements EventActive {
|
||||
@Autowired
|
||||
private RabbitTemplate rabbitTemplate;
|
||||
|
||||
@Autowired
|
||||
private RedisService redisService;
|
||||
|
||||
@Autowired
|
||||
private RedisTemplate<String,String> redisTemplate;
|
||||
|
||||
@Resource(name = "createFiveCaffeine")
|
||||
private Cache cacheFive;
|
||||
|
||||
// @Autowired
|
||||
// private Cache getCacheFive(@Qualifier("createFiveCaffeine") Cache cacheFive){
|
||||
// if (cacheFive==null){
|
||||
// cacheFive = Caffeine.newBuilder()
|
||||
// // 设置初始连接数
|
||||
// .initialCapacity(1000)
|
||||
// //5秒没有读写自动删除
|
||||
// .expireAfterAccess(5, TimeUnit.SECONDS)
|
||||
// //最大容量1024个,超过会自动清理空间
|
||||
// .maximumSize(1024)
|
||||
// .removalListener(((key, value, cause) -> {
|
||||
// //清理通知 key,value ==> 键值对 cause ==> 清理原因
|
||||
// }))
|
||||
// .build();
|
||||
// }
|
||||
// return this.cacheFive = cacheFive;
|
||||
// }
|
||||
|
||||
private BugMalfunctionFactory() {
|
||||
}
|
||||
|
||||
public static BugMalfunctionFactory getInstance() {
|
||||
return new BugMalfunctionFactory();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void activeEvent(Map<String,String> vehicle, RedisService redisService) {
|
||||
BugMalfunctionResult result = new BugMalfunctionResult();
|
||||
result.setVin(vehicle.get("vin"));
|
||||
result.setTimestamp(Long.valueOf(vehicle.get("startTime")));
|
||||
if (vehicle.get("vehicleStatus").equals(0+"")) {
|
||||
result.setFaultCode(FaultCode.VEHICLE.getString());
|
||||
this.active(result);
|
||||
}
|
||||
if (vehicle.get("chargingStatus").equals(0+"")) {
|
||||
result.setFaultCode(FaultCode.CHARGING.getString());
|
||||
this.active(result);
|
||||
}
|
||||
if (vehicle.get("operatingStatus").equals(0+"")) {
|
||||
result.setFaultCode(FaultCode.OPERATING.getString());
|
||||
this.active(result);
|
||||
}
|
||||
if (vehicle.get("socStatus").equals(""+0)) {
|
||||
result.setFaultCode(FaultCode.SOC.getString());
|
||||
this.active(result);
|
||||
}
|
||||
if (vehicle.get("chargingEnergyStorageStatus").equals(0+"")) {
|
||||
result.setFaultCode(FaultCode.CHARGING_ENERGY.getString());
|
||||
this.active(result);
|
||||
}
|
||||
if (vehicle.get("driveMotorStatus").equals(0+"")) {
|
||||
result.setFaultCode(FaultCode.DRIVE_MOTOR.getString());
|
||||
this.active(result);
|
||||
}
|
||||
if (vehicle.get("positionStatus").equals(0+"")) {
|
||||
result.setFaultCode(FaultCode.POSITION.getString());
|
||||
this.active(result);
|
||||
}
|
||||
if (vehicle.get("EasStatus").equals(0+"")) {
|
||||
result.setFaultCode(FaultCode.EAS.getString());
|
||||
this.active(result);
|
||||
}
|
||||
if (vehicle.get("PtcStatus").equals(0+"")) {
|
||||
result.setFaultCode(FaultCode.PTC.getString());
|
||||
this.active(result);
|
||||
}
|
||||
if (vehicle.get("EpsStatus").equals(0+"")) {
|
||||
result.setFaultCode(FaultCode.EPS.getString());
|
||||
this.active(result);
|
||||
}
|
||||
if (vehicle.get("AbsStatus").equals(0+"")) {
|
||||
result.setFaultCode(FaultCode.ABS.getString());
|
||||
this.active(result);
|
||||
}
|
||||
if (vehicle.get("McuStatus").equals(0+"")) {
|
||||
result.setFaultCode(FaultCode.MCU.getString());
|
||||
this.active(result);
|
||||
}
|
||||
if (vehicle.get("HeatingStatus").equals(0+"")) {
|
||||
result.setFaultCode(FaultCode.HEATING.getString());
|
||||
this.active(result);
|
||||
}
|
||||
if (vehicle.get("BatteryStatus").equals(0+"")) {
|
||||
result.setFaultCode(FaultCode.BATTERY.getString());
|
||||
this.active(result);
|
||||
}
|
||||
if (vehicle.get("BatteryInsulationStatus").equals(0+"")) {
|
||||
result.setFaultCode(FaultCode.BATTERY_INSULATION.getString());
|
||||
this.active(result);
|
||||
}
|
||||
if (vehicle.get("DcdcStatus").equals(0+"")) {
|
||||
result.setFaultCode(FaultCode.DCDC.getString());
|
||||
this.active(result);
|
||||
}
|
||||
if (vehicle.get("ChgStatus").equals(0+"")) {
|
||||
result.setFaultCode(FaultCode.CHG.getString());
|
||||
this.active(result);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 发布异常开始事件
|
||||
*
|
||||
* @param result
|
||||
*/
|
||||
public void sendBugMalfunction(BugMalfunctionResult result) {
|
||||
log.info("发布异常事件");
|
||||
rabbitTemplate.convertAndSend("zhiLian-vehicle-start", JSON.toJSONString(result));
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理异常信息
|
||||
*/
|
||||
public void active(BugMalfunctionResult result) {
|
||||
// 5秒验证缓存
|
||||
// 拼接异常信息
|
||||
String vinInfo = result.getVin() + "-" + result.getFaultCode();
|
||||
Object ifPresent = cacheFive.getIfPresent(vinInfo);
|
||||
if (null == ifPresent) {
|
||||
log.error("异常开始");
|
||||
log.info(vinInfo);
|
||||
redisTemplate.opsForValue().set(vinInfo, 0 + "", 10L, TimeUnit.SECONDS);
|
||||
sendBugMalfunction(result);
|
||||
}
|
||||
// else {
|
||||
//存入缓存
|
||||
cacheFive.put(vinInfo, 0 + "");
|
||||
log.warn("异常中······· 原因:{}",result.getFaultCode());
|
||||
redisTemplate.expire(vinInfo, 10, TimeUnit.SECONDS);
|
||||
// }
|
||||
|
||||
}
|
||||
}
|
|
@ -0,0 +1,29 @@
|
|||
package com.mobai.vehicle.event.factory;
|
||||
|
||||
import com.mobai.domain.Vehicle;
|
||||
import com.mobai.utils.RedisService;
|
||||
import com.mobai.vehicle.EventActive;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* 电子围栏工厂
|
||||
* @author Saisai
|
||||
* @className BugMalfunctionFactory
|
||||
* @description 描述
|
||||
* @date 2024/6/20 14:42
|
||||
*/
|
||||
@Component
|
||||
public class ElectronicFenceFactory implements EventActive {
|
||||
private ElectronicFenceFactory(){}
|
||||
|
||||
public static ElectronicFenceFactory getInstance(){
|
||||
return new ElectronicFenceFactory();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void activeEvent(Map<String,String> vehicle, RedisService redisService) {
|
||||
|
||||
}
|
||||
}
|
|
@ -0,0 +1,38 @@
|
|||
package com.mobai.vehicle.event.factory;
|
||||
|
||||
import com.mobai.domain.Vehicle;
|
||||
import com.mobai.utils.RedisService;
|
||||
import com.mobai.vehicle.EventActive;
|
||||
import lombok.extern.log4j.Log4j2;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* 实时数据工厂
|
||||
* @author Saisai
|
||||
* @className BugMalfunctionFactory
|
||||
* @description 描述
|
||||
* @date 2024/6/20 14:42
|
||||
*/
|
||||
|
||||
@Log4j2
|
||||
@Component
|
||||
public class IndexWaringFactory implements EventActive {
|
||||
|
||||
@Autowired
|
||||
private RedisService redisService;
|
||||
|
||||
|
||||
private IndexWaringFactory(){}
|
||||
|
||||
public static IndexWaringFactory getInstance(){
|
||||
return new IndexWaringFactory();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void activeEvent(Map<String,String> vehicle, RedisService redisService) {
|
||||
|
||||
}
|
||||
}
|
|
@ -0,0 +1,38 @@
|
|||
package com.mobai.vehicle.event.factory;
|
||||
|
||||
import com.alibaba.fastjson2.JSON;
|
||||
import com.mobai.domain.Vehicle;
|
||||
import com.mobai.utils.RedisService;
|
||||
import com.mobai.vehicle.EventActive;
|
||||
import lombok.extern.log4j.Log4j2;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* 实时数据工厂
|
||||
* @author Saisai
|
||||
* @className BugMalfunctionFactory
|
||||
* @description 描述
|
||||
* @date 2024/6/20 14:42
|
||||
*/
|
||||
|
||||
@Log4j2
|
||||
@Component
|
||||
public class RealTimeFactory implements EventActive {
|
||||
|
||||
|
||||
private RealTimeFactory(){}
|
||||
|
||||
public static RealTimeFactory getInstance(){
|
||||
return new RealTimeFactory();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void activeEvent(Map<String,String> vehicle, RedisService redisService) {
|
||||
if (redisService.hasKey(vehicle.get("vin"))) {
|
||||
redisService.setList(vehicle.get("vin"), JSON.toJSONString(vehicle));
|
||||
log.info("添加实时数据成功");
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,39 @@
|
|||
package com.mobai.vehicle.event.redis;
|
||||
|
||||
import com.alibaba.fastjson2.JSON;
|
||||
|
||||
import com.mobai.bugMalfuntion.BugMalfunctionResult;
|
||||
import lombok.extern.log4j.Log4j2;
|
||||
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.data.redis.connection.Message;
|
||||
import org.springframework.data.redis.listener.KeyExpirationEventMessageListener;
|
||||
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.Date;
|
||||
@Log4j2
|
||||
@Component
|
||||
public class RedisKeyExpirationListener extends KeyExpirationEventMessageListener {
|
||||
public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) {
|
||||
super(listenerContainer);
|
||||
}
|
||||
|
||||
@Autowired
|
||||
private RabbitTemplate rabbitTemplate;
|
||||
|
||||
//拿到过期key的信息并做处理
|
||||
@Override
|
||||
public void onMessage(Message message, byte[] pattern) {
|
||||
String key = message.toString();
|
||||
log.warn("过期的KEY是: {}" , key);
|
||||
BugMalfunctionResult result = new BugMalfunctionResult();
|
||||
String[] split = key.split("-");
|
||||
result.setVin(split[0]);
|
||||
result.setFaultCode(split[1]);
|
||||
result.setTimestamp(new Date().getTime());
|
||||
log.info(result);
|
||||
rabbitTemplate.convertAndSend("zhiLian-vehicle-end", JSON.toJSONString(result));
|
||||
|
||||
}
|
||||
}
|
|
@ -0,0 +1,16 @@
|
|||
package com.mobai.vehicle.event.redis;
|
||||
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.data.redis.connection.RedisConnectionFactory;
|
||||
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
|
||||
|
||||
@Configuration
|
||||
public class RedisListenerConfig {
|
||||
@Bean
|
||||
public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {
|
||||
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
|
||||
container.setConnectionFactory(connectionFactory);
|
||||
return container;
|
||||
}
|
||||
}
|
|
@ -1,6 +1,7 @@
|
|||
package com.mobai.vehicle.event.service;
|
||||
|
||||
import com.baomidou.mybatisplus.extension.service.IService;
|
||||
import com.mobai.bugMalfuntion.BugMalfunctionInfo;
|
||||
import com.mobai.domain.EventReq;
|
||||
import com.mobai.domain.Result;
|
||||
import com.mobai.domain.VehicleEvent;
|
||||
|
@ -13,5 +14,19 @@ import com.mobai.domain.VehicleEvent;
|
|||
*/
|
||||
public interface EventsService extends IService<VehicleEvent> {
|
||||
|
||||
// 处理事件
|
||||
Result activeEvent(EventReq req);
|
||||
|
||||
// 更新缓存中的事件
|
||||
// public VehicleEvent updEvents(VehicleEvent event);
|
||||
|
||||
|
||||
// 查询缓存中的事件
|
||||
public VehicleEvent getEvents(String vin);
|
||||
|
||||
|
||||
// 查询缓存中的bug事件
|
||||
// public BugMalfunctionInfo getBug(BugMalfunctionInfo bugInfo);
|
||||
|
||||
|
||||
}
|
||||
|
|
|
@ -12,6 +12,7 @@ import org.springframework.stereotype.Service;
|
|||
import java.util.Properties;
|
||||
|
||||
/**
|
||||
* 事件配置中心
|
||||
* @author Saisai
|
||||
* @className EventActiveServiceImpl
|
||||
* @description 描述
|
||||
|
@ -27,7 +28,6 @@ public class EventActiveServiceImpl implements EventActiveService {
|
|||
|
||||
@Override
|
||||
public Result realTimeData(EventReq req) {
|
||||
|
||||
if (req.getEventState()==0){
|
||||
log.info("开始实时数据事件");
|
||||
redisService.setCacheObject(req.getVin(),null);
|
||||
|
@ -35,6 +35,6 @@ public class EventActiveServiceImpl implements EventActiveService {
|
|||
log.info("关闭实时数据事件");
|
||||
redisService.deleteObject(req.getVin());
|
||||
}
|
||||
return null;
|
||||
return Result.success();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,72 @@
|
|||
package com.mobai.vehicle.event.service.impl;
|
||||
|
||||
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
||||
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
|
||||
import com.github.benmanes.caffeine.cache.Cache;
|
||||
import com.mobai.domain.EventReq;
|
||||
import com.mobai.domain.Result;
|
||||
import com.mobai.domain.VehicleEvent;
|
||||
import com.mobai.vehicle.event.mapper.VehicleEventMapper;
|
||||
import com.mobai.vehicle.event.service.EventActiveService;
|
||||
import com.mobai.vehicle.event.service.EventsService;
|
||||
import lombok.extern.log4j.Log4j2;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
|
||||
/**
|
||||
* 事件处理
|
||||
*
|
||||
* @author Saisai
|
||||
* @className EventsServiceImpl
|
||||
* @description 描述
|
||||
* @date 2024/6/18 11:39
|
||||
*/
|
||||
|
||||
@Log4j2
|
||||
@Service
|
||||
public class EventsActiveServiceImpl extends ServiceImpl<VehicleEventMapper, VehicleEvent> implements EventsService {
|
||||
|
||||
@Autowired
|
||||
private EventActiveService eventActiveService;
|
||||
|
||||
@Resource(name = "createHalfCaffeine")
|
||||
private Cache cacheHalf;
|
||||
|
||||
@Override
|
||||
public Result activeEvent(EventReq req) {
|
||||
switch (req.getEventType()) {
|
||||
case 0:
|
||||
return eventActiveService.realTimeData(req);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 通过车辆vin获取故障事件信息及缓存数据
|
||||
*
|
||||
* @param vin
|
||||
* @return vin + events
|
||||
*/
|
||||
public VehicleEvent getEvents(String vin) {
|
||||
// 使用caffeine缓存
|
||||
Object vehicleEvent = cacheHalf.getIfPresent(vin + "-events");
|
||||
if (vehicleEvent != null) {
|
||||
// 若有访问更新缓存时间
|
||||
log.info("缓存");
|
||||
cacheHalf.put(vin + "-events", vehicleEvent);
|
||||
return (VehicleEvent) vehicleEvent;
|
||||
}
|
||||
vehicleEvent = this.getOne(new LambdaQueryWrapper<>() {{
|
||||
eq(VehicleEvent::getVin, vin);
|
||||
}});
|
||||
cacheHalf.put(vin + "-events", vehicleEvent);
|
||||
log.info("数据库");
|
||||
Object ifPresent = cacheHalf.getIfPresent(vin + "-events");
|
||||
log.info("缓存信息为:{}", ifPresent);
|
||||
return (VehicleEvent) vehicleEvent;
|
||||
}
|
||||
|
||||
}
|
|
@ -1,34 +0,0 @@
|
|||
package com.mobai.vehicle.event.service.impl;
|
||||
|
||||
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
|
||||
import com.mobai.domain.EventReq;
|
||||
import com.mobai.domain.Result;
|
||||
import com.mobai.domain.VehicleEvent;
|
||||
import com.mobai.vehicle.event.mapper.VehicleEventMapper;
|
||||
import com.mobai.vehicle.event.service.EventActiveService;
|
||||
import com.mobai.vehicle.event.service.EventsService;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
/**
|
||||
* @author Saisai
|
||||
* @className EventsServiceImpl
|
||||
* @description 描述
|
||||
* @date 2024/6/18 11:39
|
||||
*/
|
||||
|
||||
@Service
|
||||
public class EventsServiceImpl extends ServiceImpl<VehicleEventMapper,VehicleEvent> implements EventsService {
|
||||
|
||||
@Autowired
|
||||
private EventActiveService eventActiveService;
|
||||
|
||||
@Override
|
||||
public Result activeEvent(EventReq req) {
|
||||
switch (req.getEventType()){
|
||||
case 0:
|
||||
return eventActiveService.realTimeData(req);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,30 @@
|
|||
package com.mobai.vehicle.event.standard;
|
||||
|
||||
import com.rabbitmq.client.AMQP;
|
||||
import org.springframework.amqp.core.Message;
|
||||
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* @author Saisai
|
||||
* @className StandardEvent
|
||||
* @description 描述
|
||||
* @date 2024/6/26 16:32
|
||||
*/
|
||||
@Component
|
||||
public class StandardEvent {
|
||||
|
||||
|
||||
@RabbitListener(queues = {"standard_Warn_Event_Start"})
|
||||
public void eventStart(String msg, Message message, AMQP.Channel channel) {
|
||||
|
||||
}
|
||||
|
||||
|
||||
@RabbitListener(queues = {"standard_Warn_Event_End"})
|
||||
public void eventEnd(String msg, Message message, AMQP.Channel channel) {
|
||||
|
||||
}
|
||||
|
||||
|
||||
}
|
|
@ -112,12 +112,13 @@ spring:
|
|||
# 两次poll之间的最大间隔,默认值为5分钟。如果超过这个间隔会触发reBalance
|
||||
poll-timeout: 600000
|
||||
|
||||
|
||||
# mybatis配置
|
||||
mybatis:
|
||||
# 搜索指定包别名
|
||||
typeAliasesPackage: com.mobai
|
||||
typeAliasesPackage: com.mobai.*.*.mapper
|
||||
# 配置mapper的扫描,找到所有的mapper.xml映射文件
|
||||
mapperLocations: classpath:mapper/**/*.xml
|
||||
logging:
|
||||
level:
|
||||
com.mobai.mapper: DEBUG
|
||||
com.mobai.*.*.mapper: DEBUG
|
||||
|
|
Loading…
Reference in New Issue