From 4afd656bc574cee5ec0224b9597324b1d121b5a9 Mon Sep 17 00:00:00 2001 From: gukaixuan <1> Date: Sat, 16 Sep 2023 14:52:39 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BA=8C=E6=AC=A1=E6=9B=B4=E6=96=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../ruoyi/receive/kafka/config/Consumer.java | 34 +++++++++++++------ .../kafka/config/ConsumerCustomConfig.java | 6 +++- .../netty/server/KafkaConsumerService.java | 12 +++++++ .../server/impl/KafkaConsumerServiceImpl.java | 19 +++++++++++ 4 files changed, 60 insertions(+), 11 deletions(-) create mode 100644 ruoyi-receive-server/src/main/java/com/ruoyi/receive/netty/server/KafkaConsumerService.java create mode 100644 ruoyi-receive-server/src/main/java/com/ruoyi/receive/netty/server/impl/KafkaConsumerServiceImpl.java diff --git a/ruoyi-receive-server/src/main/java/com/ruoyi/receive/kafka/config/Consumer.java b/ruoyi-receive-server/src/main/java/com/ruoyi/receive/kafka/config/Consumer.java index 5de0f3d..b373c85 100644 --- a/ruoyi-receive-server/src/main/java/com/ruoyi/receive/kafka/config/Consumer.java +++ b/ruoyi-receive-server/src/main/java/com/ruoyi/receive/kafka/config/Consumer.java @@ -1,16 +1,16 @@ package com.ruoyi.receive.kafka.config; -import com.sun.corba.se.internal.CosNaming.BootstrapServer; -import lombok.AllArgsConstructor; -import net.sf.jsqlparser.statement.select.Top; +import com.ruoyi.receive.netty.server.KafkaConsumerService; import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.StringDeserializer; import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; -import javax.annotation.PostConstruct; -import java.util.Collection; +import java.time.Duration; import java.util.Collections; import java.util.Properties; @@ -25,14 +25,15 @@ public class Consumer { private final ConsumerCustomConfig consumerCustomConfig; - public Consumer(KafkaConsumerConfig kafkaConsumerConfig, ConsumerCustomConfig consumerCustomConfig) { + private final KafkaConsumerService kafkaConsumerService; + + public Consumer(KafkaConsumerConfig kafkaConsumerConfig, ConsumerCustomConfig consumerCustomConfig, KafkaConsumerService kafkaConsumerService) { this.kafkaConsumerConfig = kafkaConsumerConfig; this.consumerCustomConfig = consumerCustomConfig; + this.kafkaConsumerService = kafkaConsumerService; } - - @Bean public KafkaConsumer consumerInit(){ //创建kafka消费者配置 @@ -43,8 +44,21 @@ public class Consumer { properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName()); //创建kafka消费者 KafkaConsumer consumer= new KafkaConsumer<>(properties); - //订阅主题 - consumer.subscribe(Collections.singletonList(consumerCustomConfig.getTopic())); + //分配特定的分区 + TopicPartition topicPartition = new TopicPartition(consumerCustomConfig.getTopic(), consumerCustomConfig.getPartitions()); + consumer.assign(Collections.singletonList(topicPartition)); + new Thread(()->{ + + //消费信息 + while (true){ + + ConsumerRecords records = + consumer.poll(Duration.ofMillis(consumerCustomConfig.getOfMillis())); + kafkaConsumerService.service(records); + + } + + }).start(); return consumer; } diff --git a/ruoyi-receive-server/src/main/java/com/ruoyi/receive/kafka/config/ConsumerCustomConfig.java b/ruoyi-receive-server/src/main/java/com/ruoyi/receive/kafka/config/ConsumerCustomConfig.java index 6869b65..d26362c 100644 --- a/ruoyi-receive-server/src/main/java/com/ruoyi/receive/kafka/config/ConsumerCustomConfig.java +++ b/ruoyi-receive-server/src/main/java/com/ruoyi/receive/kafka/config/ConsumerCustomConfig.java @@ -18,8 +18,12 @@ public class ConsumerCustomConfig { private String topic; - private String partitions; + private Integer partitions; + /** + * 等待时间 + */ + private long ofMillis = 1000; } diff --git a/ruoyi-receive-server/src/main/java/com/ruoyi/receive/netty/server/KafkaConsumerService.java b/ruoyi-receive-server/src/main/java/com/ruoyi/receive/netty/server/KafkaConsumerService.java new file mode 100644 index 0000000..0cc2ad3 --- /dev/null +++ b/ruoyi-receive-server/src/main/java/com/ruoyi/receive/netty/server/KafkaConsumerService.java @@ -0,0 +1,12 @@ +package com.ruoyi.receive.netty.server; + +import org.apache.kafka.clients.consumer.ConsumerRecords; + +/** + * kafka消费者接口 + */ +public interface KafkaConsumerService { + + public void service(ConsumerRecords records); + +} diff --git a/ruoyi-receive-server/src/main/java/com/ruoyi/receive/netty/server/impl/KafkaConsumerServiceImpl.java b/ruoyi-receive-server/src/main/java/com/ruoyi/receive/netty/server/impl/KafkaConsumerServiceImpl.java new file mode 100644 index 0000000..b1ac634 --- /dev/null +++ b/ruoyi-receive-server/src/main/java/com/ruoyi/receive/netty/server/impl/KafkaConsumerServiceImpl.java @@ -0,0 +1,19 @@ +package com.ruoyi.receive.netty.server.impl; + +import com.ruoyi.receive.netty.server.KafkaConsumerService; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.springframework.stereotype.Service; + +@Service +public class KafkaConsumerServiceImpl implements KafkaConsumerService { + + @Override + public void service(ConsumerRecords records) { + + for (ConsumerRecord record : records){ + System.out.println("消息:key = " + record.key() + ",value = " + record.value()); + } + + } +}