From 9fe0c50af23a92252b4b56540083457d086f14cc Mon Sep 17 00:00:00 2001 From: gukaixuan <1> Date: Fri, 15 Sep 2023 20:59:19 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=96=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../ruoyi/receive/kafka/config/Consumer.java | 53 +++++++++++++++++++ 1 file changed, 53 insertions(+) create mode 100644 ruoyi-receive-server/src/main/java/com/ruoyi/receive/kafka/config/Consumer.java diff --git a/ruoyi-receive-server/src/main/java/com/ruoyi/receive/kafka/config/Consumer.java b/ruoyi-receive-server/src/main/java/com/ruoyi/receive/kafka/config/Consumer.java new file mode 100644 index 0000000..5de0f3d --- /dev/null +++ b/ruoyi-receive-server/src/main/java/com/ruoyi/receive/kafka/config/Consumer.java @@ -0,0 +1,53 @@ +package com.ruoyi.receive.kafka.config; + +import com.sun.corba.se.internal.CosNaming.BootstrapServer; +import lombok.AllArgsConstructor; +import net.sf.jsqlparser.statement.select.Top; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.springframework.context.annotation.Bean; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; +import java.util.Collection; +import java.util.Collections; +import java.util.Properties; + +/** + * 消费者 + */ + +@Component +public class Consumer { + + private final KafkaConsumerConfig kafkaConsumerConfig; + + private final ConsumerCustomConfig consumerCustomConfig; + + public Consumer(KafkaConsumerConfig kafkaConsumerConfig, ConsumerCustomConfig consumerCustomConfig) { + this.kafkaConsumerConfig = kafkaConsumerConfig; + this.consumerCustomConfig = consumerCustomConfig; + } + + + + + @Bean + public KafkaConsumer consumerInit(){ + //创建kafka消费者配置 + Properties properties = new Properties(); + properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,kafkaConsumerConfig.consumerConfigs() ); + properties.put(ConsumerConfig.GROUP_ID_CONFIG,consumerCustomConfig.getGroup()); + properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName()); + //创建kafka消费者 + KafkaConsumer consumer= new KafkaConsumer<>(properties); + //订阅主题 + consumer.subscribe(Collections.singletonList(consumerCustomConfig.getTopic())); + return consumer; + } + + + +}