From 07fae80e3626cf935798f48f2a83a72e645e8179 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=86=AF=E5=87=AF?= <371894675@qq.com> Date: Fri, 1 Dec 2023 22:41:05 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=20=E4=B8=A4=E4=B8=AA?= =?UTF-8?q?=E9=98=9F=E5=88=97=E4=BD=9C=E4=B8=BA=E7=BC=93=E5=86=B2=E6=B1=A0?= =?UTF-8?q?=E8=BF=9B=E8=A1=8C=E6=8C=81=E4=B9=85=E5=8C=96tidb=E6=95=B0?= =?UTF-8?q?=E6=8D=AE=E5=BA=93?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../rabbitmq/ListenEventChangeRabbitMq.java | 15 ++-- .../com/parseSystem/utils/DataBuffer.java | 54 -------------- .../parseSystem/utils/DataQueueManager.java | 73 +++++++++++++++++++ .../com/parseSystem/utils/QueueConfig.java | 27 +++++++ src/main/resources/application-test.yml | 14 ++++ src/main/resources/application.yml | 2 + 6 files changed, 122 insertions(+), 63 deletions(-) delete mode 100644 src/main/java/com/parseSystem/utils/DataBuffer.java create mode 100644 src/main/java/com/parseSystem/utils/DataQueueManager.java create mode 100644 src/main/java/com/parseSystem/utils/QueueConfig.java create mode 100644 src/main/resources/application-test.yml diff --git a/src/main/java/com/parseSystem/rabbitmq/ListenEventChangeRabbitMq.java b/src/main/java/com/parseSystem/rabbitmq/ListenEventChangeRabbitMq.java index 406646b..ddceb12 100644 --- a/src/main/java/com/parseSystem/rabbitmq/ListenEventChangeRabbitMq.java +++ b/src/main/java/com/parseSystem/rabbitmq/ListenEventChangeRabbitMq.java @@ -3,8 +3,7 @@ package com.parseSystem.rabbitmq; import com.alibaba.fastjson.JSONObject; import com.parseSystem.event.EventHandlerService; import com.parseSystem.kafka.constant.kafkaConstants; -import com.parseSystem.storage.service.StorageDateService; -import com.parseSystem.utils.DataBuffer; +import com.parseSystem.utils.DataQueueManager; import com.parseSystem.utils.ParseUtil; import com.parseSystem.vehicle.VehicleData; import com.rabbitmq.client.Channel; @@ -34,12 +33,12 @@ import java.util.Collections; @Component @Log4j2 public class ListenEventChangeRabbitMq { + @Autowired - private DataBuffer dataBuffer; + private DataQueueManager dataQueueManager; + @Autowired private KafkaConsumer consumer; - @Autowired - private StorageDateService storageDateService; @Autowired private EventHandlerService eventHandlerService; @@ -67,25 +66,23 @@ public class ListenEventChangeRabbitMq { // 将消费者订阅的主题设置为"kafka_top"队列的第一个分区 consumer.assign(Collections.singleton(topicPartition)); - // 开启线程持续拉取数据 while (true) { ConsumerRecords records = null; try { // 每个一秒钟拉取一次数据 records = consumer.poll(Duration.ofMillis(1000)); - for (ConsumerRecord record : records) { System.out.println(record.value()); long startTime = System.currentTimeMillis(); log.info("开始消费时间{}:",startTime); // 解析数据 String data = ParseUtil.sixteenToStr(record.value()); - // 构建数据对象 VehicleData vehicleData = VehicleData.getBuild(data); - dataBuffer.addToBuffer(vehicleData); log.info(vehicleData); + dataQueueManager.enqueueData(vehicleData); + log.info("耗费时间{}",System.currentTimeMillis()-startTime); // 调用执行事件 eventHandlerService.executeEvent(vehicleData); diff --git a/src/main/java/com/parseSystem/utils/DataBuffer.java b/src/main/java/com/parseSystem/utils/DataBuffer.java deleted file mode 100644 index addc86e..0000000 --- a/src/main/java/com/parseSystem/utils/DataBuffer.java +++ /dev/null @@ -1,54 +0,0 @@ -package com.parseSystem.utils; - -import com.parseSystem.storage.service.StorageDateService; -import com.parseSystem.vehicle.VehicleData; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; - -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; - -@Component -public class DataBuffer { - @Autowired - private StorageDateService storageDateService; - private BlockingQueue buffer; // 内存队列 - private int threshold; // 队列容量阈值 - public DataBuffer() { - buffer = new LinkedBlockingQueue<>();// 创建一个链表阻塞队列作为缓冲区 - this.threshold = 100; - } - - public void addToBuffer(VehicleData data) { - try { - buffer.put(data); // 将数据放入队列,如果队列已满则会阻塞等待 - if (buffer.size() >= threshold) { - // 队列容量达到阈值,执行数据库入库操作 - startDatabaseInsertion(); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - // 处理中断异常 - } - } - - public void startDatabaseInsertion() { - Thread dbThread = new Thread(() -> { - while (!Thread.currentThread().isInterrupted()) { - // 从缓冲区获取一批数据 - List batch = new ArrayList<>(); - buffer.drainTo(batch, 100); - // 执行数据库入库操作 - storageDateService.saveBatch(batch); - } - }); - dbThread.start(); - } - - public void stopDatabaseInsertion() { - // 停止数据库入库线程 - // ... - } -} diff --git a/src/main/java/com/parseSystem/utils/DataQueueManager.java b/src/main/java/com/parseSystem/utils/DataQueueManager.java new file mode 100644 index 0000000..5e25d03 --- /dev/null +++ b/src/main/java/com/parseSystem/utils/DataQueueManager.java @@ -0,0 +1,73 @@ +package com.parseSystem.utils; + +import com.parseSystem.storage.service.StorageDateService; +import com.parseSystem.vehicle.VehicleData; +import lombok.extern.log4j.Log4j2; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +/** + * @author 冯凯 + * @version 1.0 + * @description: + * @date 2023/12/1 20:47 + */ +@Component +@Log4j2 +public class DataQueueManager { + + @Autowired + private StorageDateService storageDateService; + private static final int BATCH_SIZE_THRESHOLD = 100; // 批量处理阈值 + + + @Autowired + private BlockingQueue dataQueue1; + + @Autowired + private BlockingQueue dataQueue2; + + + + public void enqueueData(VehicleData vehicleData) { + // 将数据放入第一个队列中,如果已满,则放入第二个队列中 + if (!dataQueue1.offer(vehicleData)) { + log.warn("第一个队列已满,将数据放入第二个队列"); + dataQueue2.offer(vehicleData); + } + + } + + + @Scheduled(fixedRate = 5000) + public void processQueues() { + // 如果第一个队列中的数据数量达到阈值,就从第一个队列中取出数据进行批量持久化操作 + if (dataQueue1.size() >= BATCH_SIZE_THRESHOLD) { + List batchData = new ArrayList<>(); + dataQueue1.drainTo(batchData, BATCH_SIZE_THRESHOLD); + batchPersistToDatabase(batchData); + } + + // 如果第一个队列为空,但是第二个队列不为空,就从第二个队列中取出数据进行持久化操作 + else if (dataQueue1.isEmpty() && !dataQueue2.isEmpty()) { + log.warn("第一个队列为空,从第二个队列中取出数据进行持久化操作"); + List batchData = new ArrayList<>(); + dataQueue2.drainTo(batchData, BATCH_SIZE_THRESHOLD); + batchPersistToDatabase(batchData); + } + } + + private void batchPersistToDatabase(List batchData) { + // 批量持久化到数据库的逻辑 + for (VehicleData vehicleData : batchData) { + storageDateService.save(vehicleData); // 持久化到数据库 + } + } +} diff --git a/src/main/java/com/parseSystem/utils/QueueConfig.java b/src/main/java/com/parseSystem/utils/QueueConfig.java new file mode 100644 index 0000000..156c5d4 --- /dev/null +++ b/src/main/java/com/parseSystem/utils/QueueConfig.java @@ -0,0 +1,27 @@ +package com.parseSystem.utils; + +import com.parseSystem.vehicle.VehicleData; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +/** + * @author 冯凯 + * @version 1.0 + * @description: + * @date 2023/12/1 21:15 + */ +@Configuration +public class QueueConfig { + @Bean + public BlockingQueue dataQueue1() { + return new LinkedBlockingQueue<>(); + } + + @Bean + public BlockingQueue dataQueue2() { + return new LinkedBlockingQueue<>(); + } +} diff --git a/src/main/resources/application-test.yml b/src/main/resources/application-test.yml new file mode 100644 index 0000000..2c65c29 --- /dev/null +++ b/src/main/resources/application-test.yml @@ -0,0 +1,14 @@ +spring: + profiles: + active: dev + redis: + host: 10.100.1.2 + port: 6379 + password: + application: + name: parseSystem + datasource: + driver-class-name: com.mysql.cj.jdbc.Driver + url: jdbc:mysql://117.72.43.22:4000/test?useUnicode=true&characterEncoding=utf8&serverTimezone=GMT%2B8 +server: + port: 8097 diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 1998732..ba27a86 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -1,4 +1,6 @@ spring: + profiles: + active: dev redis: host: 10.100.1.2 port: 6379