feat()维护本地缓存
parent
84a91d275c
commit
f8c5b56b10
15
pom.xml
15
pom.xml
|
@ -14,7 +14,20 @@
|
|||
<spring-boot.version>2.6.13</spring-boot.version>
|
||||
</properties>
|
||||
<dependencies>
|
||||
<!-- iotDB-->
|
||||
|
||||
<!-- ehcahe-->
|
||||
<dependency>
|
||||
<groupId>net.sf.ehcache</groupId>
|
||||
<artifactId>ehcache</artifactId>
|
||||
<version>2.10.9.2</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-cache</artifactId>
|
||||
<version>2.6.13</version>
|
||||
</dependency>
|
||||
|
||||
<!-- iotDB-->
|
||||
<dependency>
|
||||
<groupId>org.apache.iotdb</groupId>
|
||||
<artifactId>iotdb-session</artifactId>
|
||||
|
|
|
@ -2,7 +2,9 @@ package com.hyc;
|
|||
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
import org.springframework.cache.annotation.EnableCaching;
|
||||
|
||||
@EnableCaching
|
||||
@SpringBootApplication
|
||||
public class ParseMessageApplication {
|
||||
|
||||
|
|
|
@ -0,0 +1,126 @@
|
|||
package com.hyc.config;
|
||||
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import com.google.gson.JsonObject;
|
||||
import com.hyc.domain.CacheCarEvent;
|
||||
import com.hyc.domain.NodeName;
|
||||
import com.hyc.domain.req.CarEventUpdate;
|
||||
import com.hyc.simulate.constant.ExchangeConstant;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.amqp.core.*;
|
||||
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
|
||||
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
|
||||
import org.springframework.amqp.rabbit.core.RabbitAdmin;
|
||||
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
|
||||
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.cache.Cache;
|
||||
import org.springframework.cache.CacheManager;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
/**
|
||||
* rabbit更新缓存
|
||||
*
|
||||
* @author YouChe·He
|
||||
* @ClassName: RabbitConfig
|
||||
* @Description: rabbit配置
|
||||
* @CreateTime: 2024/6/16 10:32
|
||||
*/
|
||||
@Configuration
|
||||
@Slf4j
|
||||
public class MsgComment {
|
||||
|
||||
@Autowired
|
||||
private NodeName nodeName;
|
||||
@Autowired
|
||||
private CacheManager cacheManager;
|
||||
@Bean
|
||||
public FanoutExchange eventUpdateExchange() {
|
||||
return new FanoutExchange(ExchangeConstant.UPDATE_CONSTANT);
|
||||
}
|
||||
|
||||
@Bean
|
||||
public Queue queue() {
|
||||
return new Queue(nodeName.getNodeName());
|
||||
}
|
||||
|
||||
@Bean
|
||||
public Binding binding() {
|
||||
return new Binding(nodeName.getNodeName(),
|
||||
Binding.DestinationType.QUEUE, ExchangeConstant.UPDATE_CONSTANT,
|
||||
"", null);
|
||||
}
|
||||
|
||||
@Bean
|
||||
public ConnectionFactory connectionFactory() {
|
||||
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
|
||||
connectionFactory.setHost("47.103.75.98");
|
||||
connectionFactory.setPort(5672);
|
||||
connectionFactory.setPassword("guest");
|
||||
connectionFactory.setUsername("guest");
|
||||
//开启发送端确认
|
||||
connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);//SIMPLE=简单的;CORRELATED=相关的;NONE=关闭b;
|
||||
//开启消息返回
|
||||
connectionFactory.setPublisherReturns(true);
|
||||
connectionFactory.createConnection();
|
||||
return connectionFactory;
|
||||
}
|
||||
|
||||
@Bean
|
||||
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
|
||||
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
|
||||
rabbitAdmin.setAutoStartup(true);
|
||||
return rabbitAdmin;
|
||||
}
|
||||
|
||||
@Bean
|
||||
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
|
||||
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
|
||||
|
||||
rabbitTemplate.setMandatory(true);
|
||||
//消息返回
|
||||
rabbitTemplate.setReturnCallback((message, i, s, s1, s2) -> {
|
||||
log.info("消息返回实现,message:{},replyCode:{},replyText:{},exchange:{},routingKey:{}", message, i, s, s1, s2);
|
||||
});
|
||||
//发送端确认
|
||||
rabbitTemplate.setConfirmCallback((correlationData, b, s) -> {
|
||||
log.info("发送端确认,correlationData:{},ack:{},cause:{}", correlationData, b, s);
|
||||
|
||||
});
|
||||
return rabbitTemplate;
|
||||
}
|
||||
|
||||
@Bean
|
||||
public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {
|
||||
SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);
|
||||
//针对哪些队列(参数为可变参数)
|
||||
simpleMessageListenerContainer.setQueueNames(nodeName.getNodeName());
|
||||
//同时有多少个消费者线程在消费这个队列,相当于线程池的线程数字。
|
||||
simpleMessageListenerContainer.setConcurrentConsumers(3);
|
||||
//最大的消费者线程数
|
||||
simpleMessageListenerContainer.setMaxConcurrentConsumers(5);
|
||||
//设置消息确认方式 NONE=不确认,MANUAL=手动确认,AUTO=自动确认;
|
||||
//自动确认
|
||||
//simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.AUTO);
|
||||
//simpleMessageListenerContainer.setMessageListener(message -> log.info("springboot.rabbitmq-queue接收到的消息:[{}]", message));
|
||||
//手动确认(单条确认)
|
||||
simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);
|
||||
simpleMessageListenerContainer.setMessageListener((ChannelAwareMessageListener) (message, channel) -> {
|
||||
log.info("队列名:{}",nodeName.getNodeName());
|
||||
log.info("接收到的消息:[{}]",message.toString());
|
||||
String eventString = new String(message.getBody());
|
||||
CarEventUpdate carEventUpdate = JSONObject.parseObject(eventString, CarEventUpdate.class);
|
||||
|
||||
Cache carEvent = cacheManager.getCache("carEvent");
|
||||
carEvent.put(carEventUpdate.getVin(),carEventUpdate.getHandleEvent());
|
||||
Cache.ValueWrapper valueWrapper = carEvent.get(carEventUpdate.getVin());
|
||||
log.info("小车{}更新后的缓存中事件是:{}",carEventUpdate.getVin(),valueWrapper.get().toString());
|
||||
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
|
||||
});
|
||||
//消费端限流
|
||||
simpleMessageListenerContainer.setPrefetchCount(1);
|
||||
return simpleMessageListenerContainer;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,19 @@
|
|||
package com.hyc.consumer;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* 事件更新消费者
|
||||
*
|
||||
* @author YouChe·He
|
||||
* @ClassName: EventUpdateConsumer
|
||||
* @Description: 事件更新消费者
|
||||
* @CreateTime: 2024/6/16 10:28
|
||||
*/
|
||||
@Component
|
||||
@Slf4j
|
||||
public class EventUpdateConsumer {
|
||||
|
||||
|
||||
}
|
|
@ -0,0 +1,38 @@
|
|||
package com.hyc.domain;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
import lombok.ToString;
|
||||
import org.apache.kafka.common.protocol.types.Field;
|
||||
|
||||
/**
|
||||
* 缓存小车事件类
|
||||
*
|
||||
* @author YouChe·He
|
||||
* @ClassName: CacheCarEvent
|
||||
* @Description: 缓存小车事件类
|
||||
* @CreateTime: 2024/6/14 20:29
|
||||
*/
|
||||
@Data
|
||||
@ToString
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
public class CacheCarEvent {
|
||||
/**
|
||||
* 对应id
|
||||
*/
|
||||
private Integer id;
|
||||
/**
|
||||
* 小车VIN
|
||||
*/
|
||||
private String vin;
|
||||
/**
|
||||
* 绑定事件
|
||||
* 1:电子围栏事件
|
||||
* 2:实时数据事件
|
||||
* 3:故障预警事件
|
||||
*/
|
||||
private String eventValue;
|
||||
|
||||
}
|
|
@ -0,0 +1,23 @@
|
|||
package com.hyc.domain;
|
||||
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
/**
|
||||
* 节点名称
|
||||
*
|
||||
* @author YouChe·He
|
||||
* @ClassName: NodeName
|
||||
* @Description: 节点名称
|
||||
* @CreateTime: 2024/6/16 10:03
|
||||
*/
|
||||
@Configuration
|
||||
public class NodeName {
|
||||
@Value("${event.name}")
|
||||
private String name;
|
||||
@Value("${event.partition}")
|
||||
private String partition;
|
||||
public String getNodeName(){
|
||||
return name+"_"+partition;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,68 @@
|
|||
package com.hyc.domain;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
import lombok.ToString;
|
||||
|
||||
/**
|
||||
* 小车数据类
|
||||
*
|
||||
* @author YouChe·He
|
||||
* @ClassName: VehicleData
|
||||
* @Description: 小车数据类
|
||||
* @CreateTime: 2024/6/14 16:19
|
||||
*/
|
||||
@Data
|
||||
@ToString
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
public class VehicleData {
|
||||
private String vin;
|
||||
private Long startTime;
|
||||
private Double longitude;
|
||||
private Double latitude;
|
||||
private Double speed;
|
||||
private Double mileage;
|
||||
private Double voltage;
|
||||
private Double current;
|
||||
private Double resistance;
|
||||
private String gear;
|
||||
private Double accelerationPedal;
|
||||
private Double brakePedal;
|
||||
private Double fuelConsumptionRate;
|
||||
private Double motorControllerTemperature;
|
||||
private Double motorSpeed;
|
||||
private Double motorTorque;
|
||||
private Double motorTemperature;
|
||||
private Double motorVoltage;
|
||||
private Double motorCurrent;
|
||||
private Double remainingBattery;
|
||||
private Double maximumFeedbackPower;
|
||||
private Double maximumDischargePower;
|
||||
private Double selfCheckCounter;
|
||||
private Double totalBatteryCurrent;
|
||||
private Double totalBatteryVoltage;
|
||||
private Double singleBatteryMaxVoltage;
|
||||
private Double singleBatteryMinVoltage;
|
||||
private Double singleBatteryMaxTemperature;
|
||||
private Double singleBatteryMinTemperature;
|
||||
private Double availableBatteryCapacity;
|
||||
private Double vehicleStatus;
|
||||
private Double chargingStatus;
|
||||
private Double operatingStatus;
|
||||
private Double socStatus;
|
||||
private Double chargingEnergyStorageStatus;
|
||||
private Double driveMotorStatus;
|
||||
private Double positionStatus;
|
||||
private Double easStatus;
|
||||
private Double ptcStatus;
|
||||
private Double epsStatus;
|
||||
private Double absStatus;
|
||||
private Double mcuStatus;
|
||||
private Double heatingStatus;
|
||||
private Double batteryStatus;
|
||||
private Double batteryInsulationStatus;
|
||||
private Double dcdcStatus;
|
||||
private Double chgStatus;
|
||||
}
|
|
@ -0,0 +1,16 @@
|
|||
package com.hyc.domain.constant;
|
||||
|
||||
import lombok.Data;
|
||||
|
||||
/**
|
||||
* 事件常量
|
||||
*
|
||||
* @author YouChe·He
|
||||
* @ClassName: EventConstant
|
||||
* @Description: 事件常量
|
||||
* @CreateTime: 2024/6/16 15:39
|
||||
*/
|
||||
@Data
|
||||
public class EventConstant {
|
||||
public final static String REAL_TIME_DATA = "_REAL_TIME_DATA";
|
||||
}
|
|
@ -0,0 +1,32 @@
|
|||
package com.hyc.domain.req;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
import lombok.ToString;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
||||
/**
|
||||
* 小车绑定事件更新类
|
||||
*
|
||||
* @author YouChe·He
|
||||
* @ClassName: CarEventUpdate
|
||||
* @Description: 小车绑定事件更新类
|
||||
* @CreateTime: 2024/6/15 18:46
|
||||
*/
|
||||
@Data
|
||||
@ToString
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
public class CarEventUpdate implements Serializable{
|
||||
/**
|
||||
* 修改小车VIN
|
||||
*/
|
||||
private String vin;
|
||||
/**
|
||||
* 动作类型
|
||||
*/
|
||||
private String handleEvent;
|
||||
|
||||
}
|
|
@ -1,16 +1,21 @@
|
|||
package com.hyc.kafka.demo.consumer;
|
||||
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import com.hyc.domain.CacheCarEvent;
|
||||
import com.hyc.domain.VehicleData;
|
||||
import com.hyc.domain.constant.EventConstant;
|
||||
import com.hyc.iotdbdemo.config.IotDBSessionConfig;
|
||||
import com.hyc.iotdbdemo.server.IotDbServer;
|
||||
import com.hyc.kafka.demo.strategy.Strategy;
|
||||
import com.hyc.kafka.demo.service.CarEventService;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.iotdb.rpc.IoTDBConnectionException;
|
||||
import org.apache.iotdb.rpc.StatementExecutionException;
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.cache.Cache;
|
||||
import org.springframework.cache.CacheManager;
|
||||
import org.springframework.cache.annotation.Cacheable;
|
||||
import org.springframework.data.redis.core.RedisTemplate;
|
||||
import org.springframework.kafka.annotation.KafkaListener;
|
||||
import org.springframework.kafka.support.Acknowledgment;
|
||||
|
@ -22,6 +27,7 @@ import java.util.ArrayList;
|
|||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* kafka消费者
|
||||
|
@ -34,6 +40,11 @@ import java.util.Random;
|
|||
@Slf4j
|
||||
@Service
|
||||
public class KafkaConsumer {
|
||||
@Autowired
|
||||
private CacheManager cacheManager;
|
||||
|
||||
@Autowired
|
||||
private CarEventService carEventService;
|
||||
|
||||
@Autowired
|
||||
private RedisTemplate<String,String> redisTemplate;
|
||||
|
@ -46,9 +57,9 @@ public class KafkaConsumer {
|
|||
public void consume(ConsumerRecord<Object,Object> consumerRecord, Acknowledgment acknowledgment) {
|
||||
try {
|
||||
//策略map集合
|
||||
LinkedHashMap<String, Strategy> stringStrategyLinkedHashMap = new LinkedHashMap<>();
|
||||
stringStrategyLinkedHashMap.put("存储数据", Strategy.STORE_DATA);
|
||||
stringStrategyLinkedHashMap.put("实时数据",Strategy.REAL_TIME_DATA);
|
||||
// LinkedHashMap<String, Strategy> stringStrategyLinkedHashMap = new LinkedHashMap<>();
|
||||
// stringStrategyLinkedHashMap.put("存储数据", Strategy.STORE_DATA);
|
||||
// stringStrategyLinkedHashMap.put("实时数据",Strategy.REAL_TIME_DATA);
|
||||
|
||||
//解析得到VIN
|
||||
String value = (String) consumerRecord.value();
|
||||
|
@ -57,6 +68,31 @@ public class KafkaConsumer {
|
|||
log.error("消费者0得到的数据:{},所在分区:{}",vehicleData.toString(),consumerRecord.partition());
|
||||
|
||||
iotDbServer.insertData(vehicleData);
|
||||
//得到小车绑定信息
|
||||
String carEventByVin = this.getCarEventByVin(vehicleData.getVin());
|
||||
//解析绑定的信息
|
||||
String[] eventArr = carEventByVin.split(",");
|
||||
for (String eventNumber : eventArr) {
|
||||
//电子围栏事件
|
||||
if (eventNumber.equals("1")){
|
||||
//从缓存中查询车辆绑定的围栏组,与车辆信息进行比较
|
||||
}
|
||||
//实时数据事件
|
||||
if (eventNumber.equals("2")) {
|
||||
if (redisTemplate.hasKey(vehicleData.getVin()+ EventConstant.REAL_TIME_DATA)){
|
||||
//有VIN_REAL_TIME_DATA这个key,则代表用户有查询对应车辆VIN数据的请求,将数据存储到redis中
|
||||
String key = vehicleData.getVin() + EventConstant.REAL_TIME_DATA;
|
||||
String carData = JSONObject.toJSONString(vehicleData);
|
||||
this.updateValueWithoutChangingTTL(key, value);
|
||||
}
|
||||
}
|
||||
//故障预警事件
|
||||
if (eventNumber.equals("3")) {
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// String dataMessage = value.toString();
|
||||
// JSONObject jsonObject = JSON.parseObject(dataMessage);
|
||||
// //根据VIN得到该小车拥有的事件
|
||||
|
@ -89,4 +125,48 @@ public class KafkaConsumer {
|
|||
}
|
||||
return strings;
|
||||
}
|
||||
|
||||
public String getCarEventByVin(String vin){
|
||||
|
||||
|
||||
// 先从本地缓存获取
|
||||
Cache cache = cacheManager.getCache("carEvent");
|
||||
Cache.ValueWrapper valueWrapper = cache.get(vin);
|
||||
if (valueWrapper != null) {
|
||||
return valueWrapper.get().toString();
|
||||
}
|
||||
|
||||
// 从 Redis 获取
|
||||
|
||||
String carEventString = redisTemplate.opsForValue().get(vin);
|
||||
if (carEventString != null) {
|
||||
// 将数据同步到本地缓存
|
||||
cache.put(vin, carEventString);
|
||||
return carEventString;
|
||||
}
|
||||
|
||||
// 从 MySQL 获取
|
||||
CacheCarEvent cacheCarEvent = carEventService.getCarEvent(vin);
|
||||
if (cacheCarEvent != null) {
|
||||
// 将数据存入 Redis 和本地缓存
|
||||
redisTemplate.opsForValue().set(vin, cacheCarEvent.getEventValue());
|
||||
cache.put(vin, cacheCarEvent.getEventValue());
|
||||
}
|
||||
|
||||
return cacheCarEvent.getEventValue();
|
||||
|
||||
}
|
||||
|
||||
public void updateValueWithoutChangingTTL(String key, String newValue) {
|
||||
// 获取当前的过期时间
|
||||
Long currentTTL = redisTemplate.getExpire(key, TimeUnit.SECONDS);
|
||||
|
||||
// 更新键的值
|
||||
redisTemplate.opsForValue().set(key, newValue);
|
||||
|
||||
// 重新设置过期时间
|
||||
if (currentTTL != null && currentTTL > 0) {
|
||||
redisTemplate.expire(key, currentTTL, TimeUnit.SECONDS);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,11 +1,31 @@
|
|||
package com.hyc.kafka.demo.consumer;
|
||||
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import com.hyc.domain.CacheCarEvent;
|
||||
import com.hyc.domain.VehicleData;
|
||||
import com.hyc.domain.constant.EventConstant;
|
||||
import com.hyc.iotdbdemo.config.IotDBSessionConfig;
|
||||
import com.hyc.iotdbdemo.server.IotDbServer;
|
||||
import com.hyc.kafka.demo.service.CarEventService;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.iotdb.rpc.IoTDBConnectionException;
|
||||
import org.apache.iotdb.rpc.StatementExecutionException;
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.cache.Cache;
|
||||
import org.springframework.cache.CacheManager;
|
||||
import org.springframework.data.redis.core.RedisTemplate;
|
||||
import org.springframework.kafka.annotation.KafkaListener;
|
||||
import org.springframework.kafka.support.Acknowledgment;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.rmi.ServerException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* kafka消费者
|
||||
*
|
||||
|
@ -17,14 +37,133 @@ import org.springframework.stereotype.Service;
|
|||
@Slf4j
|
||||
@Service
|
||||
public class KafkaConsumer1 {
|
||||
@Autowired
|
||||
private CacheManager cacheManager;
|
||||
|
||||
@Autowired
|
||||
private CarEventService carEventService;
|
||||
|
||||
@Autowired
|
||||
private RedisTemplate<String,String> redisTemplate;
|
||||
@Resource
|
||||
private IotDbServer iotDbServer;
|
||||
@Resource
|
||||
private IotDBSessionConfig iotDBSessionConfig;
|
||||
@KafkaListener(topics = "topichyc", groupId = "firstGroup", containerFactory = "kafkaListenerContainerFactory",
|
||||
errorHandler = "myKafkaListenerErrorHandler")
|
||||
errorHandler = "myKafkaListenerErrorHandler")
|
||||
public void consume(ConsumerRecord<Object,Object> consumerRecord, Acknowledgment acknowledgment) {
|
||||
try {
|
||||
Object value = consumerRecord.value();
|
||||
log.error("消费者1得到的数据:{},所在分区:{}",value,consumerRecord.partition());
|
||||
}finally {
|
||||
//策略map集合
|
||||
// LinkedHashMap<String, Strategy> stringStrategyLinkedHashMap = new LinkedHashMap<>();
|
||||
// stringStrategyLinkedHashMap.put("存储数据", Strategy.STORE_DATA);
|
||||
// stringStrategyLinkedHashMap.put("实时数据",Strategy.REAL_TIME_DATA);
|
||||
|
||||
//解析得到VIN
|
||||
String value = (String) consumerRecord.value();
|
||||
|
||||
VehicleData vehicleData = JSONObject.parseObject(value, VehicleData.class);
|
||||
log.error("消费者1得到的数据:{},所在分区:{}",vehicleData.toString(),consumerRecord.partition());
|
||||
|
||||
iotDbServer.insertData(vehicleData);
|
||||
//得到小车绑定信息
|
||||
String carEventByVin = this.getCarEventByVin(vehicleData.getVin());
|
||||
//解析绑定的信息
|
||||
String[] eventArr = carEventByVin.split(",");
|
||||
for (String eventNumber : eventArr) {
|
||||
//电子围栏事件
|
||||
if (eventNumber.equals("1")){
|
||||
//从缓存中查询车辆绑定的围栏组,与车辆信息进行比较
|
||||
}
|
||||
//实时数据事件
|
||||
if (eventNumber.equals("2")) {
|
||||
if (redisTemplate.hasKey(vehicleData.getVin()+ EventConstant.REAL_TIME_DATA)){
|
||||
//有VIN_REAL_TIME_DATA这个key,则代表用户有查询对应车辆VIN数据的请求,将数据存储到redis中
|
||||
String key = vehicleData.getVin() + EventConstant.REAL_TIME_DATA;
|
||||
String carData = JSONObject.toJSONString(vehicleData);
|
||||
this.updateValueWithoutChangingTTL(key, value);
|
||||
}
|
||||
}
|
||||
//故障预警事件
|
||||
if (eventNumber.equals("3")) {
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// String dataMessage = value.toString();
|
||||
// JSONObject jsonObject = JSON.parseObject(dataMessage);
|
||||
// //根据VIN得到该小车拥有的事件
|
||||
// String vin = jsonObject.get("vin").toString();
|
||||
// List<String> eventList = getEvent(vin);
|
||||
// //循环事件集合,并执行响应的事件
|
||||
// for (String event : eventList) {
|
||||
// stringStrategyLinkedHashMap.get(event).exe(redisTemplate,dataMessage);
|
||||
// }
|
||||
|
||||
} catch (ServerException e) {
|
||||
log.error("添加iotDb异常");
|
||||
throw new RuntimeException(e);
|
||||
} catch (IoTDBConnectionException e) {
|
||||
throw new RuntimeException(e);
|
||||
} catch (StatementExecutionException e) {
|
||||
throw new RuntimeException(e);
|
||||
} finally {
|
||||
acknowledgment.acknowledge();
|
||||
}
|
||||
}
|
||||
|
||||
public List<String> getEvent(String vin){
|
||||
ArrayList<String> strings = new ArrayList<>();
|
||||
|
||||
strings.add("存储数据");
|
||||
int nextInt = new Random().nextInt(100);
|
||||
if (nextInt % 2 ==0){
|
||||
strings.add("实时数据");
|
||||
}
|
||||
return strings;
|
||||
}
|
||||
|
||||
public String getCarEventByVin(String vin){
|
||||
|
||||
|
||||
// 先从本地缓存获取
|
||||
Cache cache = cacheManager.getCache("carEvent");
|
||||
Cache.ValueWrapper valueWrapper = cache.get(vin);
|
||||
if (valueWrapper != null) {
|
||||
return valueWrapper.get().toString();
|
||||
}
|
||||
|
||||
// 从 Redis 获取
|
||||
|
||||
String carEventString = redisTemplate.opsForValue().get(vin);
|
||||
if (carEventString != null) {
|
||||
// 将数据同步到本地缓存
|
||||
cache.put(vin, carEventString);
|
||||
return carEventString;
|
||||
}
|
||||
|
||||
// 从 MySQL 获取
|
||||
CacheCarEvent cacheCarEvent = carEventService.getCarEvent(vin);
|
||||
if (cacheCarEvent != null) {
|
||||
// 将数据存入 Redis 和本地缓存
|
||||
redisTemplate.opsForValue().set(vin, cacheCarEvent.getEventValue());
|
||||
cache.put(vin, cacheCarEvent.getEventValue());
|
||||
}
|
||||
|
||||
return cacheCarEvent.getEventValue();
|
||||
|
||||
}
|
||||
|
||||
public void updateValueWithoutChangingTTL(String key, String newValue) {
|
||||
// 获取当前的过期时间
|
||||
Long currentTTL = redisTemplate.getExpire(key, TimeUnit.SECONDS);
|
||||
|
||||
// 更新键的值
|
||||
redisTemplate.opsForValue().set(key, newValue);
|
||||
|
||||
// 重新设置过期时间
|
||||
if (currentTTL != null && currentTTL > 0) {
|
||||
redisTemplate.expire(key, currentTTL, TimeUnit.SECONDS);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,21 @@
|
|||
package com.hyc.kafka.demo.mapper;
|
||||
|
||||
import com.hyc.domain.CacheCarEvent;
|
||||
import org.apache.ibatis.annotations.Mapper;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* 小车事件持久层
|
||||
*
|
||||
* @author YouChe·He
|
||||
* @ClassName: CarEventMapper
|
||||
* @Description: 小车事件持久层
|
||||
* @CreateTime: 2024/6/14 20:44
|
||||
*/
|
||||
@Mapper
|
||||
public interface CarEventMapper {
|
||||
CacheCarEvent getCarEvent(String vin);
|
||||
|
||||
List<CacheCarEvent> getAllCarEvent();
|
||||
}
|
|
@ -0,0 +1,25 @@
|
|||
package com.hyc.kafka.demo.service;
|
||||
/**
|
||||
* 小车绑定事件处理
|
||||
*
|
||||
* @author YouChe·He
|
||||
* @ClassName: CarEventService
|
||||
* @Description: 小车绑定事件处理
|
||||
* @CreateTime: 2024/6/14 20:38
|
||||
*/
|
||||
|
||||
import com.hyc.domain.CacheCarEvent;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
*@ClassName CarEventService
|
||||
*@Description 描述
|
||||
*@Author ZHIHAO.DAI
|
||||
*@Date 2024/6/14 20:38
|
||||
*/
|
||||
public interface CarEventService {
|
||||
CacheCarEvent getCarEvent(String substring);
|
||||
|
||||
List<CacheCarEvent> getAllCarEvent();
|
||||
}
|
|
@ -0,0 +1,37 @@
|
|||
package com.hyc.kafka.demo.service.impl;
|
||||
|
||||
import com.hyc.domain.CacheCarEvent;
|
||||
import com.hyc.kafka.demo.mapper.CarEventMapper;
|
||||
import com.hyc.kafka.demo.service.CarEventService;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* 小车绑定服务实现类
|
||||
*
|
||||
* @author YouChe·He
|
||||
* @ClassName: CarEventServiceImpl
|
||||
* @Description: 小车绑定服务实现类
|
||||
* @CreateTime: 2024/6/14 20:38
|
||||
*/
|
||||
@Service
|
||||
@Slf4j
|
||||
public class CarEventServiceImpl implements CarEventService {
|
||||
@Autowired
|
||||
private CarEventMapper carEventMapper;
|
||||
|
||||
@Override
|
||||
public CacheCarEvent getCarEvent(String vin) {
|
||||
log.info("小车的VIN是:{}",vin);
|
||||
CacheCarEvent cacheCarEvent = carEventMapper.getCarEvent(vin);
|
||||
return cacheCarEvent;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<CacheCarEvent> getAllCarEvent() {
|
||||
return carEventMapper.getAllCarEvent();
|
||||
}
|
||||
}
|
|
@ -1,12 +0,0 @@
|
|||
package com.hyc.producer;
|
||||
|
||||
/**
|
||||
* 传递报文生产者
|
||||
*
|
||||
* @author YouChe·He
|
||||
* @ClassName: TransmitMessageProducer
|
||||
* @Description: 传递报文生产者
|
||||
* @CreateTime: 2024/6/5 14:35
|
||||
*/
|
||||
public class TransmitMessageProducer {
|
||||
}
|
|
@ -0,0 +1,45 @@
|
|||
package com.hyc.runner;
|
||||
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import com.hyc.domain.CacheCarEvent;
|
||||
import com.hyc.domain.SummarizeResp;
|
||||
import com.hyc.kafka.demo.service.CarEventService;
|
||||
import com.hyc.service.SummarizeService;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.ApplicationArguments;
|
||||
import org.springframework.boot.ApplicationRunner;
|
||||
import org.springframework.cache.Cache;
|
||||
import org.springframework.cache.CacheManager;
|
||||
import org.springframework.data.redis.core.RedisTemplate;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* 同步缓存Runner
|
||||
*
|
||||
* @author YouChe·He
|
||||
* @ClassName: SyncCacheRunner
|
||||
* @Description: 同步缓存Runner
|
||||
* @CreateTime: 2024/6/16 11:40
|
||||
*/
|
||||
@Component
|
||||
public class SyncCacheRunner implements ApplicationRunner {
|
||||
@Autowired
|
||||
private CarEventService carEventService;
|
||||
@Autowired
|
||||
private CacheManager cacheManager;
|
||||
@Autowired
|
||||
private RedisTemplate<String,String> redisTemplate;
|
||||
@Override
|
||||
public void run(ApplicationArguments args) throws Exception {
|
||||
Cache cache = cacheManager.getCache("carEvent");
|
||||
List<CacheCarEvent> cacheCarEventList = carEventService.getAllCarEvent();
|
||||
for (CacheCarEvent cacheCarEvent : cacheCarEventList) {
|
||||
redisTemplate.opsForValue().set(cacheCarEvent.getVin(), JSONObject.toJSONString(cacheCarEvent));
|
||||
cache.put(cacheCarEvent.getVin(), cacheCarEvent.getEventValue());
|
||||
}
|
||||
// 将数据存入 Redis 和本地缓存
|
||||
|
||||
}
|
||||
}
|
|
@ -0,0 +1,33 @@
|
|||
package com.hyc.simulate.constant;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
import lombok.ToString;
|
||||
import org.springframework.amqp.core.Binding;
|
||||
import org.springframework.amqp.core.BindingBuilder;
|
||||
import org.springframework.amqp.core.FanoutExchange;
|
||||
import org.springframework.amqp.core.Queue;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
/**
|
||||
* 交换机常量
|
||||
*
|
||||
* @author YouChe·He
|
||||
* @ClassName: ExchangeConstant
|
||||
* @Description: 交换机常量
|
||||
* @CreateTime: 2024/6/15 19:01
|
||||
*/
|
||||
@Configuration
|
||||
public class ExchangeConstant {
|
||||
|
||||
public final static String UPDATE_CONSTANT = "update_constant";
|
||||
|
||||
@Bean
|
||||
public FanoutExchange handleEvent(){
|
||||
return new FanoutExchange(UPDATE_CONSTANT);
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,47 @@
|
|||
package com.hyc.simulate.kafkatest;
|
||||
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import com.hyc.domain.CacheCarEvent;
|
||||
import com.hyc.domain.req.CarEventUpdate;
|
||||
import com.hyc.simulate.constant.ExchangeConstant;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.cache.Cache;
|
||||
import org.springframework.cache.CacheManager;
|
||||
import org.springframework.web.bind.annotation.PostMapping;
|
||||
import org.springframework.web.bind.annotation.RequestBody;
|
||||
import org.springframework.web.bind.annotation.RequestMapping;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
|
||||
/**
|
||||
* 更新事件测试类
|
||||
*
|
||||
* @author YouChe·He
|
||||
* @ClassName: RenewalEventTest
|
||||
* @Description: 更新事件测试类
|
||||
* @CreateTime: 2024/6/15 18:40
|
||||
*/
|
||||
@Slf4j
|
||||
@RestController
|
||||
@RequestMapping("test")
|
||||
public class RenewalEventTest {
|
||||
@Autowired
|
||||
private RabbitTemplate rabbitTemplate;
|
||||
@Autowired
|
||||
private CacheManager cacheManager;
|
||||
|
||||
@PostMapping("/broadcast")
|
||||
public String broadcastMessage(@RequestBody CarEventUpdate carEventUpdate){
|
||||
|
||||
Cache cache = cacheManager.getCache("carEvent");
|
||||
Cache.ValueWrapper valueWrapper = cache.get(carEventUpdate.getVin());
|
||||
log.info("小车:{}更新前的缓存中事件值是:{}",carEventUpdate.getVin(),valueWrapper.get().toString());
|
||||
if (valueWrapper != null) {
|
||||
|
||||
}
|
||||
rabbitTemplate.convertAndSend(ExchangeConstant.UPDATE_CONSTANT,"", JSONObject.toJSONString(carEventUpdate));
|
||||
return "sent success!";
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,42 @@
|
|||
package com.hyc.simulate.redistest;
|
||||
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import com.hyc.domain.VehicleData;
|
||||
import com.hyc.domain.constant.EventConstant;
|
||||
import com.hyc.result.Result;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.data.redis.core.RedisTemplate;
|
||||
import org.springframework.web.bind.annotation.RequestMapping;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* 在redsi中创建事件相关key
|
||||
*
|
||||
* @author YouChe·He
|
||||
* @ClassName: RedisCreateKey
|
||||
* @Description: 在redsi中创建事件相关key
|
||||
* @CreateTime: 2024/6/16 15:35
|
||||
*/
|
||||
@RestController
|
||||
@RequestMapping("/test/redis")
|
||||
public class RedisCreateKey {
|
||||
|
||||
@Autowired
|
||||
private RedisTemplate<String,String> redisTemplate;
|
||||
|
||||
@RequestMapping("/createRealTimeDataKey")
|
||||
public Result createRealTimeDataKey(String vin) {
|
||||
redisTemplate.opsForValue().set(vin + EventConstant.REAL_TIME_DATA,"",3, TimeUnit.MINUTES);
|
||||
return Result.success("","创建成功");
|
||||
}
|
||||
|
||||
@RequestMapping("/getDataByRedis")
|
||||
public Result<VehicleData> getDataByRedis(String vin){
|
||||
String vehicleJson = redisTemplate.opsForValue().get(vin + EventConstant.REAL_TIME_DATA);
|
||||
VehicleData vehicleData = JSONObject.parseObject(vehicleJson, VehicleData.class);
|
||||
redisTemplate.opsForValue().set(vin+EventConstant.REAL_TIME_DATA,vehicleJson,3,TimeUnit.MINUTES);
|
||||
return Result.success(vehicleData);
|
||||
}
|
||||
}
|
|
@ -4,6 +4,13 @@ server:
|
|||
port: 10003
|
||||
|
||||
spring:
|
||||
profiles:
|
||||
active: dev
|
||||
cache:
|
||||
type: ehcache
|
||||
ehcache:
|
||||
config: classpath:ehcache.xml
|
||||
|
||||
rabbitmq:
|
||||
host: 47.103.75.98
|
||||
port: 5672
|
||||
|
@ -47,7 +54,7 @@ spring:
|
|||
redis:
|
||||
host: 47.103.75.98
|
||||
port: 6379
|
||||
password: hyc123
|
||||
password:
|
||||
kafka:
|
||||
producer:
|
||||
# Kafka服务器
|
||||
|
@ -130,4 +137,7 @@ mybatis:
|
|||
global-config:
|
||||
db-config:
|
||||
id-type: auto
|
||||
event:
|
||||
name: collect
|
||||
partition: 001
|
||||
|
||||
|
|
|
@ -0,0 +1,29 @@
|
|||
<ehcache xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:noNamespaceSchemaLocation="http://ehcache.org/ehcache.xsd"
|
||||
updateCheck="false">
|
||||
|
||||
<defaultCache
|
||||
eternal="false"
|
||||
maxElementsInMemory="10000"
|
||||
overflowToDisk="false"
|
||||
diskPersistent="false"
|
||||
timeToLiveSeconds="3600"
|
||||
timeToIdleSeconds="0"
|
||||
diskExpiryThreadIntervalSeconds="120"
|
||||
memoryStoreEvictionPolicy="LRU"/>
|
||||
|
||||
<cache
|
||||
name="carEvent"
|
||||
eternal="false"
|
||||
maxElementsInMemory="10000"
|
||||
overflowToDisk="false"
|
||||
diskPersistent="false"
|
||||
timeToLiveSeconds="3600"
|
||||
timeToIdleSeconds="0"
|
||||
diskExpiryThreadIntervalSeconds="120"
|
||||
memoryStoreEvictionPolicy="LRU"/>
|
||||
|
||||
<!-- 存储到磁盘时的路径-->
|
||||
<diskStore path="D:\workspace\parse-message\src\main\resources\log\ehcache" />
|
||||
|
||||
</ehcache>
|
|
@ -0,0 +1,28 @@
|
|||
<?xml version="1.0" encoding="UTF-8" ?>
|
||||
<!DOCTYPE mapper
|
||||
PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
|
||||
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
|
||||
<!--
|
||||
1.在mybats的开发中namespace有特殊的意思,一定要是对应接口的全限定名
|
||||
通过namespace可以简历mapper.xml和接口之间的关系(名字不重要,位置不重要)
|
||||
-->
|
||||
<mapper namespace="com.hyc.kafka.demo.mapper.CarEventMapper">
|
||||
|
||||
|
||||
<select id="getCarEvent" resultType="com.hyc.domain.CacheCarEvent">
|
||||
SELECT
|
||||
`id`,
|
||||
`vin`,
|
||||
`event_value`
|
||||
FROM `car_event`
|
||||
WHERE `vin` = #{vin}
|
||||
</select>
|
||||
<select id="getAllCarEvent" resultType="com.hyc.domain.CacheCarEvent">
|
||||
SELECT
|
||||
`id`,
|
||||
`vin`,
|
||||
`event_value`
|
||||
FROM `car_event`
|
||||
</select>
|
||||
|
||||
</mapper>
|
Loading…
Reference in New Issue