From f8c5b56b10b6ff7248c6958acf454e076df01a11 Mon Sep 17 00:00:00 2001 From: 20300 <643145201@qq.com> Date: Mon, 17 Jun 2024 11:01:02 +0800 Subject: [PATCH] =?UTF-8?q?feat()=E7=BB=B4=E6=8A=A4=E6=9C=AC=E5=9C=B0?= =?UTF-8?q?=E7=BC=93=E5=AD=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pom.xml | 15 +- .../java/com/hyc/ParseMessageApplication.java | 2 + src/main/java/com/hyc/config/MsgComment.java | 126 +++++++++++++++ .../com/hyc/consumer/EventUpdateConsumer.java | 19 +++ .../java/com/hyc/domain/CacheCarEvent.java | 38 +++++ src/main/java/com/hyc/domain/NodeName.java | 23 +++ src/main/java/com/hyc/domain/VehicleData.java | 68 ++++++++ .../hyc/domain/constant/EventConstant.java | 16 ++ .../com/hyc/domain/req/CarEventUpdate.java | 32 ++++ .../kafka/demo/consumer/KafkaConsumer.java | 88 ++++++++++- .../kafka/demo/consumer/KafkaConsumer1.java | 147 +++++++++++++++++- .../hyc/kafka/demo/mapper/CarEventMapper.java | 21 +++ .../kafka/demo/service/CarEventService.java | 25 +++ .../service/impl/CarEventServiceImpl.java | 37 +++++ .../hyc/producer/TransmitMessageProducer.java | 12 -- .../java/com/hyc/runner/SyncCacheRunner.java | 45 ++++++ .../simulate/constant/ExchangeConstant.java | 33 ++++ .../simulate/kafkatest/RenewalEventTest.java | 47 ++++++ .../simulate/redistest/RedisCreateKey.java | 42 +++++ src/main/resources/application.yml | 12 +- src/main/resources/ehcache.xml | 29 ++++ src/main/resources/mapper/CarEventMapper.xml | 28 ++++ 22 files changed, 883 insertions(+), 22 deletions(-) create mode 100644 src/main/java/com/hyc/config/MsgComment.java create mode 100644 src/main/java/com/hyc/consumer/EventUpdateConsumer.java create mode 100644 src/main/java/com/hyc/domain/CacheCarEvent.java create mode 100644 src/main/java/com/hyc/domain/NodeName.java create mode 100644 src/main/java/com/hyc/domain/VehicleData.java create mode 100644 src/main/java/com/hyc/domain/constant/EventConstant.java create mode 100644 src/main/java/com/hyc/domain/req/CarEventUpdate.java create mode 100644 src/main/java/com/hyc/kafka/demo/mapper/CarEventMapper.java create mode 100644 src/main/java/com/hyc/kafka/demo/service/CarEventService.java create mode 100644 src/main/java/com/hyc/kafka/demo/service/impl/CarEventServiceImpl.java delete mode 100644 src/main/java/com/hyc/producer/TransmitMessageProducer.java create mode 100644 src/main/java/com/hyc/runner/SyncCacheRunner.java create mode 100644 src/main/java/com/hyc/simulate/constant/ExchangeConstant.java create mode 100644 src/main/java/com/hyc/simulate/kafkatest/RenewalEventTest.java create mode 100644 src/main/java/com/hyc/simulate/redistest/RedisCreateKey.java create mode 100644 src/main/resources/ehcache.xml create mode 100644 src/main/resources/mapper/CarEventMapper.xml diff --git a/pom.xml b/pom.xml index 31d957b..5f280e8 100644 --- a/pom.xml +++ b/pom.xml @@ -14,7 +14,20 @@ 2.6.13 - + + + + net.sf.ehcache + ehcache + 2.10.9.2 + + + org.springframework.boot + spring-boot-starter-cache + 2.6.13 + + + org.apache.iotdb iotdb-session diff --git a/src/main/java/com/hyc/ParseMessageApplication.java b/src/main/java/com/hyc/ParseMessageApplication.java index f33c70d..1dea261 100644 --- a/src/main/java/com/hyc/ParseMessageApplication.java +++ b/src/main/java/com/hyc/ParseMessageApplication.java @@ -2,7 +2,9 @@ package com.hyc; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.cache.annotation.EnableCaching; +@EnableCaching @SpringBootApplication public class ParseMessageApplication { diff --git a/src/main/java/com/hyc/config/MsgComment.java b/src/main/java/com/hyc/config/MsgComment.java new file mode 100644 index 0000000..ec4def4 --- /dev/null +++ b/src/main/java/com/hyc/config/MsgComment.java @@ -0,0 +1,126 @@ +package com.hyc.config; + +import com.alibaba.fastjson.JSONObject; +import com.google.gson.JsonObject; +import com.hyc.domain.CacheCarEvent; +import com.hyc.domain.NodeName; +import com.hyc.domain.req.CarEventUpdate; +import com.hyc.simulate.constant.ExchangeConstant; +import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.core.*; +import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; +import org.springframework.amqp.rabbit.connection.ConnectionFactory; +import org.springframework.amqp.rabbit.core.RabbitAdmin; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; +import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.cache.Cache; +import org.springframework.cache.CacheManager; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * rabbit更新缓存 + * + * @author YouChe·He + * @ClassName: RabbitConfig + * @Description: rabbit配置 + * @CreateTime: 2024/6/16 10:32 + */ +@Configuration +@Slf4j +public class MsgComment { + + @Autowired + private NodeName nodeName; + @Autowired + private CacheManager cacheManager; + @Bean + public FanoutExchange eventUpdateExchange() { + return new FanoutExchange(ExchangeConstant.UPDATE_CONSTANT); + } + + @Bean + public Queue queue() { + return new Queue(nodeName.getNodeName()); + } + + @Bean + public Binding binding() { + return new Binding(nodeName.getNodeName(), + Binding.DestinationType.QUEUE, ExchangeConstant.UPDATE_CONSTANT, + "", null); + } + + @Bean + public ConnectionFactory connectionFactory() { + CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); + connectionFactory.setHost("47.103.75.98"); + connectionFactory.setPort(5672); + connectionFactory.setPassword("guest"); + connectionFactory.setUsername("guest"); + //开启发送端确认 + connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);//SIMPLE=简单的;CORRELATED=相关的;NONE=关闭b; + //开启消息返回 + connectionFactory.setPublisherReturns(true); + connectionFactory.createConnection(); + return connectionFactory; + } + + @Bean + public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) { + RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory); + rabbitAdmin.setAutoStartup(true); + return rabbitAdmin; + } + + @Bean + public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { + RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); + + rabbitTemplate.setMandatory(true); + //消息返回 + rabbitTemplate.setReturnCallback((message, i, s, s1, s2) -> { + log.info("消息返回实现,message:{},replyCode:{},replyText:{},exchange:{},routingKey:{}", message, i, s, s1, s2); + }); + //发送端确认 + rabbitTemplate.setConfirmCallback((correlationData, b, s) -> { + log.info("发送端确认,correlationData:{},ack:{},cause:{}", correlationData, b, s); + + }); + return rabbitTemplate; + } + + @Bean + public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) { + SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(connectionFactory); + //针对哪些队列(参数为可变参数) + simpleMessageListenerContainer.setQueueNames(nodeName.getNodeName()); + //同时有多少个消费者线程在消费这个队列,相当于线程池的线程数字。 + simpleMessageListenerContainer.setConcurrentConsumers(3); + //最大的消费者线程数 + simpleMessageListenerContainer.setMaxConcurrentConsumers(5); + //设置消息确认方式 NONE=不确认,MANUAL=手动确认,AUTO=自动确认; + //自动确认 + //simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.AUTO); + //simpleMessageListenerContainer.setMessageListener(message -> log.info("springboot.rabbitmq-queue接收到的消息:[{}]", message)); + //手动确认(单条确认) + simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL); + simpleMessageListenerContainer.setMessageListener((ChannelAwareMessageListener) (message, channel) -> { + log.info("队列名:{}",nodeName.getNodeName()); + log.info("接收到的消息:[{}]",message.toString()); + String eventString = new String(message.getBody()); + CarEventUpdate carEventUpdate = JSONObject.parseObject(eventString, CarEventUpdate.class); + + Cache carEvent = cacheManager.getCache("carEvent"); + carEvent.put(carEventUpdate.getVin(),carEventUpdate.getHandleEvent()); + Cache.ValueWrapper valueWrapper = carEvent.get(carEventUpdate.getVin()); + log.info("小车{}更新后的缓存中事件是:{}",carEventUpdate.getVin(),valueWrapper.get().toString()); + channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); + }); + //消费端限流 + simpleMessageListenerContainer.setPrefetchCount(1); + return simpleMessageListenerContainer; + } +} diff --git a/src/main/java/com/hyc/consumer/EventUpdateConsumer.java b/src/main/java/com/hyc/consumer/EventUpdateConsumer.java new file mode 100644 index 0000000..c65871e --- /dev/null +++ b/src/main/java/com/hyc/consumer/EventUpdateConsumer.java @@ -0,0 +1,19 @@ +package com.hyc.consumer; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +/** + * 事件更新消费者 + * + * @author YouChe·He + * @ClassName: EventUpdateConsumer + * @Description: 事件更新消费者 + * @CreateTime: 2024/6/16 10:28 + */ +@Component +@Slf4j +public class EventUpdateConsumer { + + +} diff --git a/src/main/java/com/hyc/domain/CacheCarEvent.java b/src/main/java/com/hyc/domain/CacheCarEvent.java new file mode 100644 index 0000000..8da6f9a --- /dev/null +++ b/src/main/java/com/hyc/domain/CacheCarEvent.java @@ -0,0 +1,38 @@ +package com.hyc.domain; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.ToString; +import org.apache.kafka.common.protocol.types.Field; + +/** + * 缓存小车事件类 + * + * @author YouChe·He + * @ClassName: CacheCarEvent + * @Description: 缓存小车事件类 + * @CreateTime: 2024/6/14 20:29 + */ +@Data +@ToString +@NoArgsConstructor +@AllArgsConstructor +public class CacheCarEvent { + /** + * 对应id + */ + private Integer id; + /** + * 小车VIN + */ + private String vin; + /** + * 绑定事件 + * 1:电子围栏事件 + * 2:实时数据事件 + * 3:故障预警事件 + */ + private String eventValue; + +} diff --git a/src/main/java/com/hyc/domain/NodeName.java b/src/main/java/com/hyc/domain/NodeName.java new file mode 100644 index 0000000..cfbdabb --- /dev/null +++ b/src/main/java/com/hyc/domain/NodeName.java @@ -0,0 +1,23 @@ +package com.hyc.domain; + +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Configuration; + +/** + * 节点名称 + * + * @author YouChe·He + * @ClassName: NodeName + * @Description: 节点名称 + * @CreateTime: 2024/6/16 10:03 + */ +@Configuration +public class NodeName { + @Value("${event.name}") + private String name; + @Value("${event.partition}") + private String partition; + public String getNodeName(){ + return name+"_"+partition; + } +} diff --git a/src/main/java/com/hyc/domain/VehicleData.java b/src/main/java/com/hyc/domain/VehicleData.java new file mode 100644 index 0000000..5f9e48c --- /dev/null +++ b/src/main/java/com/hyc/domain/VehicleData.java @@ -0,0 +1,68 @@ +package com.hyc.domain; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.ToString; + +/** + * 小车数据类 + * + * @author YouChe·He + * @ClassName: VehicleData + * @Description: 小车数据类 + * @CreateTime: 2024/6/14 16:19 + */ +@Data +@ToString +@NoArgsConstructor +@AllArgsConstructor +public class VehicleData { + private String vin; + private Long startTime; + private Double longitude; + private Double latitude; + private Double speed; + private Double mileage; + private Double voltage; + private Double current; + private Double resistance; + private String gear; + private Double accelerationPedal; + private Double brakePedal; + private Double fuelConsumptionRate; + private Double motorControllerTemperature; + private Double motorSpeed; + private Double motorTorque; + private Double motorTemperature; + private Double motorVoltage; + private Double motorCurrent; + private Double remainingBattery; + private Double maximumFeedbackPower; + private Double maximumDischargePower; + private Double selfCheckCounter; + private Double totalBatteryCurrent; + private Double totalBatteryVoltage; + private Double singleBatteryMaxVoltage; + private Double singleBatteryMinVoltage; + private Double singleBatteryMaxTemperature; + private Double singleBatteryMinTemperature; + private Double availableBatteryCapacity; + private Double vehicleStatus; + private Double chargingStatus; + private Double operatingStatus; + private Double socStatus; + private Double chargingEnergyStorageStatus; + private Double driveMotorStatus; + private Double positionStatus; + private Double easStatus; + private Double ptcStatus; + private Double epsStatus; + private Double absStatus; + private Double mcuStatus; + private Double heatingStatus; + private Double batteryStatus; + private Double batteryInsulationStatus; + private Double dcdcStatus; + private Double chgStatus; +} diff --git a/src/main/java/com/hyc/domain/constant/EventConstant.java b/src/main/java/com/hyc/domain/constant/EventConstant.java new file mode 100644 index 0000000..5fb61e2 --- /dev/null +++ b/src/main/java/com/hyc/domain/constant/EventConstant.java @@ -0,0 +1,16 @@ +package com.hyc.domain.constant; + +import lombok.Data; + +/** + * 事件常量 + * + * @author YouChe·He + * @ClassName: EventConstant + * @Description: 事件常量 + * @CreateTime: 2024/6/16 15:39 + */ +@Data +public class EventConstant { + public final static String REAL_TIME_DATA = "_REAL_TIME_DATA"; +} diff --git a/src/main/java/com/hyc/domain/req/CarEventUpdate.java b/src/main/java/com/hyc/domain/req/CarEventUpdate.java new file mode 100644 index 0000000..2af03f4 --- /dev/null +++ b/src/main/java/com/hyc/domain/req/CarEventUpdate.java @@ -0,0 +1,32 @@ +package com.hyc.domain.req; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.ToString; + +import java.io.Serializable; + +/** + * 小车绑定事件更新类 + * + * @author YouChe·He + * @ClassName: CarEventUpdate + * @Description: 小车绑定事件更新类 + * @CreateTime: 2024/6/15 18:46 + */ +@Data +@ToString +@NoArgsConstructor +@AllArgsConstructor +public class CarEventUpdate implements Serializable{ + /** + * 修改小车VIN + */ + private String vin; + /** + * 动作类型 + */ + private String handleEvent; + +} diff --git a/src/main/java/com/hyc/kafka/demo/consumer/KafkaConsumer.java b/src/main/java/com/hyc/kafka/demo/consumer/KafkaConsumer.java index 781701a..24a384b 100644 --- a/src/main/java/com/hyc/kafka/demo/consumer/KafkaConsumer.java +++ b/src/main/java/com/hyc/kafka/demo/consumer/KafkaConsumer.java @@ -1,16 +1,21 @@ package com.hyc.kafka.demo.consumer; -import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; +import com.hyc.domain.CacheCarEvent; import com.hyc.domain.VehicleData; +import com.hyc.domain.constant.EventConstant; import com.hyc.iotdbdemo.config.IotDBSessionConfig; import com.hyc.iotdbdemo.server.IotDbServer; import com.hyc.kafka.demo.strategy.Strategy; +import com.hyc.kafka.demo.service.CarEventService; import lombok.extern.slf4j.Slf4j; import org.apache.iotdb.rpc.IoTDBConnectionException; import org.apache.iotdb.rpc.StatementExecutionException; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.cache.Cache; +import org.springframework.cache.CacheManager; +import org.springframework.cache.annotation.Cacheable; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.support.Acknowledgment; @@ -22,6 +27,7 @@ import java.util.ArrayList; import java.util.LinkedHashMap; import java.util.List; import java.util.Random; +import java.util.concurrent.TimeUnit; /** * kafka消费者 @@ -34,6 +40,11 @@ import java.util.Random; @Slf4j @Service public class KafkaConsumer { + @Autowired + private CacheManager cacheManager; + + @Autowired + private CarEventService carEventService; @Autowired private RedisTemplate redisTemplate; @@ -46,9 +57,9 @@ public class KafkaConsumer { public void consume(ConsumerRecord consumerRecord, Acknowledgment acknowledgment) { try { //策略map集合 - LinkedHashMap stringStrategyLinkedHashMap = new LinkedHashMap<>(); - stringStrategyLinkedHashMap.put("存储数据", Strategy.STORE_DATA); - stringStrategyLinkedHashMap.put("实时数据",Strategy.REAL_TIME_DATA); +// LinkedHashMap stringStrategyLinkedHashMap = new LinkedHashMap<>(); +// stringStrategyLinkedHashMap.put("存储数据", Strategy.STORE_DATA); +// stringStrategyLinkedHashMap.put("实时数据",Strategy.REAL_TIME_DATA); //解析得到VIN String value = (String) consumerRecord.value(); @@ -57,6 +68,31 @@ public class KafkaConsumer { log.error("消费者0得到的数据:{},所在分区:{}",vehicleData.toString(),consumerRecord.partition()); iotDbServer.insertData(vehicleData); + //得到小车绑定信息 + String carEventByVin = this.getCarEventByVin(vehicleData.getVin()); + //解析绑定的信息 + String[] eventArr = carEventByVin.split(","); + for (String eventNumber : eventArr) { + //电子围栏事件 + if (eventNumber.equals("1")){ + //从缓存中查询车辆绑定的围栏组,与车辆信息进行比较 + } + //实时数据事件 + if (eventNumber.equals("2")) { + if (redisTemplate.hasKey(vehicleData.getVin()+ EventConstant.REAL_TIME_DATA)){ + //有VIN_REAL_TIME_DATA这个key,则代表用户有查询对应车辆VIN数据的请求,将数据存储到redis中 + String key = vehicleData.getVin() + EventConstant.REAL_TIME_DATA; + String carData = JSONObject.toJSONString(vehicleData); + this.updateValueWithoutChangingTTL(key, value); + } + } + //故障预警事件 + if (eventNumber.equals("3")) { + + } + } + + // String dataMessage = value.toString(); // JSONObject jsonObject = JSON.parseObject(dataMessage); // //根据VIN得到该小车拥有的事件 @@ -89,4 +125,48 @@ public class KafkaConsumer { } return strings; } + + public String getCarEventByVin(String vin){ + + + // 先从本地缓存获取 + Cache cache = cacheManager.getCache("carEvent"); + Cache.ValueWrapper valueWrapper = cache.get(vin); + if (valueWrapper != null) { + return valueWrapper.get().toString(); + } + + // 从 Redis 获取 + + String carEventString = redisTemplate.opsForValue().get(vin); + if (carEventString != null) { + // 将数据同步到本地缓存 + cache.put(vin, carEventString); + return carEventString; + } + + // 从 MySQL 获取 + CacheCarEvent cacheCarEvent = carEventService.getCarEvent(vin); + if (cacheCarEvent != null) { + // 将数据存入 Redis 和本地缓存 + redisTemplate.opsForValue().set(vin, cacheCarEvent.getEventValue()); + cache.put(vin, cacheCarEvent.getEventValue()); + } + + return cacheCarEvent.getEventValue(); + + } + + public void updateValueWithoutChangingTTL(String key, String newValue) { + // 获取当前的过期时间 + Long currentTTL = redisTemplate.getExpire(key, TimeUnit.SECONDS); + + // 更新键的值 + redisTemplate.opsForValue().set(key, newValue); + + // 重新设置过期时间 + if (currentTTL != null && currentTTL > 0) { + redisTemplate.expire(key, currentTTL, TimeUnit.SECONDS); + } + } } diff --git a/src/main/java/com/hyc/kafka/demo/consumer/KafkaConsumer1.java b/src/main/java/com/hyc/kafka/demo/consumer/KafkaConsumer1.java index 9775cfc..603eb95 100644 --- a/src/main/java/com/hyc/kafka/demo/consumer/KafkaConsumer1.java +++ b/src/main/java/com/hyc/kafka/demo/consumer/KafkaConsumer1.java @@ -1,11 +1,31 @@ package com.hyc.kafka.demo.consumer; +import com.alibaba.fastjson.JSONObject; +import com.hyc.domain.CacheCarEvent; +import com.hyc.domain.VehicleData; +import com.hyc.domain.constant.EventConstant; +import com.hyc.iotdbdemo.config.IotDBSessionConfig; +import com.hyc.iotdbdemo.server.IotDbServer; +import com.hyc.kafka.demo.service.CarEventService; import lombok.extern.slf4j.Slf4j; +import org.apache.iotdb.rpc.IoTDBConnectionException; +import org.apache.iotdb.rpc.StatementExecutionException; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.cache.Cache; +import org.springframework.cache.CacheManager; +import org.springframework.data.redis.core.RedisTemplate; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.support.Acknowledgment; import org.springframework.stereotype.Service; +import javax.annotation.Resource; +import java.rmi.ServerException; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.concurrent.TimeUnit; + /** * kafka消费者 * @@ -17,14 +37,133 @@ import org.springframework.stereotype.Service; @Slf4j @Service public class KafkaConsumer1 { + @Autowired + private CacheManager cacheManager; + + @Autowired + private CarEventService carEventService; + + @Autowired + private RedisTemplate redisTemplate; + @Resource + private IotDbServer iotDbServer; + @Resource + private IotDBSessionConfig iotDBSessionConfig; @KafkaListener(topics = "topichyc", groupId = "firstGroup", containerFactory = "kafkaListenerContainerFactory", - errorHandler = "myKafkaListenerErrorHandler") + errorHandler = "myKafkaListenerErrorHandler") public void consume(ConsumerRecord consumerRecord, Acknowledgment acknowledgment) { try { - Object value = consumerRecord.value(); - log.error("消费者1得到的数据:{},所在分区:{}",value,consumerRecord.partition()); - }finally { + //策略map集合 +// LinkedHashMap stringStrategyLinkedHashMap = new LinkedHashMap<>(); +// stringStrategyLinkedHashMap.put("存储数据", Strategy.STORE_DATA); +// stringStrategyLinkedHashMap.put("实时数据",Strategy.REAL_TIME_DATA); + + //解析得到VIN + String value = (String) consumerRecord.value(); + + VehicleData vehicleData = JSONObject.parseObject(value, VehicleData.class); + log.error("消费者1得到的数据:{},所在分区:{}",vehicleData.toString(),consumerRecord.partition()); + + iotDbServer.insertData(vehicleData); + //得到小车绑定信息 + String carEventByVin = this.getCarEventByVin(vehicleData.getVin()); + //解析绑定的信息 + String[] eventArr = carEventByVin.split(","); + for (String eventNumber : eventArr) { + //电子围栏事件 + if (eventNumber.equals("1")){ + //从缓存中查询车辆绑定的围栏组,与车辆信息进行比较 + } + //实时数据事件 + if (eventNumber.equals("2")) { + if (redisTemplate.hasKey(vehicleData.getVin()+ EventConstant.REAL_TIME_DATA)){ + //有VIN_REAL_TIME_DATA这个key,则代表用户有查询对应车辆VIN数据的请求,将数据存储到redis中 + String key = vehicleData.getVin() + EventConstant.REAL_TIME_DATA; + String carData = JSONObject.toJSONString(vehicleData); + this.updateValueWithoutChangingTTL(key, value); + } + } + //故障预警事件 + if (eventNumber.equals("3")) { + + } + } + + +// String dataMessage = value.toString(); +// JSONObject jsonObject = JSON.parseObject(dataMessage); +// //根据VIN得到该小车拥有的事件 +// String vin = jsonObject.get("vin").toString(); +// List eventList = getEvent(vin); +// //循环事件集合,并执行响应的事件 +// for (String event : eventList) { +// stringStrategyLinkedHashMap.get(event).exe(redisTemplate,dataMessage); +// } + + } catch (ServerException e) { + log.error("添加iotDb异常"); + throw new RuntimeException(e); + } catch (IoTDBConnectionException e) { + throw new RuntimeException(e); + } catch (StatementExecutionException e) { + throw new RuntimeException(e); + } finally { acknowledgment.acknowledge(); } } + + public List getEvent(String vin){ + ArrayList strings = new ArrayList<>(); + + strings.add("存储数据"); + int nextInt = new Random().nextInt(100); + if (nextInt % 2 ==0){ + strings.add("实时数据"); + } + return strings; + } + + public String getCarEventByVin(String vin){ + + + // 先从本地缓存获取 + Cache cache = cacheManager.getCache("carEvent"); + Cache.ValueWrapper valueWrapper = cache.get(vin); + if (valueWrapper != null) { + return valueWrapper.get().toString(); + } + + // 从 Redis 获取 + + String carEventString = redisTemplate.opsForValue().get(vin); + if (carEventString != null) { + // 将数据同步到本地缓存 + cache.put(vin, carEventString); + return carEventString; + } + + // 从 MySQL 获取 + CacheCarEvent cacheCarEvent = carEventService.getCarEvent(vin); + if (cacheCarEvent != null) { + // 将数据存入 Redis 和本地缓存 + redisTemplate.opsForValue().set(vin, cacheCarEvent.getEventValue()); + cache.put(vin, cacheCarEvent.getEventValue()); + } + + return cacheCarEvent.getEventValue(); + + } + + public void updateValueWithoutChangingTTL(String key, String newValue) { + // 获取当前的过期时间 + Long currentTTL = redisTemplate.getExpire(key, TimeUnit.SECONDS); + + // 更新键的值 + redisTemplate.opsForValue().set(key, newValue); + + // 重新设置过期时间 + if (currentTTL != null && currentTTL > 0) { + redisTemplate.expire(key, currentTTL, TimeUnit.SECONDS); + } + } } diff --git a/src/main/java/com/hyc/kafka/demo/mapper/CarEventMapper.java b/src/main/java/com/hyc/kafka/demo/mapper/CarEventMapper.java new file mode 100644 index 0000000..17984b3 --- /dev/null +++ b/src/main/java/com/hyc/kafka/demo/mapper/CarEventMapper.java @@ -0,0 +1,21 @@ +package com.hyc.kafka.demo.mapper; + +import com.hyc.domain.CacheCarEvent; +import org.apache.ibatis.annotations.Mapper; + +import java.util.List; + +/** + * 小车事件持久层 + * + * @author YouChe·He + * @ClassName: CarEventMapper + * @Description: 小车事件持久层 + * @CreateTime: 2024/6/14 20:44 + */ +@Mapper +public interface CarEventMapper { + CacheCarEvent getCarEvent(String vin); + + List getAllCarEvent(); +} diff --git a/src/main/java/com/hyc/kafka/demo/service/CarEventService.java b/src/main/java/com/hyc/kafka/demo/service/CarEventService.java new file mode 100644 index 0000000..90f26ce --- /dev/null +++ b/src/main/java/com/hyc/kafka/demo/service/CarEventService.java @@ -0,0 +1,25 @@ +package com.hyc.kafka.demo.service; +/** + * 小车绑定事件处理 + * + * @author YouChe·He + * @ClassName: CarEventService + * @Description: 小车绑定事件处理 + * @CreateTime: 2024/6/14 20:38 + */ + +import com.hyc.domain.CacheCarEvent; + +import java.util.List; + +/** + *@ClassName CarEventService + *@Description 描述 + *@Author ZHIHAO.DAI + *@Date 2024/6/14 20:38 + */ +public interface CarEventService { + CacheCarEvent getCarEvent(String substring); + + List getAllCarEvent(); +} diff --git a/src/main/java/com/hyc/kafka/demo/service/impl/CarEventServiceImpl.java b/src/main/java/com/hyc/kafka/demo/service/impl/CarEventServiceImpl.java new file mode 100644 index 0000000..96b1fcd --- /dev/null +++ b/src/main/java/com/hyc/kafka/demo/service/impl/CarEventServiceImpl.java @@ -0,0 +1,37 @@ +package com.hyc.kafka.demo.service.impl; + +import com.hyc.domain.CacheCarEvent; +import com.hyc.kafka.demo.mapper.CarEventMapper; +import com.hyc.kafka.demo.service.CarEventService; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.util.List; + +/** + * 小车绑定服务实现类 + * + * @author YouChe·He + * @ClassName: CarEventServiceImpl + * @Description: 小车绑定服务实现类 + * @CreateTime: 2024/6/14 20:38 + */ +@Service +@Slf4j +public class CarEventServiceImpl implements CarEventService { + @Autowired + private CarEventMapper carEventMapper; + + @Override + public CacheCarEvent getCarEvent(String vin) { + log.info("小车的VIN是:{}",vin); + CacheCarEvent cacheCarEvent = carEventMapper.getCarEvent(vin); + return cacheCarEvent; + } + + @Override + public List getAllCarEvent() { + return carEventMapper.getAllCarEvent(); + } +} diff --git a/src/main/java/com/hyc/producer/TransmitMessageProducer.java b/src/main/java/com/hyc/producer/TransmitMessageProducer.java deleted file mode 100644 index 97cf404..0000000 --- a/src/main/java/com/hyc/producer/TransmitMessageProducer.java +++ /dev/null @@ -1,12 +0,0 @@ -package com.hyc.producer; - -/** - * 传递报文生产者 - * - * @author YouChe·He - * @ClassName: TransmitMessageProducer - * @Description: 传递报文生产者 - * @CreateTime: 2024/6/5 14:35 - */ -public class TransmitMessageProducer { -} diff --git a/src/main/java/com/hyc/runner/SyncCacheRunner.java b/src/main/java/com/hyc/runner/SyncCacheRunner.java new file mode 100644 index 0000000..ea79a03 --- /dev/null +++ b/src/main/java/com/hyc/runner/SyncCacheRunner.java @@ -0,0 +1,45 @@ +package com.hyc.runner; + +import com.alibaba.fastjson.JSONObject; +import com.hyc.domain.CacheCarEvent; +import com.hyc.domain.SummarizeResp; +import com.hyc.kafka.demo.service.CarEventService; +import com.hyc.service.SummarizeService; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.ApplicationArguments; +import org.springframework.boot.ApplicationRunner; +import org.springframework.cache.Cache; +import org.springframework.cache.CacheManager; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.stereotype.Component; + +import java.util.List; + +/** + * 同步缓存Runner + * + * @author YouChe·He + * @ClassName: SyncCacheRunner + * @Description: 同步缓存Runner + * @CreateTime: 2024/6/16 11:40 + */ +@Component +public class SyncCacheRunner implements ApplicationRunner { + @Autowired + private CarEventService carEventService; + @Autowired + private CacheManager cacheManager; + @Autowired + private RedisTemplate redisTemplate; + @Override + public void run(ApplicationArguments args) throws Exception { + Cache cache = cacheManager.getCache("carEvent"); + List cacheCarEventList = carEventService.getAllCarEvent(); + for (CacheCarEvent cacheCarEvent : cacheCarEventList) { + redisTemplate.opsForValue().set(cacheCarEvent.getVin(), JSONObject.toJSONString(cacheCarEvent)); + cache.put(cacheCarEvent.getVin(), cacheCarEvent.getEventValue()); + } + // 将数据存入 Redis 和本地缓存 + + } +} diff --git a/src/main/java/com/hyc/simulate/constant/ExchangeConstant.java b/src/main/java/com/hyc/simulate/constant/ExchangeConstant.java new file mode 100644 index 0000000..bccd7e3 --- /dev/null +++ b/src/main/java/com/hyc/simulate/constant/ExchangeConstant.java @@ -0,0 +1,33 @@ +package com.hyc.simulate.constant; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.ToString; +import org.springframework.amqp.core.Binding; +import org.springframework.amqp.core.BindingBuilder; +import org.springframework.amqp.core.FanoutExchange; +import org.springframework.amqp.core.Queue; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * 交换机常量 + * + * @author YouChe·He + * @ClassName: ExchangeConstant + * @Description: 交换机常量 + * @CreateTime: 2024/6/15 19:01 + */ +@Configuration +public class ExchangeConstant { + + public final static String UPDATE_CONSTANT = "update_constant"; + + @Bean + public FanoutExchange handleEvent(){ + return new FanoutExchange(UPDATE_CONSTANT); + } + +} diff --git a/src/main/java/com/hyc/simulate/kafkatest/RenewalEventTest.java b/src/main/java/com/hyc/simulate/kafkatest/RenewalEventTest.java new file mode 100644 index 0000000..b399e4b --- /dev/null +++ b/src/main/java/com/hyc/simulate/kafkatest/RenewalEventTest.java @@ -0,0 +1,47 @@ +package com.hyc.simulate.kafkatest; + +import com.alibaba.fastjson.JSONObject; +import com.hyc.domain.CacheCarEvent; +import com.hyc.domain.req.CarEventUpdate; +import com.hyc.simulate.constant.ExchangeConstant; +import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.cache.Cache; +import org.springframework.cache.CacheManager; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +/** + * 更新事件测试类 + * + * @author YouChe·He + * @ClassName: RenewalEventTest + * @Description: 更新事件测试类 + * @CreateTime: 2024/6/15 18:40 + */ +@Slf4j +@RestController +@RequestMapping("test") +public class RenewalEventTest { + @Autowired + private RabbitTemplate rabbitTemplate; + @Autowired + private CacheManager cacheManager; + + @PostMapping("/broadcast") + public String broadcastMessage(@RequestBody CarEventUpdate carEventUpdate){ + + Cache cache = cacheManager.getCache("carEvent"); + Cache.ValueWrapper valueWrapper = cache.get(carEventUpdate.getVin()); + log.info("小车:{}更新前的缓存中事件值是:{}",carEventUpdate.getVin(),valueWrapper.get().toString()); + if (valueWrapper != null) { + + } + rabbitTemplate.convertAndSend(ExchangeConstant.UPDATE_CONSTANT,"", JSONObject.toJSONString(carEventUpdate)); + return "sent success!"; + } + +} diff --git a/src/main/java/com/hyc/simulate/redistest/RedisCreateKey.java b/src/main/java/com/hyc/simulate/redistest/RedisCreateKey.java new file mode 100644 index 0000000..d9cb75b --- /dev/null +++ b/src/main/java/com/hyc/simulate/redistest/RedisCreateKey.java @@ -0,0 +1,42 @@ +package com.hyc.simulate.redistest; + +import com.alibaba.fastjson.JSONObject; +import com.hyc.domain.VehicleData; +import com.hyc.domain.constant.EventConstant; +import com.hyc.result.Result; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +import java.util.concurrent.TimeUnit; + +/** + * 在redsi中创建事件相关key + * + * @author YouChe·He + * @ClassName: RedisCreateKey + * @Description: 在redsi中创建事件相关key + * @CreateTime: 2024/6/16 15:35 + */ +@RestController +@RequestMapping("/test/redis") +public class RedisCreateKey { + + @Autowired + private RedisTemplate redisTemplate; + + @RequestMapping("/createRealTimeDataKey") + public Result createRealTimeDataKey(String vin) { + redisTemplate.opsForValue().set(vin + EventConstant.REAL_TIME_DATA,"",3, TimeUnit.MINUTES); + return Result.success("","创建成功"); + } + + @RequestMapping("/getDataByRedis") + public Result getDataByRedis(String vin){ + String vehicleJson = redisTemplate.opsForValue().get(vin + EventConstant.REAL_TIME_DATA); + VehicleData vehicleData = JSONObject.parseObject(vehicleJson, VehicleData.class); + redisTemplate.opsForValue().set(vin+EventConstant.REAL_TIME_DATA,vehicleJson,3,TimeUnit.MINUTES); + return Result.success(vehicleData); + } +} diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 48a66a3..46d00a5 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -4,6 +4,13 @@ server: port: 10003 spring: + profiles: + active: dev + cache: + type: ehcache + ehcache: + config: classpath:ehcache.xml + rabbitmq: host: 47.103.75.98 port: 5672 @@ -47,7 +54,7 @@ spring: redis: host: 47.103.75.98 port: 6379 - password: hyc123 + password: kafka: producer: # Kafka服务器 @@ -130,4 +137,7 @@ mybatis: global-config: db-config: id-type: auto +event: + name: collect + partition: 001 diff --git a/src/main/resources/ehcache.xml b/src/main/resources/ehcache.xml new file mode 100644 index 0000000..dfccbc2 --- /dev/null +++ b/src/main/resources/ehcache.xml @@ -0,0 +1,29 @@ + + + + + + + + + + diff --git a/src/main/resources/mapper/CarEventMapper.xml b/src/main/resources/mapper/CarEventMapper.xml new file mode 100644 index 0000000..b947dc4 --- /dev/null +++ b/src/main/resources/mapper/CarEventMapper.xml @@ -0,0 +1,28 @@ + + + + + + + + + +