diff --git a/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/consumer/TestConsumer.java b/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/consumer/TestConsumer.java index 2e1bc55..cd7dbe2 100644 --- a/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/consumer/TestConsumer.java +++ b/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/consumer/TestConsumer.java @@ -2,7 +2,6 @@ package com.muyu.event.process.consumer; import cn.hutool.core.thread.ThreadUtil; import com.alibaba.nacos.shaded.com.google.common.collect.Lists; -import com.muyu.common.core.constant.KafkaConstants; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecords; @@ -19,7 +18,7 @@ import java.util.Collection; * @Description 测试消费者 */ @Slf4j -@Component +//@Component @RequiredArgsConstructor public class TestConsumer implements InitializingBean { @@ -28,12 +27,18 @@ public class TestConsumer implements InitializingBean { */ private final KafkaConsumer kafkaConsumer; + /** + * kafka主题名称 + */ + private static final String topicName = "test-topic"; + + @Override public void afterPropertiesSet() throws Exception { new Thread(() -> { - log.info("启动线程监听Topic: {}", KafkaConstants.MESSAGE_PARSING); + log.info("启动线程监听Topic: {}", topicName); ThreadUtil.sleep(1000); - Collection topics = Lists.newArrayList(KafkaConstants.MESSAGE_PARSING); + Collection topics = Lists.newArrayList(topicName); kafkaConsumer.subscribe(topics); while (true) { ConsumerRecords consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000)); diff --git a/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/consumer/VehicleConsumer.java b/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/consumer/VehicleConsumer.java index b4e9cd6..b3fe6b7 100644 --- a/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/consumer/VehicleConsumer.java +++ b/cloud-modules/cloud-modules-event-process/src/main/java/com/muyu/event/process/consumer/VehicleConsumer.java @@ -58,7 +58,7 @@ public class VehicleConsumer implements ApplicationRunner, ApplicationListener topics = Collections.singletonList(KafkaConstants.MESSAGE_PARSING); kafkaConsumer.subscribe(topics); while (true) { - ConsumerRecords consumerRecords = kafkaConsumer.poll(Duration.ofMillis(100)); + ConsumerRecords consumerRecords = kafkaConsumer.poll(Duration.ofMillis(200)); consumerRecords.forEach(consumerRecord -> executorService.submit(() -> handleRecord(consumerRecord))); } }