diff --git a/pom.xml b/pom.xml
index 31d957b..5f280e8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -14,7 +14,20 @@
2.6.13
-
+
+
+
+ net.sf.ehcache
+ ehcache
+ 2.10.9.2
+
+
+ org.springframework.boot
+ spring-boot-starter-cache
+ 2.6.13
+
+
+
org.apache.iotdb
iotdb-session
diff --git a/src/main/java/com/hyc/ParseMessageApplication.java b/src/main/java/com/hyc/ParseMessageApplication.java
index f33c70d..1dea261 100644
--- a/src/main/java/com/hyc/ParseMessageApplication.java
+++ b/src/main/java/com/hyc/ParseMessageApplication.java
@@ -2,7 +2,9 @@ package com.hyc;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.cache.annotation.EnableCaching;
+@EnableCaching
@SpringBootApplication
public class ParseMessageApplication {
diff --git a/src/main/java/com/hyc/config/MsgComment.java b/src/main/java/com/hyc/config/MsgComment.java
new file mode 100644
index 0000000..ec4def4
--- /dev/null
+++ b/src/main/java/com/hyc/config/MsgComment.java
@@ -0,0 +1,126 @@
+package com.hyc.config;
+
+import com.alibaba.fastjson.JSONObject;
+import com.google.gson.JsonObject;
+import com.hyc.domain.CacheCarEvent;
+import com.hyc.domain.NodeName;
+import com.hyc.domain.req.CarEventUpdate;
+import com.hyc.simulate.constant.ExchangeConstant;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.amqp.core.*;
+import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
+import org.springframework.amqp.rabbit.connection.ConnectionFactory;
+import org.springframework.amqp.rabbit.core.RabbitAdmin;
+import org.springframework.amqp.rabbit.core.RabbitTemplate;
+import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
+import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.cache.Cache;
+import org.springframework.cache.CacheManager;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+/**
+ * rabbit更新缓存
+ *
+ * @author YouChe·He
+ * @ClassName: RabbitConfig
+ * @Description: rabbit配置
+ * @CreateTime: 2024/6/16 10:32
+ */
+@Configuration
+@Slf4j
+public class MsgComment {
+
+ @Autowired
+ private NodeName nodeName;
+ @Autowired
+ private CacheManager cacheManager;
+ @Bean
+ public FanoutExchange eventUpdateExchange() {
+ return new FanoutExchange(ExchangeConstant.UPDATE_CONSTANT);
+ }
+
+ @Bean
+ public Queue queue() {
+ return new Queue(nodeName.getNodeName());
+ }
+
+ @Bean
+ public Binding binding() {
+ return new Binding(nodeName.getNodeName(),
+ Binding.DestinationType.QUEUE, ExchangeConstant.UPDATE_CONSTANT,
+ "", null);
+ }
+
+ @Bean
+ public ConnectionFactory connectionFactory() {
+ CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
+ connectionFactory.setHost("47.103.75.98");
+ connectionFactory.setPort(5672);
+ connectionFactory.setPassword("guest");
+ connectionFactory.setUsername("guest");
+ //开启发送端确认
+ connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);//SIMPLE=简单的;CORRELATED=相关的;NONE=关闭b;
+ //开启消息返回
+ connectionFactory.setPublisherReturns(true);
+ connectionFactory.createConnection();
+ return connectionFactory;
+ }
+
+ @Bean
+ public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
+ RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
+ rabbitAdmin.setAutoStartup(true);
+ return rabbitAdmin;
+ }
+
+ @Bean
+ public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
+ RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
+
+ rabbitTemplate.setMandatory(true);
+ //消息返回
+ rabbitTemplate.setReturnCallback((message, i, s, s1, s2) -> {
+ log.info("消息返回实现,message:{},replyCode:{},replyText:{},exchange:{},routingKey:{}", message, i, s, s1, s2);
+ });
+ //发送端确认
+ rabbitTemplate.setConfirmCallback((correlationData, b, s) -> {
+ log.info("发送端确认,correlationData:{},ack:{},cause:{}", correlationData, b, s);
+
+ });
+ return rabbitTemplate;
+ }
+
+ @Bean
+ public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {
+ SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);
+ //针对哪些队列(参数为可变参数)
+ simpleMessageListenerContainer.setQueueNames(nodeName.getNodeName());
+ //同时有多少个消费者线程在消费这个队列,相当于线程池的线程数字。
+ simpleMessageListenerContainer.setConcurrentConsumers(3);
+ //最大的消费者线程数
+ simpleMessageListenerContainer.setMaxConcurrentConsumers(5);
+ //设置消息确认方式 NONE=不确认,MANUAL=手动确认,AUTO=自动确认;
+ //自动确认
+ //simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.AUTO);
+ //simpleMessageListenerContainer.setMessageListener(message -> log.info("springboot.rabbitmq-queue接收到的消息:[{}]", message));
+ //手动确认(单条确认)
+ simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);
+ simpleMessageListenerContainer.setMessageListener((ChannelAwareMessageListener) (message, channel) -> {
+ log.info("队列名:{}",nodeName.getNodeName());
+ log.info("接收到的消息:[{}]",message.toString());
+ String eventString = new String(message.getBody());
+ CarEventUpdate carEventUpdate = JSONObject.parseObject(eventString, CarEventUpdate.class);
+
+ Cache carEvent = cacheManager.getCache("carEvent");
+ carEvent.put(carEventUpdate.getVin(),carEventUpdate.getHandleEvent());
+ Cache.ValueWrapper valueWrapper = carEvent.get(carEventUpdate.getVin());
+ log.info("小车{}更新后的缓存中事件是:{}",carEventUpdate.getVin(),valueWrapper.get().toString());
+ channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
+ });
+ //消费端限流
+ simpleMessageListenerContainer.setPrefetchCount(1);
+ return simpleMessageListenerContainer;
+ }
+}
diff --git a/src/main/java/com/hyc/consumer/EventUpdateConsumer.java b/src/main/java/com/hyc/consumer/EventUpdateConsumer.java
new file mode 100644
index 0000000..c65871e
--- /dev/null
+++ b/src/main/java/com/hyc/consumer/EventUpdateConsumer.java
@@ -0,0 +1,19 @@
+package com.hyc.consumer;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+
+/**
+ * 事件更新消费者
+ *
+ * @author YouChe·He
+ * @ClassName: EventUpdateConsumer
+ * @Description: 事件更新消费者
+ * @CreateTime: 2024/6/16 10:28
+ */
+@Component
+@Slf4j
+public class EventUpdateConsumer {
+
+
+}
diff --git a/src/main/java/com/hyc/domain/CacheCarEvent.java b/src/main/java/com/hyc/domain/CacheCarEvent.java
new file mode 100644
index 0000000..8da6f9a
--- /dev/null
+++ b/src/main/java/com/hyc/domain/CacheCarEvent.java
@@ -0,0 +1,38 @@
+package com.hyc.domain;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import lombok.ToString;
+import org.apache.kafka.common.protocol.types.Field;
+
+/**
+ * 缓存小车事件类
+ *
+ * @author YouChe·He
+ * @ClassName: CacheCarEvent
+ * @Description: 缓存小车事件类
+ * @CreateTime: 2024/6/14 20:29
+ */
+@Data
+@ToString
+@NoArgsConstructor
+@AllArgsConstructor
+public class CacheCarEvent {
+ /**
+ * 对应id
+ */
+ private Integer id;
+ /**
+ * 小车VIN
+ */
+ private String vin;
+ /**
+ * 绑定事件
+ * 1:电子围栏事件
+ * 2:实时数据事件
+ * 3:故障预警事件
+ */
+ private String eventValue;
+
+}
diff --git a/src/main/java/com/hyc/domain/NodeName.java b/src/main/java/com/hyc/domain/NodeName.java
new file mode 100644
index 0000000..cfbdabb
--- /dev/null
+++ b/src/main/java/com/hyc/domain/NodeName.java
@@ -0,0 +1,23 @@
+package com.hyc.domain;
+
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Configuration;
+
+/**
+ * 节点名称
+ *
+ * @author YouChe·He
+ * @ClassName: NodeName
+ * @Description: 节点名称
+ * @CreateTime: 2024/6/16 10:03
+ */
+@Configuration
+public class NodeName {
+ @Value("${event.name}")
+ private String name;
+ @Value("${event.partition}")
+ private String partition;
+ public String getNodeName(){
+ return name+"_"+partition;
+ }
+}
diff --git a/src/main/java/com/hyc/domain/VehicleData.java b/src/main/java/com/hyc/domain/VehicleData.java
new file mode 100644
index 0000000..5f9e48c
--- /dev/null
+++ b/src/main/java/com/hyc/domain/VehicleData.java
@@ -0,0 +1,68 @@
+package com.hyc.domain;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import lombok.ToString;
+
+/**
+ * 小车数据类
+ *
+ * @author YouChe·He
+ * @ClassName: VehicleData
+ * @Description: 小车数据类
+ * @CreateTime: 2024/6/14 16:19
+ */
+@Data
+@ToString
+@NoArgsConstructor
+@AllArgsConstructor
+public class VehicleData {
+ private String vin;
+ private Long startTime;
+ private Double longitude;
+ private Double latitude;
+ private Double speed;
+ private Double mileage;
+ private Double voltage;
+ private Double current;
+ private Double resistance;
+ private String gear;
+ private Double accelerationPedal;
+ private Double brakePedal;
+ private Double fuelConsumptionRate;
+ private Double motorControllerTemperature;
+ private Double motorSpeed;
+ private Double motorTorque;
+ private Double motorTemperature;
+ private Double motorVoltage;
+ private Double motorCurrent;
+ private Double remainingBattery;
+ private Double maximumFeedbackPower;
+ private Double maximumDischargePower;
+ private Double selfCheckCounter;
+ private Double totalBatteryCurrent;
+ private Double totalBatteryVoltage;
+ private Double singleBatteryMaxVoltage;
+ private Double singleBatteryMinVoltage;
+ private Double singleBatteryMaxTemperature;
+ private Double singleBatteryMinTemperature;
+ private Double availableBatteryCapacity;
+ private Double vehicleStatus;
+ private Double chargingStatus;
+ private Double operatingStatus;
+ private Double socStatus;
+ private Double chargingEnergyStorageStatus;
+ private Double driveMotorStatus;
+ private Double positionStatus;
+ private Double easStatus;
+ private Double ptcStatus;
+ private Double epsStatus;
+ private Double absStatus;
+ private Double mcuStatus;
+ private Double heatingStatus;
+ private Double batteryStatus;
+ private Double batteryInsulationStatus;
+ private Double dcdcStatus;
+ private Double chgStatus;
+}
diff --git a/src/main/java/com/hyc/domain/constant/EventConstant.java b/src/main/java/com/hyc/domain/constant/EventConstant.java
new file mode 100644
index 0000000..5fb61e2
--- /dev/null
+++ b/src/main/java/com/hyc/domain/constant/EventConstant.java
@@ -0,0 +1,16 @@
+package com.hyc.domain.constant;
+
+import lombok.Data;
+
+/**
+ * 事件常量
+ *
+ * @author YouChe·He
+ * @ClassName: EventConstant
+ * @Description: 事件常量
+ * @CreateTime: 2024/6/16 15:39
+ */
+@Data
+public class EventConstant {
+ public final static String REAL_TIME_DATA = "_REAL_TIME_DATA";
+}
diff --git a/src/main/java/com/hyc/domain/req/CarEventUpdate.java b/src/main/java/com/hyc/domain/req/CarEventUpdate.java
new file mode 100644
index 0000000..2af03f4
--- /dev/null
+++ b/src/main/java/com/hyc/domain/req/CarEventUpdate.java
@@ -0,0 +1,32 @@
+package com.hyc.domain.req;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import lombok.ToString;
+
+import java.io.Serializable;
+
+/**
+ * 小车绑定事件更新类
+ *
+ * @author YouChe·He
+ * @ClassName: CarEventUpdate
+ * @Description: 小车绑定事件更新类
+ * @CreateTime: 2024/6/15 18:46
+ */
+@Data
+@ToString
+@NoArgsConstructor
+@AllArgsConstructor
+public class CarEventUpdate implements Serializable{
+ /**
+ * 修改小车VIN
+ */
+ private String vin;
+ /**
+ * 动作类型
+ */
+ private String handleEvent;
+
+}
diff --git a/src/main/java/com/hyc/kafka/demo/consumer/KafkaConsumer.java b/src/main/java/com/hyc/kafka/demo/consumer/KafkaConsumer.java
index 781701a..24a384b 100644
--- a/src/main/java/com/hyc/kafka/demo/consumer/KafkaConsumer.java
+++ b/src/main/java/com/hyc/kafka/demo/consumer/KafkaConsumer.java
@@ -1,16 +1,21 @@
package com.hyc.kafka.demo.consumer;
-import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
+import com.hyc.domain.CacheCarEvent;
import com.hyc.domain.VehicleData;
+import com.hyc.domain.constant.EventConstant;
import com.hyc.iotdbdemo.config.IotDBSessionConfig;
import com.hyc.iotdbdemo.server.IotDbServer;
import com.hyc.kafka.demo.strategy.Strategy;
+import com.hyc.kafka.demo.service.CarEventService;
import lombok.extern.slf4j.Slf4j;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.cache.Cache;
+import org.springframework.cache.CacheManager;
+import org.springframework.cache.annotation.Cacheable;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
@@ -22,6 +27,7 @@ import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Random;
+import java.util.concurrent.TimeUnit;
/**
* kafka消费者
@@ -34,6 +40,11 @@ import java.util.Random;
@Slf4j
@Service
public class KafkaConsumer {
+ @Autowired
+ private CacheManager cacheManager;
+
+ @Autowired
+ private CarEventService carEventService;
@Autowired
private RedisTemplate redisTemplate;
@@ -46,9 +57,9 @@ public class KafkaConsumer {
public void consume(ConsumerRecord