From ff3f1e8ac2b76c11f13eb6bc644282f650af059b Mon Sep 17 00:00:00 2001 From: xinzirun Date: Mon, 30 Sep 2024 20:07:32 +0800 Subject: [PATCH] =?UTF-8?q?fix():=20=E8=A7=A3=E5=86=B3=E4=BB=A3=E7=A0=81?= =?UTF-8?q?=E5=86=B2=E7=AA=81=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../muyu/event/process/consumer/TestConsumer.java | 13 +++++++++---- .../event/process/consumer/VehicleConsumer.java | 2 +- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/consumer/TestConsumer.java b/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/consumer/TestConsumer.java index 2e1bc55..cd7dbe2 100644 --- a/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/consumer/TestConsumer.java +++ b/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/consumer/TestConsumer.java @@ -2,7 +2,6 @@ package com.muyu.event.process.consumer; import cn.hutool.core.thread.ThreadUtil; import com.alibaba.nacos.shaded.com.google.common.collect.Lists; -import com.muyu.common.core.constant.KafkaConstants; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecords; @@ -19,7 +18,7 @@ import java.util.Collection; * @Description 测试消费者 */ @Slf4j -@Component +//@Component @RequiredArgsConstructor public class TestConsumer implements InitializingBean { @@ -28,12 +27,18 @@ public class TestConsumer implements InitializingBean { */ private final KafkaConsumer kafkaConsumer; + /** + * kafka主题名称 + */ + private static final String topicName = "test-topic"; + + @Override public void afterPropertiesSet() throws Exception { new Thread(() -> { - log.info("启动线程监听Topic: {}", KafkaConstants.MESSAGE_PARSING); + log.info("启动线程监听Topic: {}", topicName); ThreadUtil.sleep(1000); - Collection topics = Lists.newArrayList(KafkaConstants.MESSAGE_PARSING); + Collection topics = Lists.newArrayList(topicName); kafkaConsumer.subscribe(topics); while (true) { ConsumerRecords consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000)); diff --git a/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/consumer/VehicleConsumer.java b/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/consumer/VehicleConsumer.java index b4e9cd6..b3fe6b7 100644 --- a/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/consumer/VehicleConsumer.java +++ b/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/consumer/VehicleConsumer.java @@ -58,7 +58,7 @@ public class VehicleConsumer implements ApplicationRunner, ApplicationListener topics = Collections.singletonList(KafkaConstants.MESSAGE_PARSING); kafkaConsumer.subscribe(topics); while (true) { - ConsumerRecords consumerRecords = kafkaConsumer.poll(Duration.ofMillis(100)); + ConsumerRecords consumerRecords = kafkaConsumer.poll(Duration.ofMillis(200)); consumerRecords.forEach(consumerRecord -> executorService.submit(() -> handleRecord(consumerRecord))); } }