From 67579c2b4f98cbdc4ba60838ac183ec637ebf449 Mon Sep 17 00:00:00 2001 From: KillBinBin <13015807+killbinbin@user.noreply.gitee.com> Date: Fri, 15 Sep 2023 20:21:41 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=96=B0=E9=87=8D=E6=9E=84=E4=BB=A3?= =?UTF-8?q?=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pom.xml | 6 ++ ...rConfig.java => ConsumerCustomConfig.java} | 2 +- .../analysis/kafka/consumer/Consumer.java | 88 +++++++++++++++++++ .../kafka/service/InKafkaConsumer.java | 11 +++ .../service/impl/InKafkaConsumerimpl.java | 16 ++++ src/main/resources/bootstrap.yml | 2 +- 6 files changed, 123 insertions(+), 2 deletions(-) rename src/main/java/com/ruoyi/analysis/kafka/config/{ConsumerConfig.java => ConsumerCustomConfig.java} (93%) create mode 100644 src/main/java/com/ruoyi/analysis/kafka/consumer/Consumer.java create mode 100644 src/main/java/com/ruoyi/analysis/kafka/service/InKafkaConsumer.java create mode 100644 src/main/java/com/ruoyi/analysis/kafka/service/impl/InKafkaConsumerimpl.java diff --git a/pom.xml b/pom.xml index f2a709c..29436f7 100644 --- a/pom.xml +++ b/pom.xml @@ -74,6 +74,12 @@ org.apache.hbase hbase-client 2.4.7 + + + slf4j-reload4j + org.slf4j + + diff --git a/src/main/java/com/ruoyi/analysis/kafka/config/ConsumerConfig.java b/src/main/java/com/ruoyi/analysis/kafka/config/ConsumerCustomConfig.java similarity index 93% rename from src/main/java/com/ruoyi/analysis/kafka/config/ConsumerConfig.java rename to src/main/java/com/ruoyi/analysis/kafka/config/ConsumerCustomConfig.java index c23a09e..bb69c1f 100644 --- a/src/main/java/com/ruoyi/analysis/kafka/config/ConsumerConfig.java +++ b/src/main/java/com/ruoyi/analysis/kafka/config/ConsumerCustomConfig.java @@ -10,7 +10,7 @@ import org.springframework.context.annotation.Configuration; @Data @Configuration @ConfigurationProperties(prefix = "kafka.consumer") -public class ConsumerConfig { +public class ConsumerCustomConfig { /** *消费者 */ diff --git a/src/main/java/com/ruoyi/analysis/kafka/consumer/Consumer.java b/src/main/java/com/ruoyi/analysis/kafka/consumer/Consumer.java new file mode 100644 index 0000000..ec082c7 --- /dev/null +++ b/src/main/java/com/ruoyi/analysis/kafka/consumer/Consumer.java @@ -0,0 +1,88 @@ +package com.ruoyi.analysis.kafka.consumer; + +import com.ruoyi.analysis.kafka.config.ConsumerCustomConfig; +import com.ruoyi.analysis.kafka.config.KafkaConfig; +import com.ruoyi.analysis.kafka.service.InKafkaConsumer; +import lombok.extern.log4j.Log4j2; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.TopicPartition; +import org.codehaus.jackson.map.deser.std.StringDeserializer; +import org.springframework.context.annotation.Bean; +import org.springframework.stereotype.Component; + +import java.time.Duration; +import java.util.Collections; +import java.util.Properties; + +/** + * 消费者 + */ +@Component +@Log4j2 +public class Consumer{ + + /** + * 消费者配置 + */ + private final ConsumerCustomConfig consumerCustomConfig; + /** + * kafka配置 + */ + private final KafkaConfig kafkaConfig; + /** + * 接口 + */ + private InKafkaConsumer inKafkaConsumer; + + public Consumer(ConsumerCustomConfig consumerCustomConfig, KafkaConfig kafkaConfig, InKafkaConsumer inKafkaConsumer) { + this.consumerCustomConfig = consumerCustomConfig; + this.kafkaConfig = kafkaConfig; + this.inKafkaConsumer = inKafkaConsumer; + } + + //@PostConstruct 是一个注解,它被用于标记一个方法,在对象被创建后立即执行该方法 + //@PostConstruct + @Bean + public KafkaConsumer init(){ + //消费者配置 + Properties properties = new Properties(); + + //服务地址 + properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,kafkaConfig.getHosts()); + //消费者 + properties.put(ConsumerConfig.GROUP_ID_CONFIG,consumerCustomConfig.getGroup()); + //反序列化key + properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + //反序列化value + properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName()); + + + //创建kafka消费者 + KafkaConsumer consumer = new KafkaConsumer<>(properties); + //订阅主题 + consumer.subscribe(Collections.singleton(consumerCustomConfig.getTopic())); + //分配分区 + TopicPartition topicPartition = new TopicPartition(consumerCustomConfig.topic, consumerCustomConfig.getPartitions()); + consumer.assign(Collections.singletonList(topicPartition)); + + //把循环放在另外一个线程执行 否则不会返回数据 + new Thread(()->{ + //消费信息 + while (true){ + //拉取主题中的消息 + ConsumerRecords records = consumer.poll(Duration.ofMillis(consumerCustomConfig.getWaitingTime())); + //接口传入参数 + inKafkaConsumer.kafkaConsumerInterface(records); + for (ConsumerRecord record : records) { + log.info("接收到的消息 key:"+record.key()+"value:"+record.value()); + } + } + }).start(); + + return consumer; + } + +} diff --git a/src/main/java/com/ruoyi/analysis/kafka/service/InKafkaConsumer.java b/src/main/java/com/ruoyi/analysis/kafka/service/InKafkaConsumer.java new file mode 100644 index 0000000..01833d8 --- /dev/null +++ b/src/main/java/com/ruoyi/analysis/kafka/service/InKafkaConsumer.java @@ -0,0 +1,11 @@ +package com.ruoyi.analysis.kafka.service; + +import org.apache.kafka.clients.consumer.ConsumerRecords; + +/** + * kafka消费者接口 + */ +public interface InKafkaConsumer { + + public void kafkaConsumerInterface(ConsumerRecords records); +} diff --git a/src/main/java/com/ruoyi/analysis/kafka/service/impl/InKafkaConsumerimpl.java b/src/main/java/com/ruoyi/analysis/kafka/service/impl/InKafkaConsumerimpl.java new file mode 100644 index 0000000..8deeac5 --- /dev/null +++ b/src/main/java/com/ruoyi/analysis/kafka/service/impl/InKafkaConsumerimpl.java @@ -0,0 +1,16 @@ +package com.ruoyi.analysis.kafka.service.impl; + +import com.ruoyi.analysis.kafka.service.InKafkaConsumer; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.springframework.stereotype.Service; + +/** + * 接口实现 + */ +@Service +public class InKafkaConsumerimpl implements InKafkaConsumer { + @Override + public void kafkaConsumerInterface(ConsumerRecords records) { + + } +} diff --git a/src/main/resources/bootstrap.yml b/src/main/resources/bootstrap.yml index 14513d2..d30eba6 100644 --- a/src/main/resources/bootstrap.yml +++ b/src/main/resources/bootstrap.yml @@ -1,6 +1,6 @@ # Tomcat server: - port: 9456 + port: 9466 netty: port: 8081 boss-group-thread-number: 5