Merge remote-tracking branch 'origin/dev.dataProcessing' into dev

dev.eventProcess
xinzirun 2024-09-28 21:24:40 +08:00
commit 9c059750b6
7 changed files with 303 additions and 1 deletion

View File

@@ -0,0 +1,33 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.muyu</groupId>
<artifactId>cloud-common</artifactId>
<version>3.6.3</version>
</parent>
<artifactId>cloud-common-kafka</artifactId>
<properties>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<!-- Project common core module -->
<dependency>
<groupId>com.muyu</groupId>
<artifactId>cloud-common-core</artifactId>
</dependency>
<!-- Kafka client -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</dependency>
</dependencies>
</project>

View File

@@ -0,0 +1,102 @@
package com.muyu.common.kafka.config;
import com.muyu.common.core.text.StrFormatter;
import com.muyu.common.kafka.constant.KafkaConfigConstants;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
/**
 * @Author: zi run
 * @Date 2024/9/28 20:32
 * @Description Kafka consumer configuration
 */
@Configuration
public class KafkaConsumerConfig {
/**
 * Kafka broker IP address
 */
@Value("${kafka.consumer.bootstrap-servers-ip}")
private String bootstrapServersIP;
/**
 * Kafka broker port
 */
@Value("${kafka.consumer.bootstrap-servers-port}")
private String bootstrapServersPort;
/**
 * Whether offsets are committed automatically
 */
@Value("${kafka.consumer.enable-auto-commit}")
private Boolean enableAutoCommit;
/**
 * Auto-commit interval (ms)
 */
@Value("${kafka.consumer.auto-commit-interval}")
private Integer autoCommitInterval;
/**
 * Offset reset policy when no committed offset exists
 */
@Value("${kafka.consumer.auto-offset-reset}")
private String autoOffsetReset;
/**
 * Maximum time the broker waits before answering a fetch request (ms)
 */
@Value("${kafka.consumer.fetch-max-wait}")
private Integer fetchMaxWait;
/**
 * Minimum amount of data the broker returns for a fetch request (bytes)
 */
@Value("${kafka.consumer.fetch-min-size}")
private Integer fetchMinSize;
/**
 * Heartbeat interval to the group coordinator (ms)
 */
@Value("${kafka.consumer.heartbeat-interval}")
private Integer heartbeatInterval;
/**
 * Maximum number of records returned per poll
 */
@Value("${kafka.consumer.max-poll-records}")
private Integer maxPollRecords;
/**
 * Consumer group ID
 */
@Value("${kafka.consumer.group-id}")
private String groupId;
/**
 * Builds the Kafka consumer client from the configured properties.
 * @return Kafka consumer instance
 */
@Bean
public KafkaConsumer<String, String> kafkaConsumer() {
HashMap<String, Object> configs = new HashMap<>();
configs.put(KafkaConfigConstants.BOOTSTRAP_SERVERS,
StrFormatter.format("{}:{}", bootstrapServersIP, bootstrapServersPort));
configs.put(KafkaConfigConstants.ENABLE_AUTO_COMMIT, enableAutoCommit);
configs.put(KafkaConfigConstants.AUTO_COMMIT_INTERVAL, autoCommitInterval);
configs.put(KafkaConfigConstants.AUTO_OFFSET_RESET, autoOffsetReset);
configs.put(KafkaConfigConstants.FETCH_MAX_WAIT, fetchMaxWait);
configs.put(KafkaConfigConstants.FETCH_MIN_SIZE, fetchMinSize);
configs.put(KafkaConfigConstants.HEARTBEAT_INTERVAL, heartbeatInterval);
configs.put(KafkaConfigConstants.MAX_POLL_RECORDS, maxPollRecords);
configs.put(KafkaConfigConstants.GROUP_ID, groupId);
Deserializer<String> keyDeserializer = new StringDeserializer();
Deserializer<String> valueDeserializer = new StringDeserializer();
return new KafkaConsumer<>(configs, keyDeserializer, valueDeserializer);
}
}
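For orientation, here is a minimal usage sketch of the KafkaConsumer bean defined above, assuming Spring constructor injection. The KafkaConsumeDemo class and the topic name "demo-topic" are hypothetical illustrations, not part of this commit.

package com.muyu.common.kafka.demo;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.stereotype.Component;
import java.time.Duration;
import java.util.List;
/**
 * Hypothetical example component that consumes with the KafkaConsumer bean.
 */
@Component
public class KafkaConsumeDemo {
    private final KafkaConsumer<String, String> kafkaConsumer;
    public KafkaConsumeDemo(KafkaConsumer<String, String> kafkaConsumer) {
        this.kafkaConsumer = kafkaConsumer;
    }
    /**
     * Polls once from a hypothetical topic; a real listener would run this in a
     * loop on a dedicated thread, since KafkaConsumer is not thread-safe.
     */
    public void pollOnce() {
        kafkaConsumer.subscribe(List.of("demo-topic"));
        ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(1));
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("offset=%d, key=%s, value=%s%n", record.offset(), record.key(), record.value());
        }
    }
}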

View File

@@ -0,0 +1,74 @@
package com.muyu.common.kafka.config;
import com.muyu.common.core.text.StrFormatter;
import com.muyu.common.kafka.constant.KafkaConfigConstants;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
/**
 * @Author: zi run
 * @Date 2024/9/28 16:35
 * @Description Kafka producer configuration
 */
@Configuration
public class KafkaProducerConfig {
/**
 * Kafka broker IP address
 */
@Value("${kafka.producer.bootstrap-servers-ip}")
private String bootstrapServersIP;
/**
 * Kafka broker port
 */
@Value("${kafka.producer.bootstrap-servers-port}")
private String bootstrapServersPort;
/**
 * Number of send retries on transient failures
 */
@Value("${kafka.producer.retries}")
private Integer retries;
/**
 * Batch size in bytes
 */
@Value("${kafka.producer.batch-size}")
private Integer batchSize;
/**
 * Total memory available for buffering unsent records (bytes)
 */
@Value("${kafka.producer.buffer-memory}")
private Integer bufferMemory;
/**
 * Acknowledgment mode (acks)
 */
@Value("${kafka.producer.acks}")
private String acks;
/**
 * Builds the Kafka producer client from the configured properties.
 * @return Kafka producer instance
 */
@Bean
public KafkaProducer<String, String> kafkaProducer() {
HashMap<String, Object> configs = new HashMap<>();
configs.put(KafkaConfigConstants.BOOTSTRAP_SERVERS,
StrFormatter.format("{}:{}", bootstrapServersIP, bootstrapServersPort));
configs.put(KafkaConfigConstants.RETRIES, retries);
configs.put(KafkaConfigConstants.BATCH_SIZE, batchSize);
configs.put(KafkaConfigConstants.BUFFER_MEMORY, bufferMemory);
configs.put(KafkaConfigConstants.ACKS, acks);
Serializer<String> keySerializer = new StringSerializer();
Serializer<String> valueSerializer = new StringSerializer();
return new KafkaProducer<>(configs, keySerializer, valueSerializer);
}
}
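Likewise, a minimal sending sketch using the KafkaProducer bean defined above. The KafkaSendDemo class and the topic name "demo-topic" are hypothetical and not part of this commit.

package com.muyu.common.kafka.demo;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.stereotype.Component;
/**
 * Hypothetical example component that sends with the KafkaProducer bean.
 */
@Component
public class KafkaSendDemo {
    private final KafkaProducer<String, String> kafkaProducer;
    public KafkaSendDemo(KafkaProducer<String, String> kafkaProducer) {
        this.kafkaProducer = kafkaProducer;
    }
    /**
     * Sends one record to a hypothetical topic; send() is asynchronous and the
     * callback reports either the record metadata or the failure.
     */
    public void send(String key, String value) {
        kafkaProducer.send(new ProducerRecord<>("demo-topic", key, value), (metadata, exception) -> {
            if (exception != null) {
                exception.printStackTrace();
            } else {
                System.out.printf("partition=%d, offset=%d%n", metadata.partition(), metadata.offset());
            }
        });
    }
}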

View File

@@ -0,0 +1,74 @@
package com.muyu.common.kafka.constant;
/**
 * @Author: zi run
 * @Date 2024/9/28 20:07
 * @Description Kafka configuration key constants
 */
public class KafkaConfigConstants {
/**
 * Broker address (IP and port)
 */
public static final String BOOTSTRAP_SERVERS = "bootstrap.servers";
/**
 * Producer send retries
 */
public static final String RETRIES = "retries";
/**
 * Producer batch size
 */
public static final String BATCH_SIZE = "batch.size";
/**
 * Producer buffer memory
 */
public static final String BUFFER_MEMORY = "buffer.memory";
/**
 * Acknowledgment mode
 */
public static final String ACKS = "acks";
/**
 * Whether consumer offsets are committed automatically
 */
public static final String ENABLE_AUTO_COMMIT = "enable.auto.commit";
/**
 * Auto-commit interval
 */
public static final String AUTO_COMMIT_INTERVAL = "auto.commit.interval.ms";
/**
 * Offset reset policy
 */
public static final String AUTO_OFFSET_RESET = "auto.offset.reset";
/**
 * Maximum fetch wait time
 */
public static final String FETCH_MAX_WAIT = "fetch.max.wait.ms";
/**
 * Minimum fetch size
 */
public static final String FETCH_MIN_SIZE = "fetch.min.bytes";
/**
 * Heartbeat interval
 */
public static final String HEARTBEAT_INTERVAL = "heartbeat.interval.ms";
/**
 * Maximum records returned per poll
 */
public static final String MAX_POLL_RECORDS = "max.poll.records";
/**
 * Consumer group ID
 */
public static final String GROUP_ID = "group.id";
}
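As a cross-check on the key strings above, kafka-clients already ships these keys as constants on ConsumerConfig and ProducerConfig. The small sketch below only prints a few of them for comparison; it is illustrative and not part of this commit.

package com.muyu.common.kafka.constant;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
/**
 * Illustrative only: prints the official config keys that the string
 * literals in KafkaConfigConstants are expected to match.
 */
public class KafkaConfigKeyCheck {
    public static void main(String[] args) {
        System.out.println(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG); // auto.commit.interval.ms
        System.out.println(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG);       // fetch.max.wait.ms
        System.out.println(ConsumerConfig.FETCH_MIN_BYTES_CONFIG);         // fetch.min.bytes
        System.out.println(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG);   // heartbeat.interval.ms
        System.out.println(ProducerConfig.BUFFER_MEMORY_CONFIG);           // buffer.memory
    }
}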

View File

@@ -0,0 +1,2 @@
com.muyu.common.kafka.config.KafkaProducerConfig
com.muyu.common.kafka.config.KafkaConsumerConfig

View File

@@ -21,6 +21,7 @@
<module>cloud-common-system</module>
<module>cloud-common-xxl</module>
<module>cloud-common-rabbit</module>
<module>cloud-common-kafka</module>
</modules>
<artifactId>cloud-common</artifactId>

pom.xml (18 lines changed)
View File

@@ -43,6 +43,7 @@
<knife4j-openapi3.version>4.1.0</knife4j-openapi3.version>
<xxl-job-core.version>2.4.1</xxl-job-core.version>
<swagger.an.jakarta.verison>2.2.8</swagger.an.jakarta.verison>
<kafka.clients.version>3.0.0</kafka.clients.version>
</properties>
<!-- Dependency declarations -->
@@ -184,12 +185,20 @@
<version>${transmittable-thread-local.version}</version>
</dependency>
<!-- Swagger 3 documentation -->
<dependency>
<groupId>io.swagger.core.v3</groupId>
<artifactId>swagger-annotations-jakarta</artifactId>
<version>${swagger.an.jakarta.verison}</version>
</dependency>
<!-- Kafka client -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.clients.version}</version>
</dependency>
<!-- Core module -->
<dependency>
<groupId>com.muyu</groupId>
@@ -281,7 +290,14 @@
<version>${muyu.version}</version>
</dependency>
<!-- Enterprise business platform - announcement module -->
<!-- Kafka module -->
<dependency>
<groupId>com.muyu</groupId>
<artifactId>cloud-common-kafka</artifactId>
<version>${muyu.version}</version>
</dependency>
<!-- Enterprise business platform - common module -->
<dependency>
<groupId>com.muyu</groupId>
<artifactId>cloud-modules-enterprise-common</artifactId>