Add two queues as a buffer pool for persisting data to the TiDB database

master^2
冯凯 2023-12-01 22:41:05 +08:00
parent 1deb1d8cd7
commit 07fae80e36
6 changed files with 122 additions and 63 deletions

View File

@@ -3,8 +3,7 @@ package com.parseSystem.rabbitmq;
import com.alibaba.fastjson.JSONObject;
import com.parseSystem.event.EventHandlerService;
import com.parseSystem.kafka.constant.kafkaConstants;
-import com.parseSystem.storage.service.StorageDateService;
-import com.parseSystem.utils.DataBuffer;
+import com.parseSystem.utils.DataQueueManager;
import com.parseSystem.utils.ParseUtil;
import com.parseSystem.vehicle.VehicleData;
import com.rabbitmq.client.Channel;
@@ -34,12 +33,12 @@ import java.util.Collections;
@Component
@Log4j2
public class ListenEventChangeRabbitMq {
    @Autowired
-   private DataBuffer dataBuffer;
+   private DataQueueManager dataQueueManager;
    @Autowired
    private KafkaConsumer<String, String> consumer;
-   @Autowired
-   private StorageDateService storageDateService;
    @Autowired
    private EventHandlerService eventHandlerService;
@@ -67,25 +66,23 @@ public class ListenEventChangeRabbitMq {
        // Assign the consumer to the first partition of the "kafka_top" topic
        consumer.assign(Collections.singleton(topicPartition));
        // Keep pulling data continuously on this thread
        while (true) {
            ConsumerRecords<String, String> records = null;
            try {
                // Poll for data once per second
                records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.value());
                    long startTime = System.currentTimeMillis();
                    log.info("Consumption start time {}:", startTime);
                    // Parse the raw payload
                    String data = ParseUtil.sixteenToStr(record.value());
                    // Build the data object
                    VehicleData vehicleData = VehicleData.getBuild(data);
-                   dataBuffer.addToBuffer(vehicleData);
                    log.info(vehicleData);
+                   dataQueueManager.enqueueData(vehicleData);
                    log.info("Elapsed time {}", System.currentTimeMillis() - startTime);
                    // Trigger the event handling
                    eventHandlerService.executeEvent(vehicleData);

View File

@@ -1,54 +0,0 @@
package com.parseSystem.utils;
import com.parseSystem.storage.service.StorageDateService;
import com.parseSystem.vehicle.VehicleData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
@Component
public class DataBuffer {
    @Autowired
    private StorageDateService storageDateService;

    private BlockingQueue<VehicleData> buffer; // in-memory queue
    private int threshold;                     // queue size threshold

    public DataBuffer() {
        buffer = new LinkedBlockingQueue<>(); // a linked blocking queue used as the buffer
        this.threshold = 100;
    }

    public void addToBuffer(VehicleData data) {
        try {
            buffer.put(data); // put the data into the queue; blocks while the queue is full
            if (buffer.size() >= threshold) {
                // the queue has reached the threshold, kick off the database insertion
                startDatabaseInsertion();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            // handle the interruption
        }
    }

    public void startDatabaseInsertion() {
        Thread dbThread = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                // take a batch of data from the buffer
                List<VehicleData> batch = new ArrayList<>();
                buffer.drainTo(batch, 100);
                // persist the batch to the database
                storageDateService.saveBatch(batch);
            }
        });
        dbThread.start();
    }

    public void stopDatabaseInsertion() {
        // stop the database insertion thread
        // ...
    }
}

View File

@@ -0,0 +1,73 @@
package com.parseSystem.utils;

import com.parseSystem.storage.service.StorageDateService;
import com.parseSystem.vehicle.VehicleData;
import lombok.extern.log4j.Log4j2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * @author
 * @version 1.0
 * @description:
 * @date 2023/12/1 20:47
 */
@Component
@Log4j2
public class DataQueueManager {
    @Autowired
    private StorageDateService storageDateService;

    private static final int BATCH_SIZE_THRESHOLD = 100; // batch size threshold

    @Autowired
    private BlockingQueue<VehicleData> dataQueue1;
    @Autowired
    private BlockingQueue<VehicleData> dataQueue2;

    public void enqueueData(VehicleData vehicleData) {
        // Offer the data to the first queue; if it is full, fall back to the second queue
        if (!dataQueue1.offer(vehicleData)) {
            log.warn("Queue 1 is full, putting the data into queue 2");
            dataQueue2.offer(vehicleData);
        }
    }

    @Scheduled(fixedRate = 5000)
    public void processQueues() {
        // If the first queue has reached the threshold, drain a batch from it and persist it
        if (dataQueue1.size() >= BATCH_SIZE_THRESHOLD) {
            List<VehicleData> batchData = new ArrayList<>();
            dataQueue1.drainTo(batchData, BATCH_SIZE_THRESHOLD);
            batchPersistToDatabase(batchData);
        }
        // If the first queue is empty but the second one is not, drain and persist from the second queue
        else if (dataQueue1.isEmpty() && !dataQueue2.isEmpty()) {
            log.warn("Queue 1 is empty, draining data from queue 2 for persistence");
            List<VehicleData> batchData = new ArrayList<>();
            dataQueue2.drainTo(batchData, BATCH_SIZE_THRESHOLD);
            batchPersistToDatabase(batchData);
        }
    }

    private void batchPersistToDatabase(List<VehicleData> batchData) {
        // Persist the drained batch to the database (records are saved one at a time here)
        for (VehicleData vehicleData : batchData) {
            storageDateService.save(vehicleData); // persist to the database
        }
    }
}
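Side note on batchPersistToDatabase above: the data is drained from the queues in batches, but written one record at a time, whereas the removed DataBuffer relied on storageDateService.saveBatch(batch) for a single round trip. A minimal sketch of the same method using that batch call, assuming StorageDateService still exposes saveBatch as it did before this commit:

    // Sketch only: persist the drained batch in one call instead of row by row.
    // Assumes StorageDateService#saveBatch(List<VehicleData>) is still available
    // (it was used by the DataBuffer class removed in this commit).
    private void batchPersistToDatabase(List<VehicleData> batchData) {
        if (batchData.isEmpty()) {
            return; // nothing was drained, skip the database round trip
        }
        storageDateService.saveBatch(batchData); // single batched insert towards TiDB
    }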

View File

@@ -0,0 +1,27 @@
package com.parseSystem.utils;

import com.parseSystem.vehicle.VehicleData;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * @author
 * @version 1.0
 * @description:
 * @date 2023/12/1 21:15
 */
@Configuration
public class QueueConfig {
    @Bean
    public BlockingQueue<VehicleData> dataQueue1() {
        return new LinkedBlockingQueue<>();
    }

    @Bean
    public BlockingQueue<VehicleData> dataQueue2() {
        return new LinkedBlockingQueue<>();
    }
}
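Both beans above use the no-argument LinkedBlockingQueue constructor, so the queues are effectively unbounded (capacity Integer.MAX_VALUE); dataQueue1.offer(...) in DataQueueManager.enqueueData will therefore not return false in practice and the fallback into dataQueue2 never triggers. @Scheduled(fixedRate = 5000) also only fires if scheduling is enabled somewhere (e.g. @EnableScheduling), which is not visible in this diff. A hedged sketch of a bounded variant; the capacity of 10_000 is an assumption, not part of this commit:

    // Sketch only: bounded buffer queues so the overflow path into dataQueue2 can actually happen.
    // The capacity value 10_000 is an illustrative assumption.
    package com.parseSystem.utils;

    import com.parseSystem.vehicle.VehicleData;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.scheduling.annotation.EnableScheduling;

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;

    @Configuration
    @EnableScheduling // needed so @Scheduled(fixedRate = 5000) in DataQueueManager actually runs
    public class QueueConfig {
        @Bean
        public BlockingQueue<VehicleData> dataQueue1() {
            return new LinkedBlockingQueue<>(10_000);
        }

        @Bean
        public BlockingQueue<VehicleData> dataQueue2() {
            return new LinkedBlockingQueue<>(10_000);
        }
    }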

View File

@@ -0,0 +1,14 @@
spring:
  profiles:
    active: dev
  redis:
    host: 10.100.1.2
    port: 6379
    password:
  application:
    name: parseSystem
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://117.72.43.22:4000/test?useUnicode=true&characterEncoding=utf8&serverTimezone=GMT%2B8
server:
  port: 8097
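The datasource above points at TiDB on its default MySQL-protocol port 4000, which is why the stock com.mysql.cj.jdbc.Driver works unchanged. If the persistence layer sends batched inserts, appending rewriteBatchedStatements=true to the JDBC URL lets the MySQL driver rewrite them into multi-value statements; a hedged sketch, where the extra parameter is a suggestion and not part of the committed file:

    # Sketch only: same datasource with client-side statement batching enabled.
    # rewriteBatchedStatements=true is an assumption/suggestion, not in this commit.
    spring:
      datasource:
        driver-class-name: com.mysql.cj.jdbc.Driver
        url: jdbc:mysql://117.72.43.22:4000/test?useUnicode=true&characterEncoding=utf8&serverTimezone=GMT%2B8&rewriteBatchedStatements=true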

View File

@@ -1,4 +1,6 @@
spring:
+  profiles:
+    active: dev
  redis:
    host: 10.100.1.2
    port: 6379