From 38a096ea7254eb824e73118158267fde8bf4f3c7 Mon Sep 17 00:00:00 2001 From: xinzirun Date: Sat, 28 Sep 2024 21:23:24 +0800 Subject: [PATCH] =?UTF-8?q?feat():=20=E6=96=B0=E5=A2=9Ekafka=E9=85=8D?= =?UTF-8?q?=E7=BD=AE=E6=A8=A1=E5=9D=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cloud-common/cloud-common-kafka/pom.xml | 33 ++++++ .../kafka/config/KafkaConsumerConfig.java | 102 ++++++++++++++++++ .../kafka/config/KafkaProducerConfig.java | 74 +++++++++++++ .../kafka/constant/KafkaConfigConstants.java | 74 +++++++++++++ ...ot.autoconfigure.AutoConfiguration.imports | 2 + cloud-common/pom.xml | 1 + pom.xml | 18 +++- 7 files changed, 303 insertions(+), 1 deletion(-) create mode 100644 cloud-common/cloud-common-kafka/pom.xml create mode 100644 cloud-common/cloud-common-kafka/src/main/java/com/muyu/common/kafka/config/KafkaConsumerConfig.java create mode 100644 cloud-common/cloud-common-kafka/src/main/java/com/muyu/common/kafka/config/KafkaProducerConfig.java create mode 100644 cloud-common/cloud-common-kafka/src/main/java/com/muyu/common/kafka/constant/KafkaConfigConstants.java create mode 100644 cloud-common/cloud-common-kafka/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports diff --git a/cloud-common/cloud-common-kafka/pom.xml b/cloud-common/cloud-common-kafka/pom.xml new file mode 100644 index 0000000..924820f --- /dev/null +++ b/cloud-common/cloud-common-kafka/pom.xml @@ -0,0 +1,33 @@ + + + 4.0.0 + + com.muyu + cloud-common + 3.6.3 + + + cloud-common-kafka + + + 17 + 17 + UTF-8 + + + + + + com.muyu + cloud-common-core + + + + + org.apache.kafka + kafka-clients + + + \ No newline at end of file diff --git a/cloud-common/cloud-common-kafka/src/main/java/com/muyu/common/kafka/config/KafkaConsumerConfig.java b/cloud-common/cloud-common-kafka/src/main/java/com/muyu/common/kafka/config/KafkaConsumerConfig.java new file mode 100644 index 0000000..772ed68 --- /dev/null +++ b/cloud-common/cloud-common-kafka/src/main/java/com/muyu/common/kafka/config/KafkaConsumerConfig.java @@ -0,0 +1,102 @@ +package com.muyu.common.kafka.config; + +import com.muyu.common.core.text.StrFormatter; +import com.muyu.common.kafka.constant.KafkaConfigConstants; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import java.util.HashMap; + +/** + * @Author: zi run + * @Date 2024/9/28 20:32 + * @Description Kafka消费者配置 + */ +@Configuration +public class KafkaConsumerConfig { + + /** + * 服务端IP + */ + @Value("${kafka.consumer.bootstrap-servers-ip}") + private String bootstrapServersIP; + + /** + * 服务端口号 + */ + @Value("${kafka.consumer.bootstrap-servers-port}") + private String bootstrapServersPort; + + /** + * 开启消费者偏移量 + */ + @Value("${kafka.consumer.enable-auto-commit}") + private Boolean enableAutoCommit; + + /** + * 自动提交时间间隔 + */ + @Value("${kafka.consumer.auto-commit-interval}") + private Integer autoCommitInterval; + + /** + * 自动重置偏移量 + */ + @Value("${kafka.consumer.auto-offset-reset}") + private String autoOffsetReset; + + /** + * 请求阻塞的最大时间 + */ + @Value("${kafka.consumer.fetch-max-wait}") + private Integer fetchMaxWait; + + /** + * 请求应答的最小字节数 + */ + @Value("${kafka.consumer.fetch-min-size}") + private Integer fetchMinSize; + + /** + * 心跳间隔时间 + */ + @Value("${kafka.consumer.heartbeat-interval}") + private Integer heartbeatInterval; + + /** + * 一次调用poll返回的最大记录条数 + */ + @Value("${kafka.consumer.max-poll-records}") + private Integer maxPollRecords; + + /** + * 指定消费组 + */ + @Value("${kafka.consumer.group-id}") + private String groupId; + + /** + * Kafka消费者初始化配置 + * @return Kafka消费者实例 + */ + @Bean + public KafkaConsumer kafkaConsumer() { + HashMap configs = new HashMap<>(); + configs.put(KafkaConfigConstants.BOOTSTRAP_SERVERS, + StrFormatter.format("{}:{}", bootstrapServersIP, bootstrapServersPort)); + configs.put(KafkaConfigConstants.ENABLE_AUTO_COMMIT, enableAutoCommit); + configs.put(KafkaConfigConstants.AUTO_COMMIT_INTERVAL, autoCommitInterval); + configs.put(KafkaConfigConstants.AUTO_OFFSET_RESET, autoOffsetReset); + configs.put(KafkaConfigConstants.FETCH_MAX_WAIT, fetchMaxWait); + configs.put(KafkaConfigConstants.FETCH_MIN_SIZE, fetchMinSize); + configs.put(KafkaConfigConstants.HEARTBEAT_INTERVAL, heartbeatInterval); + configs.put(KafkaConfigConstants.MAX_POLL_RECORDS, maxPollRecords); + configs.put(KafkaConfigConstants.GROUP_ID, groupId); + Deserializer keyDeserializer = new StringDeserializer(); + Deserializer valueDeserializer = new StringDeserializer(); + return new KafkaConsumer<>(configs, keyDeserializer, valueDeserializer); + } +} diff --git a/cloud-common/cloud-common-kafka/src/main/java/com/muyu/common/kafka/config/KafkaProducerConfig.java b/cloud-common/cloud-common-kafka/src/main/java/com/muyu/common/kafka/config/KafkaProducerConfig.java new file mode 100644 index 0000000..876b8b4 --- /dev/null +++ b/cloud-common/cloud-common-kafka/src/main/java/com/muyu/common/kafka/config/KafkaProducerConfig.java @@ -0,0 +1,74 @@ +package com.muyu.common.kafka.config; + +import com.muyu.common.core.text.StrFormatter; +import com.muyu.common.kafka.constant.KafkaConfigConstants; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import java.util.HashMap; + +/** + * @Author: zi run + * @Date 2024/9/28 16:35 + * @Description Kafka生产者配置 + */ +@Configuration +public class KafkaProducerConfig { + + /** + * 服务端IP + */ + @Value("${kafka.producer.bootstrap-servers-ip}") + private String bootstrapServersIP; + + /** + * 服务端口号 + */ + @Value("${kafka.producer.bootstrap-servers-port}") + private String bootstrapServersPort; + + /** + * 重试次数 + */ + @Value("${kafka.producer.retries}") + private Integer retries; + + /** + * 默认批量大小 + */ + @Value("${kafka.producer.batch-size}") + private Integer batchSize; + + /** + * 总内存字节数 + */ + @Value("${kafka.producer.buffer-memory}") + private Integer bufferMemory; + + /** + * 偏移量 + */ + @Value("${kafka.producer.acks}") + private String acks; + + /** + * Kafka生产者初始化配置 + * @return kafka生产者实例 + */ + @Bean + public KafkaProducer kafkaProducer() { + HashMap configs = new HashMap<>(); + configs.put(KafkaConfigConstants.BOOTSTRAP_SERVERS, + StrFormatter.format("{}:{}", bootstrapServersIP, bootstrapServersPort)); + configs.put(KafkaConfigConstants.RETRIES, retries); + configs.put(KafkaConfigConstants.BATCH_SIZE, batchSize); + configs.put(KafkaConfigConstants.BUFFER_MEMORY, bufferMemory); + configs.put(KafkaConfigConstants.ACKS, acks); + Serializer keySerializer = new StringSerializer(); + Serializer valueSerializer = new StringSerializer(); + return new KafkaProducer<>(configs, keySerializer, valueSerializer); + } +} diff --git a/cloud-common/cloud-common-kafka/src/main/java/com/muyu/common/kafka/constant/KafkaConfigConstants.java b/cloud-common/cloud-common-kafka/src/main/java/com/muyu/common/kafka/constant/KafkaConfigConstants.java new file mode 100644 index 0000000..b95eef4 --- /dev/null +++ b/cloud-common/cloud-common-kafka/src/main/java/com/muyu/common/kafka/constant/KafkaConfigConstants.java @@ -0,0 +1,74 @@ +package com.muyu.common.kafka.constant; + +/** + * @Author: zi run + * @Date 2024/9/28 20:07 + * @Description Kafka配置通用常量 + */ +public class KafkaConfigConstants { + + /** + * 服务端ip+端口号 + */ + public static final String BOOTSTRAP_SERVERS = "bootstrap.servers"; + + /** + * 重试次数 + */ + public static final String RETRIES = "retries"; + + /** + * 默认批量大小 + */ + public static final String BATCH_SIZE = "batch.size"; + + /** + * 总内存字节数 + */ + public static final String BUFFER_MEMORY = "buffer-memory"; + + /** + * 偏移量 + */ + public static final String ACKS = "acks"; + + /** + * 开启消费者偏移量 + */ + public static final String ENABLE_AUTO_COMMIT = "enable.auto.commit"; + + /** + * 自动提交时间间隔 + */ + public static final String AUTO_COMMIT_INTERVAL = "auto.commit.interval"; + + /** + * 自动重置偏移量 + */ + public static final String AUTO_OFFSET_RESET = "auto.offset.reset"; + + /** + * 请求阻塞的最大时间 + */ + public static final String FETCH_MAX_WAIT = "fetch.max.wait"; + + /** + * 请求应答的最小字节数 + */ + public static final String FETCH_MIN_SIZE = "fetch.min.size"; + + /** + * 心跳间隔时间 + */ + public static final String HEARTBEAT_INTERVAL = "heartbeat-interval"; + + /** + * 一次调用poll返回的最大记录条数 + */ + public static final String MAX_POLL_RECORDS = "max.poll.records"; + + /** + * 指定消费组 + */ + public static final String GROUP_ID = "group.id"; +} diff --git a/cloud-common/cloud-common-kafka/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/cloud-common/cloud-common-kafka/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports new file mode 100644 index 0000000..7e2c989 --- /dev/null +++ b/cloud-common/cloud-common-kafka/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports @@ -0,0 +1,2 @@ +com.muyu.common.kafka.config.KafkaProducerConfig +com.muyu.common.kafka.config.KafkaConsumerConfig \ No newline at end of file diff --git a/cloud-common/pom.xml b/cloud-common/pom.xml index d052b4d..157bcd1 100644 --- a/cloud-common/pom.xml +++ b/cloud-common/pom.xml @@ -21,6 +21,7 @@ cloud-common-system cloud-common-xxl cloud-common-rabbit + cloud-common-kafka cloud-common diff --git a/pom.xml b/pom.xml index d807259..a49e303 100644 --- a/pom.xml +++ b/pom.xml @@ -43,6 +43,7 @@ 4.1.0 2.4.1 2.2.8 + 3.0.0 @@ -184,12 +185,20 @@ ${transmittable-thread-local.version} + io.swagger.core.v3 swagger-annotations-jakarta ${swagger.an.jakarta.verison} + + + org.apache.kafka + kafka-clients + ${kafka.clients.verison} + + com.muyu @@ -281,7 +290,14 @@ ${muyu.version} - + + + com.muyu + cloud-common-kafka + ${muyu.version} + + + com.muyu cloud-modules-enterprise-common