fix(): 解决代码冲突问题

dev.vehicleGateway
xinzirun 2024-09-30 20:07:32 +08:00
parent 147dbe2c99
commit ff3f1e8ac2
2 changed files with 10 additions and 5 deletions

View File

@ -2,7 +2,6 @@ package com.muyu.event.process.consumer;
import cn.hutool.core.thread.ThreadUtil;
import com.alibaba.nacos.shaded.com.google.common.collect.Lists;
import com.muyu.common.core.constant.KafkaConstants;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecords;
@ -19,7 +18,7 @@ import java.util.Collection;
* @Description
*/
@Slf4j
@Component
//@Component
@RequiredArgsConstructor
public class TestConsumer implements InitializingBean {
@ -28,12 +27,18 @@ public class TestConsumer implements InitializingBean {
*/
private final KafkaConsumer<String, String> kafkaConsumer;
/**
* kafka
*/
private static final String topicName = "test-topic";
@Override
public void afterPropertiesSet() throws Exception {
new Thread(() -> {
log.info("启动线程监听Topic: {}", KafkaConstants.MESSAGE_PARSING);
log.info("启动线程监听Topic: {}", topicName);
ThreadUtil.sleep(1000);
Collection<String> topics = Lists.newArrayList(KafkaConstants.MESSAGE_PARSING);
Collection<String> topics = Lists.newArrayList(topicName);
kafkaConsumer.subscribe(topics);
while (true) {
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000));

View File

@ -58,7 +58,7 @@ public class VehicleConsumer implements ApplicationRunner, ApplicationListener<C
List<String> topics = Collections.singletonList(KafkaConstants.MESSAGE_PARSING);
kafkaConsumer.subscribe(topics);
while (true) {
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(100));
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(200));
consumerRecords.forEach(consumerRecord -> executorService.submit(() -> handleRecord(consumerRecord)));
}
}