增加 缓冲池

master^2
冯凯 2023-12-01 13:50:55 +08:00
parent 0d79359080
commit 1deb1d8cd7
4 changed files with 68 additions and 8 deletions

View File

@ -1,6 +1,8 @@
package com.parseSystem.kafka.ConsumerConfig;
import com.parseSystem.storage.service.StorageDateService;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
@ -15,8 +17,10 @@ import java.util.Properties;
@Component
public class KafkaConsumerConfig {
@Bean
public KafkaConsumer<String, String> consumerInit() {
KafkaConsumer<String, String> consumer;
Properties properties = new Properties();
properties.put("bootstrap.servers", "117.72.43.22:9092");

View File

@ -4,6 +4,7 @@ import com.alibaba.fastjson.JSONObject;
import com.parseSystem.event.EventHandlerService;
import com.parseSystem.kafka.constant.kafkaConstants;
import com.parseSystem.storage.service.StorageDateService;
import com.parseSystem.utils.DataBuffer;
import com.parseSystem.utils.ParseUtil;
import com.parseSystem.vehicle.VehicleData;
import com.rabbitmq.client.Channel;
@ -33,6 +34,8 @@ import java.util.Collections;
@Component
@Log4j2
public class ListenEventChangeRabbitMq {
@Autowired
private DataBuffer dataBuffer;
@Autowired
private KafkaConsumer<String, String> consumer;
@Autowired
@ -50,7 +53,7 @@ public class ListenEventChangeRabbitMq {
*/
@RabbitListener(queuesToDeclare = {@Queue(value = "kafka_top")})
public void consumerSubscribe(String mesg, Message message, Channel channel) {
log.info("收到准备订阅主题:" + mesg);
log.info("收到准备订阅主题:{}" , mesg);
// 将接收到的消息解析为kafkaConstants对象
kafkaConstants kafkaConstants = JSONObject.parseObject(mesg, kafkaConstants.class);
@ -74,17 +77,16 @@ public class ListenEventChangeRabbitMq {
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.value());
long startTime = System.currentTimeMillis();
log.info("开始消费时间{}:",startTime);
// 解析数据
String data = ParseUtil.sixteenToStr(record.value());
// 构建数据对象
VehicleData vehicleData = VehicleData.getBuild(data);
// 存储数据到tiDB
storageDateService.save(vehicleData);
System.out.println(vehicleData);
dataBuffer.addToBuffer(vehicleData);
log.info(vehicleData);
log.info("耗费时间{}",System.currentTimeMillis()-startTime);
// 调用执行事件
eventHandlerService.executeEvent(vehicleData);
}

View File

@ -0,0 +1,54 @@
package com.parseSystem.utils;
import com.parseSystem.storage.service.StorageDateService;
import com.parseSystem.vehicle.VehicleData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
@Component
public class DataBuffer {
@Autowired
private StorageDateService storageDateService;
private BlockingQueue<VehicleData> buffer; // 内存队列
private int threshold; // 队列容量阈值
public DataBuffer() {
buffer = new LinkedBlockingQueue<>();// 创建一个链表阻塞队列作为缓冲区
this.threshold = 100;
}
public void addToBuffer(VehicleData data) {
try {
buffer.put(data); // 将数据放入队列,如果队列已满则会阻塞等待
if (buffer.size() >= threshold) {
// 队列容量达到阈值,执行数据库入库操作
startDatabaseInsertion();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
// 处理中断异常
}
}
public void startDatabaseInsertion() {
Thread dbThread = new Thread(() -> {
while (!Thread.currentThread().isInterrupted()) {
// 从缓冲区获取一批数据
List<VehicleData> batch = new ArrayList<>();
buffer.drainTo(batch, 100);
// 执行数据库入库操作
storageDateService.saveBatch(batch);
}
});
dbThread.start();
}
public void stopDatabaseInsertion() {
// 停止数据库入库线程
// ...
}
}

View File

@ -11,7 +11,7 @@ spring:
username: root
password: 123456
rabbitmq:
host: 47.120.38.248
host: 182.254.222.21
port: 5672
template:
mandatory: true