diff --git a/cloud-common/cloud-common-kafka/pom.xml b/cloud-common/cloud-common-kafka/pom.xml new file mode 100644 index 0000000..1d7e765 --- /dev/null +++ b/cloud-common/cloud-common-kafka/pom.xml @@ -0,0 +1,34 @@ + + + 4.0.0 + + com.muyu + cloud-common + 3.6.3 + + + cloud-common-kafka + + + 8 + 8 + UTF-8 + + + + + + com.muyu + cloud-common-core + + + + + org.apache.kafka + kafka-clients + + + + diff --git a/cloud-common/cloud-common-kafka/src/main/java/com/muyu/common/kafka/config/KafkaConsumerConfig.java b/cloud-common/cloud-common-kafka/src/main/java/com/muyu/common/kafka/config/KafkaConsumerConfig.java new file mode 100644 index 0000000..2a75122 --- /dev/null +++ b/cloud-common/cloud-common-kafka/src/main/java/com/muyu/common/kafka/config/KafkaConsumerConfig.java @@ -0,0 +1,102 @@ +package com.muyu.common.kafka.config; + +import com.muyu.common.core.text.StrFormatter; +import com.muyu.common.kafka.constant.KafkaConfigConstants; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import java.util.HashMap; + +/** + * @Author: wangXin + * @Date 2024/9/28 20:32 + * @Description Kafka消费者配置 + */ +@Configuration +public class KafkaConsumerConfig { + + /** + * 服务端IP + */ + @Value("${kafka.consumer.bootstrap-servers-ip}") + private String bootstrapServersIP; + + /** + * 服务端口号 + */ + @Value("${kafka.consumer.bootstrap-servers-port}") + private String bootstrapServersPort; + + /** + * 开启消费者偏移量 + */ + @Value("${kafka.consumer.enable-auto-commit}") + private Boolean enableAutoCommit; + + /** + * 自动提交时间间隔 + */ + @Value("${kafka.consumer.auto-commit-interval}") + private Integer autoCommitInterval; + + /** + * 自动重置偏移量 + */ + @Value("${kafka.consumer.auto-offset-reset}") + private String autoOffsetReset; + + /** + * 请求阻塞的最大时间 + */ + @Value("${kafka.consumer.fetch-max-wait}") + private Integer fetchMaxWait; + + /** + * 请求应答的最小字节数 + */ + @Value("${kafka.consumer.fetch-min-size}") + private Integer fetchMinSize; + + /** + * 心跳间隔时间 + */ + @Value("${kafka.consumer.heartbeat-interval}") + private Integer heartbeatInterval; + + /** + * 一次调用poll返回的最大记录条数 + */ + @Value("${kafka.consumer.max-poll-records}") + private Integer maxPollRecords; + + /** + * 指定消费组 + */ + @Value("${kafka.consumer.group-id}") + private String groupId; + + /** + * Kafka消费者初始化配置 + * @return Kafka消费者实例 + */ + @Bean + public KafkaConsumer kafkaConsumer() { + HashMap configs = new HashMap<>(); + configs.put(KafkaConfigConstants.BOOTSTRAP_SERVERS, + StrFormatter.format("{}:{}", bootstrapServersIP, bootstrapServersPort)); + configs.put(KafkaConfigConstants.ENABLE_AUTO_COMMIT, enableAutoCommit); + configs.put(KafkaConfigConstants.AUTO_COMMIT_INTERVAL, autoCommitInterval); + configs.put(KafkaConfigConstants.AUTO_OFFSET_RESET, autoOffsetReset); + configs.put(KafkaConfigConstants.FETCH_MAX_WAIT, fetchMaxWait); + configs.put(KafkaConfigConstants.FETCH_MIN_SIZE, fetchMinSize); + configs.put(KafkaConfigConstants.HEARTBEAT_INTERVAL, heartbeatInterval); + configs.put(KafkaConfigConstants.MAX_POLL_RECORDS, maxPollRecords); + configs.put(KafkaConfigConstants.GROUP_ID, groupId); + Deserializer keyDeserializer = new StringDeserializer(); + Deserializer valueDeserializer = new StringDeserializer(); + return new KafkaConsumer<>(configs, keyDeserializer, valueDeserializer); + } +} diff --git a/cloud-common/cloud-common-kafka/src/main/java/com/muyu/common/kafka/config/KafkaProducerConfig.java b/cloud-common/cloud-common-kafka/src/main/java/com/muyu/common/kafka/config/KafkaProducerConfig.java new file mode 100644 index 0000000..51162de --- /dev/null +++ b/cloud-common/cloud-common-kafka/src/main/java/com/muyu/common/kafka/config/KafkaProducerConfig.java @@ -0,0 +1,74 @@ +package com.muyu.common.kafka.config; + +import com.muyu.common.core.text.StrFormatter; +import com.muyu.common.kafka.constant.KafkaConfigConstants; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import java.util.HashMap; + +/** + * @Author: wangXin + * @Date 2024/9/28 16:35 + * @Description Kafka生产者配置 + */ +@Configuration +public class KafkaProducerConfig { + + /** + * 服务端IP + */ + @Value("${kafka.producer.bootstrap-servers-ip}") + private String bootstrapServersIP; + + /** + * 服务端口号 + */ + @Value("${kafka.producer.bootstrap-servers-port}") + private String bootstrapServersPort; + + /** + * 重试次数 + */ + @Value("${kafka.producer.retries}") + private Integer retries; + + /** + * 默认批量大小 + */ + @Value("${kafka.producer.batch-size}") + private Integer batchSize; + + /** + * 总内存字节数 + */ + @Value("${kafka.producer.buffer-memory}") + private Integer bufferMemory; + + /** + * 偏移量 + */ + @Value("${kafka.producer.acks}") + private String acks; + + /** + * Kafka生产者初始化配置 + * @return kafka生产者实例 + */ + @Bean + public KafkaProducer kafkaProducer() { + HashMap configs = new HashMap<>(); + configs.put(KafkaConfigConstants.BOOTSTRAP_SERVERS, + StrFormatter.format("{}:{}", bootstrapServersIP, bootstrapServersPort)); + configs.put(KafkaConfigConstants.RETRIES, retries); + configs.put(KafkaConfigConstants.BATCH_SIZE, batchSize); + configs.put(KafkaConfigConstants.BUFFER_MEMORY, bufferMemory); + configs.put(KafkaConfigConstants.ACKS, acks); + Serializer keySerializer = new StringSerializer(); + Serializer valueSerializer = new StringSerializer(); + return new KafkaProducer<>(configs, keySerializer, valueSerializer); + } +} diff --git a/cloud-common/cloud-common-kafka/src/main/java/com/muyu/common/kafka/constant/KafkaConfigConstants.java b/cloud-common/cloud-common-kafka/src/main/java/com/muyu/common/kafka/constant/KafkaConfigConstants.java new file mode 100644 index 0000000..0bd1f73 --- /dev/null +++ b/cloud-common/cloud-common-kafka/src/main/java/com/muyu/common/kafka/constant/KafkaConfigConstants.java @@ -0,0 +1,74 @@ +package com.muyu.common.kafka.constant; + +/** + * @Author: wangXin + * @Date 2024/9/28 20:07 + * @Description Kafka配置通用常量 + */ +public class KafkaConfigConstants { + + /** + * 服务端ip+端口号 + */ + public static final String BOOTSTRAP_SERVERS = "bootstrap.servers"; + + /** + * 重试次数 + */ + public static final String RETRIES = "retries"; + + /** + * 默认批量大小 + */ + public static final String BATCH_SIZE = "batch.size"; + + /** + * 总内存字节数 + */ + public static final String BUFFER_MEMORY = "buffer-memory"; + + /** + * 偏移量 + */ + public static final String ACKS = "acks"; + + /** + * 开启消费者偏移量 + */ + public static final String ENABLE_AUTO_COMMIT = "enable.auto.commit"; + + /** + * 自动提交时间间隔 + */ + public static final String AUTO_COMMIT_INTERVAL = "auto.commit.interval"; + + /** + * 自动重置偏移量 + */ + public static final String AUTO_OFFSET_RESET = "auto.offset.reset"; + + /** + * 请求阻塞的最大时间 + */ + public static final String FETCH_MAX_WAIT = "fetch.max.wait"; + + /** + * 请求应答的最小字节数 + */ + public static final String FETCH_MIN_SIZE = "fetch.min.size"; + + /** + * 心跳间隔时间 + */ + public static final String HEARTBEAT_INTERVAL = "heartbeat-interval"; + + /** + * 一次调用poll返回的最大记录条数 + */ + public static final String MAX_POLL_RECORDS = "max.poll.records"; + + /** + * 指定消费组 + */ + public static final String GROUP_ID = "group.id"; +} diff --git a/cloud-common/cloud-common-kafka/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/cloud-common/cloud-common-kafka/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports new file mode 100644 index 0000000..1d39066 --- /dev/null +++ b/cloud-common/cloud-common-kafka/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports @@ -0,0 +1,2 @@ +com.muyu.common.kafka.config.KafkaProducerConfig +com.muyu.common.kafka.config.KafkaConsumerConfig diff --git a/cloud-common/pom.xml b/cloud-common/pom.xml index d00dfc6..f5bb268 100644 --- a/cloud-common/pom.xml +++ b/cloud-common/pom.xml @@ -20,6 +20,8 @@ cloud-common-system cloud-common-xxl cloud-common-rabbit + cloud-common-kafka + cloud-common-caffeine cloud-common diff --git a/pom.xml b/pom.xml index 7422784..b74c283 100644 --- a/pom.xml +++ b/pom.xml @@ -42,6 +42,9 @@ 5.8.27 4.1.0 2.4.1 + 1.3.1 + 2.2.8 + 2.9.3 @@ -266,6 +269,35 @@ cloud-common-rabbit ${muyu.version} + + + org.apache.iotdb + iotdb-session + ${iotdb.version} + + + + com.muyu + cloud-modules-data-process-common + ${muyu.version} + + + + io.swagger.core.v3 + swagger-annotations-jakarta + ${swagger.annotations.version} + + + + com.muyu + cloud-common-kafka + ${muyu.version} + + + com.github.ben-manes.caffeine + caffeine + ${caffeine.version} +