diff --git a/src/main/java/com/parseSystem/ParseSystemApplication.java b/src/main/java/com/parseSystem/ParseSystemApplication.java index 864408c..9a7e942 100644 --- a/src/main/java/com/parseSystem/ParseSystemApplication.java +++ b/src/main/java/com/parseSystem/ParseSystemApplication.java @@ -2,6 +2,7 @@ package com.parseSystem; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.scheduling.annotation.EnableScheduling; /** * @author 冯凯 @@ -10,6 +11,7 @@ import org.springframework.boot.autoconfigure.SpringBootApplication; * @date 2023/11/25 15:34 */ @SpringBootApplication +@EnableScheduling public class ParseSystemApplication { /** diff --git a/src/main/java/com/parseSystem/rabbitmq/ListenEventChangeRabbitMq.java b/src/main/java/com/parseSystem/rabbitmq/ListenEventChangeRabbitMq.java index ddceb12..45eee9b 100644 --- a/src/main/java/com/parseSystem/rabbitmq/ListenEventChangeRabbitMq.java +++ b/src/main/java/com/parseSystem/rabbitmq/ListenEventChangeRabbitMq.java @@ -23,11 +23,10 @@ import java.time.Duration; import java.util.Collections; /** - * * 监听车辆事件变更消费队列 + * * @author 冯凯 * @version 1.0 - * * @date 2023/11/27 22:19 */ @Component @@ -52,7 +51,7 @@ public class ListenEventChangeRabbitMq { */ @RabbitListener(queuesToDeclare = {@Queue(value = "kafka_top")}) public void consumerSubscribe(String mesg, Message message, Channel channel) { - log.info("收到准备订阅主题:{}" , mesg); + log.info("收到准备订阅主题:{}", mesg); // 将接收到的消息解析为kafkaConstants对象 kafkaConstants kafkaConstants = JSONObject.parseObject(mesg, kafkaConstants.class); @@ -67,31 +66,30 @@ public class ListenEventChangeRabbitMq { // 将消费者订阅的主题设置为"kafka_top"队列的第一个分区 consumer.assign(Collections.singleton(topicPartition)); // 开启线程持续拉取数据 - while (true) { - ConsumerRecords records = null; - try { - // 每个一秒钟拉取一次数据 - records = consumer.poll(Duration.ofMillis(1000)); - for (ConsumerRecord record : records) { - System.out.println(record.value()); - long startTime = System.currentTimeMillis(); - log.info("开始消费时间{}:",startTime); - // 解析数据 - String data = ParseUtil.sixteenToStr(record.value()); - // 构建数据对象 - VehicleData vehicleData = VehicleData.getBuild(data); - log.info(vehicleData); - dataQueueManager.enqueueData(vehicleData); - - log.info("耗费时间{}",System.currentTimeMillis()-startTime); - // 调用执行事件 - eventHandlerService.executeEvent(vehicleData); - } - } catch (Exception e) { - log.info("records: {}", records); - log.error(e); + while (true) { + ConsumerRecords records = null; + try { + // 每个一秒钟拉取一次数据 + records = consumer.poll(Duration.ofMillis(1000)); + for (ConsumerRecord record : records) { + System.out.println(record.value()); + long startTime = System.currentTimeMillis(); + log.info("开始消费时间{}:", startTime); + // 解析数据 + String data = ParseUtil.sixteenToStr(record.value()); + // 构建数据对象 + VehicleData vehicleData = VehicleData.getBuild(data); + log.info(vehicleData); + dataQueueManager.enqueueData(vehicleData); + // 调用执行事件 + eventHandlerService.executeEvent(vehicleData); + log.info("耗费时间{}", System.currentTimeMillis() - startTime); } + } catch (Exception e) { + log.info("records: {}", records); + log.error(e); } + } // 订阅特定分区 diff --git a/src/main/java/com/parseSystem/utils/DataQueueManager.java b/src/main/java/com/parseSystem/utils/DataQueueManager.java index 5e25d03..b1988d7 100644 --- a/src/main/java/com/parseSystem/utils/DataQueueManager.java +++ b/src/main/java/com/parseSystem/utils/DataQueueManager.java @@ -4,14 +4,12 @@ import com.parseSystem.storage.service.StorageDateService; import com.parseSystem.vehicle.VehicleData; import lombok.extern.log4j.Log4j2; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.context.annotation.Bean; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import java.util.ArrayList; import java.util.List; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; /** * @author 冯凯 @@ -23,9 +21,9 @@ import java.util.concurrent.LinkedBlockingQueue; @Log4j2 public class DataQueueManager { + private int count; @Autowired private StorageDateService storageDateService; - private static final int BATCH_SIZE_THRESHOLD = 100; // 批量处理阈值 @Autowired @@ -49,25 +47,22 @@ public class DataQueueManager { @Scheduled(fixedRate = 5000) public void processQueues() { // 如果第一个队列中的数据数量达到阈值,就从第一个队列中取出数据进行批量持久化操作 - if (dataQueue1.size() >= BATCH_SIZE_THRESHOLD) { - List batchData = new ArrayList<>(); - dataQueue1.drainTo(batchData, BATCH_SIZE_THRESHOLD); - batchPersistToDatabase(batchData); - } + if (!dataQueue1.isEmpty()) { + List batchData = new ArrayList<>(); + count+=batchData.size(); + dataQueue1.drainTo(batchData); + storageDateService.saveBatch(batchData); + } // 如果第一个队列为空,但是第二个队列不为空,就从第二个队列中取出数据进行持久化操作 else if (dataQueue1.isEmpty() && !dataQueue2.isEmpty()) { log.warn("第一个队列为空,从第二个队列中取出数据进行持久化操作"); List batchData = new ArrayList<>(); - dataQueue2.drainTo(batchData, BATCH_SIZE_THRESHOLD); - batchPersistToDatabase(batchData); + count+=batchData.size(); + dataQueue2.drainTo(batchData); + storageDateService.saveBatch(batchData); } + } - private void batchPersistToDatabase(List batchData) { - // 批量持久化到数据库的逻辑 - for (VehicleData vehicleData : batchData) { - storageDateService.save(vehicleData); // 持久化到数据库 - } - } } diff --git a/src/main/java/com/parseSystem/vehicle/VehicleData.java b/src/main/java/com/parseSystem/vehicle/VehicleData.java index 674d31a..9a793c7 100644 --- a/src/main/java/com/parseSystem/vehicle/VehicleData.java +++ b/src/main/java/com/parseSystem/vehicle/VehicleData.java @@ -14,7 +14,7 @@ import java.util.Date; @ToString @NoArgsConstructor @AllArgsConstructor -@TableName("vehicle_data") +@TableName("vehicle_data_copy1") public class VehicleData { /** diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index ba27a86..34f7ba6 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -9,7 +9,7 @@ spring: name: parseSystem datasource: driver-class-name: com.mysql.cj.jdbc.Driver - url: jdbc:mysql://117.72.43.22:4000/test?useUnicode=true&characterEncoding=utf8&serverTimezone=GMT%2B8 + url: jdbc:mysql://117.72.43.22:4000/test1?useUnicode=true&characterEncoding=utf8&serverTimezone=GMT%2B8 username: root password: 123456 rabbitmq: