diff --git a/src/main/java/com/parseSystem/kafka/ConsumerConfig/KafkaConsumerConfig.java b/src/main/java/com/parseSystem/kafka/ConsumerConfig/KafkaConsumerConfig.java index f6c58f0..b595580 100644 --- a/src/main/java/com/parseSystem/kafka/ConsumerConfig/KafkaConsumerConfig.java +++ b/src/main/java/com/parseSystem/kafka/ConsumerConfig/KafkaConsumerConfig.java @@ -1,6 +1,8 @@ package com.parseSystem.kafka.ConsumerConfig; +import com.parseSystem.storage.service.StorageDateService; import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; @@ -15,8 +17,10 @@ import java.util.Properties; @Component public class KafkaConsumerConfig { + @Bean public KafkaConsumer consumerInit() { + KafkaConsumer consumer; Properties properties = new Properties(); properties.put("bootstrap.servers", "117.72.43.22:9092"); diff --git a/src/main/java/com/parseSystem/rabbitmq/ListenEventChangeRabbitMq.java b/src/main/java/com/parseSystem/rabbitmq/ListenEventChangeRabbitMq.java index 848e0ad..406646b 100644 --- a/src/main/java/com/parseSystem/rabbitmq/ListenEventChangeRabbitMq.java +++ b/src/main/java/com/parseSystem/rabbitmq/ListenEventChangeRabbitMq.java @@ -4,6 +4,7 @@ import com.alibaba.fastjson.JSONObject; import com.parseSystem.event.EventHandlerService; import com.parseSystem.kafka.constant.kafkaConstants; import com.parseSystem.storage.service.StorageDateService; +import com.parseSystem.utils.DataBuffer; import com.parseSystem.utils.ParseUtil; import com.parseSystem.vehicle.VehicleData; import com.rabbitmq.client.Channel; @@ -33,6 +34,8 @@ import java.util.Collections; @Component @Log4j2 public class ListenEventChangeRabbitMq { + @Autowired + private DataBuffer dataBuffer; @Autowired private KafkaConsumer consumer; @Autowired @@ -50,7 +53,7 @@ public class ListenEventChangeRabbitMq { */ @RabbitListener(queuesToDeclare = {@Queue(value = "kafka_top")}) public void consumerSubscribe(String mesg, Message message, Channel channel) { - log.info("收到准备订阅主题:" + mesg); + log.info("收到准备订阅主题:{}" , mesg); // 将接收到的消息解析为kafkaConstants对象 kafkaConstants kafkaConstants = JSONObject.parseObject(mesg, kafkaConstants.class); @@ -74,17 +77,16 @@ public class ListenEventChangeRabbitMq { for (ConsumerRecord record : records) { System.out.println(record.value()); - + long startTime = System.currentTimeMillis(); + log.info("开始消费时间{}:",startTime); // 解析数据 String data = ParseUtil.sixteenToStr(record.value()); // 构建数据对象 VehicleData vehicleData = VehicleData.getBuild(data); - - // 存储数据到tiDB - storageDateService.save(vehicleData); - System.out.println(vehicleData); - + dataBuffer.addToBuffer(vehicleData); + log.info(vehicleData); + log.info("耗费时间{}",System.currentTimeMillis()-startTime); // 调用执行事件 eventHandlerService.executeEvent(vehicleData); } diff --git a/src/main/java/com/parseSystem/utils/DataBuffer.java b/src/main/java/com/parseSystem/utils/DataBuffer.java new file mode 100644 index 0000000..addc86e --- /dev/null +++ b/src/main/java/com/parseSystem/utils/DataBuffer.java @@ -0,0 +1,54 @@ +package com.parseSystem.utils; + +import com.parseSystem.storage.service.StorageDateService; +import com.parseSystem.vehicle.VehicleData; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +@Component +public class DataBuffer { + @Autowired + private StorageDateService storageDateService; + private BlockingQueue buffer; // 内存队列 + private int threshold; // 队列容量阈值 + public DataBuffer() { + buffer = new LinkedBlockingQueue<>();// 创建一个链表阻塞队列作为缓冲区 + this.threshold = 100; + } + + public void addToBuffer(VehicleData data) { + try { + buffer.put(data); // 将数据放入队列,如果队列已满则会阻塞等待 + if (buffer.size() >= threshold) { + // 队列容量达到阈值,执行数据库入库操作 + startDatabaseInsertion(); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + // 处理中断异常 + } + } + + public void startDatabaseInsertion() { + Thread dbThread = new Thread(() -> { + while (!Thread.currentThread().isInterrupted()) { + // 从缓冲区获取一批数据 + List batch = new ArrayList<>(); + buffer.drainTo(batch, 100); + // 执行数据库入库操作 + storageDateService.saveBatch(batch); + } + }); + dbThread.start(); + } + + public void stopDatabaseInsertion() { + // 停止数据库入库线程 + // ... + } +} diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 84ff9fe..1998732 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -11,7 +11,7 @@ spring: username: root password: 123456 rabbitmq: - host: 47.120.38.248 + host: 182.254.222.21 port: 5672 template: mandatory: true