feat():开始存入kafka--topic--分区

master
Saisai Liu 2024-06-07 08:54:35 +08:00
parent 92f486ebb0
commit c3cd07a1e4
6 changed files with 237 additions and 1 deletions

11
pom.xml
View File

@ -98,6 +98,17 @@
<version>2.0.46</version>
</dependency>
<!-- kafka依赖 -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.7.0</version>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
</dependencies>

View File

@ -0,0 +1,32 @@
//package com.mobai.demo;
//
//import org.apache.kafka.clients.consumer.KafkaConsumer;
//
///**
// * @author Mobai
// * @className KafkaListen
// * @description 描述
// * @date 2024/6/6 15:44
// */
//public class KafkaListen{
//
//
// /**
// * 1.使用 spring-kafka.jar包中的 KafkaTemplate 类型
// * 使用 @KafkaListener 注解方式
// * 如下说明消费的是名称为test的topic下,分区 1 中的消息
// */
// @KafkaListener(topicPartitions = {@TopicPartition(topic = "test", partitions = {"1"})})
// public void listen(String msg) {
//
// }
//
// /**
// * 2.使用kafka-clients.jar包中的 KafkaConsumer 类型
// * 如下说明消费的是名称为test的topic下,分区 1 中的消息
// */
// TopicPartition topicPartition = new TopicPartition("test", 1);
// KafkaConsumer consumer = new KafkaConsumer(props);
//consumer.assign(Arrays.asList(topicPartition));
//}
//

View File

@ -0,0 +1,46 @@
package com.mobai.demo;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.Collections;
import java.util.Properties;
/**
* demo
*/
public class KafkaSimpleConsumer {
public static void main(String[] args) {
String bootstrapServers = "127.0.0.1:9092";
String groupId = "test-group";
String topic = "testTopic";
// 设置消费者属性
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// 创建消费者
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
// 订阅主题
consumer.subscribe(Collections.singleton(topic));
// 拉取并处理消息
while(true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("Received message: key = %s, value = %s, offset = %d%n",
record.key(), record.value(), record.offset());
}
}
// 注意:实际应用中需要合适的退出机制
}
}

View File

@ -0,0 +1,36 @@
package com.mobai.demo;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
/**
* demo
*/
public class KafkaSimpleProducer {
public static void main(String[] args) {
String bootstrapServers = "127.0.0.1:9092";
String topicName = "testTopic";
// 设置生产者属性
Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 创建生产者
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
// 创建消息
ProducerRecord<String, String> record = new ProducerRecord<>(topicName, "Hello, Kafka!");
// 发送消息
producer.send(record);
// 关闭生产者
producer.close();
}
}

View File

@ -0,0 +1,36 @@
package com.mobai.demo;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import java.util.List;
import java.util.Map;
import java.util.Random;
/**
* 1.
*/
public class MyPartition implements Partitioner {
Random random = new Random();
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
//获取分区列表
List<PartitionInfo> partitionInfos = cluster.partitionsForTopic(topic);
int partitionNum = 0;
if(key == null){
partitionNum = random.nextInt(partitionInfos.size());//随机分区
} else {
partitionNum = Math.abs((key.hashCode())/partitionInfos.size());
}
System.out.println("当前Key:" + key + "-----> 当前Value:" + value + "----->" + "当前存储分区:" + partitionNum);
return partitionNum;
}
public void close() {
}
public void configure(Map<String, ?> map) {
}
}

View File

@ -4,7 +4,82 @@ spring:
application:
name: mobai-mq
rabbitmq:
host: 43.142.100.73
host: 127.0.0.1
# kafka 配置
kafka:
producer:
# Kafka服务器
bootstrap-servers: 127.0.0.1:9092
# 开启事务,必须在开启了事务的方法中发送,否则报错
transaction-id-prefix: kafkaTx-
# 发生错误后消息重发的次数开启事务必须设置大于0。
retries: 3
# acks=0 生产者在成功写入消息之前不会等待任何来自服务器的响应。
# acks=1 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。
# acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。
# 开启事务时必须设置为all
acks: all
# 当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。
batch-size: 16384
# 生产者内存缓冲区的大小。
buffer-memory: 1024000
# 键的序列化方式
key-serializer: org.springframework.kafka.support.serializer.JsonSerializer
# 值的序列化方式建议使用Json这种序列化方式可以无需额外配置传输实体类
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
consumer:
# Kafka服务器
bootstrap-servers: 127.0.0.1:9092
group-id: firstGroup
# 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式如1S,1M,2H,5D
#auto-commit-interval: 2s
# 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
# earliest当各分区下有已提交的offset时从提交的offset开始消费无提交的offset时从头开始消费分区的记录
# latest当各分区下有已提交的offset时从提交的offset开始消费无提交的offset时消费新产生的该分区下的数据在消费者启动之后生成的记录
# none当各分区都存在已提交的offset时从提交的offset开始消费只要有一个分区不存在已提交的offset则抛出异常
auto-offset-reset: latest
# 是否自动提交偏移量默认值是true为了避免出现重复数据和数据丢失可以把它设置为false然后手动提交偏移量
enable-auto-commit: false
# 键的反序列化方式
#key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
key-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
# 值的反序列化方式建议使用Json这种序列化方式可以无需额外配置传输实体类
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
# 配置消费者的 Json 反序列化的可信赖包,反序列化实体类需要
properties:
spring:
json:
trusted:
packages: "*"
# 这个参数定义了poll方法最多可以拉取多少条消息默认值为500。如果在拉取消息的时候新消息不足500条那有多少返回多少如果超过500条每次只返回500。
# 这个默认值在有些场景下太大有些场景很难保证能够在5min内处理完500条消息
# 如果消费者无法在5分钟内处理完500条消息的话就会触发reBalance,
# 然后这批消息会被分配到另一个消费者中,还是会处理不完,这样这批消息就永远也处理不完。
# 要避免出现上述问题提前评估好处理一条消息最长需要多少时间然后覆盖默认的max.poll.records参数
# 注需要开启BatchListener批量监听才会生效如果不开启BatchListener则不会出现reBalance情况
max-poll-records: 3
properties:
# 两次poll之间的最大间隔默认值为5分钟。如果超过这个间隔会触发reBalance
max:
poll:
interval:
ms: 600000
# 当broker多久没有收到consumer的心跳请求后就触发reBalance默认值是10s
session:
timeout:
ms: 10000
listener:
# 在侦听器容器中运行的线程数,一般设置为 机器数*分区数
concurrency: 4
# 自动提交关闭,需要设置手动消息确认
ack-mode: manual_immediate
# 消费监听接口监听的主题不存在时默认会报错所以设置为false忽略错误
missing-topics-fatal: false
# 两次poll之间的最大间隔默认值为5分钟。如果超过这个间隔会触发reBalance
poll-timeout: 600000
# forest配置
forest:
backend: okhttp3 # 后端HTTP框架默认为 okhttp3