diff --git a/pom.xml b/pom.xml index a7a779c..562d828 100644 --- a/pom.xml +++ b/pom.xml @@ -98,6 +98,17 @@ 2.0.46 + + + org.apache.kafka + kafka-clients + 3.7.0 + + + + org.springframework.kafka + spring-kafka + diff --git a/src/main/java/com/mobai/demo/KafkaListence.java b/src/main/java/com/mobai/demo/KafkaListence.java new file mode 100644 index 0000000..d684960 --- /dev/null +++ b/src/main/java/com/mobai/demo/KafkaListence.java @@ -0,0 +1,32 @@ +//package com.mobai.demo; +// +//import org.apache.kafka.clients.consumer.KafkaConsumer; +// +///** +// * @author Mobai +// * @className KafkaListen +// * @description 描述 +// * @date 2024/6/6 15:44 +// */ +//public class KafkaListen{ +// +// +// /** +// * 1.使用 spring-kafka.jar包中的 KafkaTemplate 类型 +// * 使用 @KafkaListener 注解方式 +// * 如下:说明消费的是名称为test的topic下,分区 1 中的消息 +// */ +// @KafkaListener(topicPartitions = {@TopicPartition(topic = "test", partitions = {"1"})}) +// public void listen(String msg) { +// +// } +// +// /** +// * 2.使用kafka-clients.jar包中的 KafkaConsumer 类型 +// * 如下:说明消费的是名称为test的topic下,分区 1 中的消息 +// */ +// TopicPartition topicPartition = new TopicPartition("test", 1); +// KafkaConsumer consumer = new KafkaConsumer(props); +//consumer.assign(Arrays.asList(topicPartition)); +//} +// diff --git a/src/main/java/com/mobai/demo/KafkaSimpleConsumer.java b/src/main/java/com/mobai/demo/KafkaSimpleConsumer.java new file mode 100644 index 0000000..336253c --- /dev/null +++ b/src/main/java/com/mobai/demo/KafkaSimpleConsumer.java @@ -0,0 +1,46 @@ +package com.mobai.demo; + +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.serialization.StringDeserializer; + +import java.util.Collections; +import java.util.Properties; + +/** + * demo消费者 + */ +public class KafkaSimpleConsumer { + public static void main(String[] args) { + String bootstrapServers = "127.0.0.1:9092"; + String groupId = "test-group"; + String topic = "testTopic"; + + // 设置消费者属性 + Properties properties = new Properties(); + properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId); + properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + + // 创建消费者 + KafkaConsumer consumer = new KafkaConsumer<>(properties); + + // 订阅主题 + consumer.subscribe(Collections.singleton(topic)); + + // 拉取并处理消息 + while(true) { + ConsumerRecords records = consumer.poll(100); + for (ConsumerRecord record : records) { + System.out.printf("Received message: key = %s, value = %s, offset = %d%n", + record.key(), record.value(), record.offset()); + } + } + + // 注意:实际应用中需要合适的退出机制 + } +} diff --git a/src/main/java/com/mobai/demo/KafkaSimpleProducer.java b/src/main/java/com/mobai/demo/KafkaSimpleProducer.java new file mode 100644 index 0000000..c4d0a8e --- /dev/null +++ b/src/main/java/com/mobai/demo/KafkaSimpleProducer.java @@ -0,0 +1,36 @@ +package com.mobai.demo; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.StringSerializer; + +import java.util.Properties; + +/** + * demo生产者 + */ +public class KafkaSimpleProducer { + public static void main(String[] args) { + String bootstrapServers = "127.0.0.1:9092"; + String topicName = "testTopic"; + + // 设置生产者属性 + Properties properties = new Properties(); + properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + + // 创建生产者 + KafkaProducer producer = new KafkaProducer<>(properties); + + // 创建消息 + ProducerRecord record = new ProducerRecord<>(topicName, "Hello, Kafka!"); + + // 发送消息 + producer.send(record); + + // 关闭生产者 + producer.close(); + } +} diff --git a/src/main/java/com/mobai/demo/MyPartition.java b/src/main/java/com/mobai/demo/MyPartition.java new file mode 100644 index 0000000..2856929 --- /dev/null +++ b/src/main/java/com/mobai/demo/MyPartition.java @@ -0,0 +1,36 @@ +package com.mobai.demo; + +import org.apache.kafka.clients.producer.Partitioner; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.PartitionInfo; + +import java.util.List; +import java.util.Map; +import java.util.Random; + +/** + * 1.自定义分区策略 + */ +public class MyPartition implements Partitioner { + Random random = new Random(); + public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { + //获取分区列表 + List partitionInfos = cluster.partitionsForTopic(topic); + int partitionNum = 0; + if(key == null){ + partitionNum = random.nextInt(partitionInfos.size());//随机分区 + } else { + partitionNum = Math.abs((key.hashCode())/partitionInfos.size()); + } + System.out.println("当前Key:" + key + "-----> 当前Value:" + value + "----->" + "当前存储分区:" + partitionNum); + return partitionNum; + } + + public void close() { + + } + + public void configure(Map map) { + + } +} diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 4a40315..99486a6 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -4,7 +4,82 @@ spring: application: name: mobai-mq rabbitmq: - host: 43.142.100.73 + host: 127.0.0.1 + # kafka 配置 + kafka: + producer: + # Kafka服务器 + bootstrap-servers: 127.0.0.1:9092 + # 开启事务,必须在开启了事务的方法中发送,否则报错 + transaction-id-prefix: kafkaTx- + # 发生错误后,消息重发的次数,开启事务必须设置大于0。 + retries: 3 + # acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。 + # acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。 + # acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。 + # 开启事务时,必须设置为all + acks: all + # 当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。 + batch-size: 16384 + # 生产者内存缓冲区的大小。 + buffer-memory: 1024000 + # 键的序列化方式 + key-serializer: org.springframework.kafka.support.serializer.JsonSerializer + # 值的序列化方式(建议使用Json,这种序列化方式可以无需额外配置传输实体类) + value-serializer: org.springframework.kafka.support.serializer.JsonSerializer + + consumer: + # Kafka服务器 + bootstrap-servers: 127.0.0.1:9092 + group-id: firstGroup + # 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5D + #auto-commit-interval: 2s + # 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理: + # earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费分区的记录 + # latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据(在消费者启动之后生成的记录) + # none:当各分区都存在已提交的offset时,从提交的offset开始消费;只要有一个分区不存在已提交的offset,则抛出异常 + auto-offset-reset: latest + # 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量 + enable-auto-commit: false + # 键的反序列化方式 + #key-deserializer: org.apache.kafka.common.serialization.StringDeserializer + key-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer + # 值的反序列化方式(建议使用Json,这种序列化方式可以无需额外配置传输实体类) + value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer + # 配置消费者的 Json 反序列化的可信赖包,反序列化实体类需要 + properties: + spring: + json: + trusted: + packages: "*" + # 这个参数定义了poll方法最多可以拉取多少条消息,默认值为500。如果在拉取消息的时候新消息不足500条,那有多少返回多少;如果超过500条,每次只返回500。 + # 这个默认值在有些场景下太大,有些场景很难保证能够在5min内处理完500条消息, + # 如果消费者无法在5分钟内处理完500条消息的话就会触发reBalance, + # 然后这批消息会被分配到另一个消费者中,还是会处理不完,这样这批消息就永远也处理不完。 + # 要避免出现上述问题,提前评估好处理一条消息最长需要多少时间,然后覆盖默认的max.poll.records参数 + # 注:需要开启BatchListener批量监听才会生效,如果不开启BatchListener则不会出现reBalance情况 + max-poll-records: 3 + properties: + # 两次poll之间的最大间隔,默认值为5分钟。如果超过这个间隔会触发reBalance + max: + poll: + interval: + ms: 600000 + # 当broker多久没有收到consumer的心跳请求后就触发reBalance,默认值是10s + session: + timeout: + ms: 10000 + listener: + # 在侦听器容器中运行的线程数,一般设置为 机器数*分区数 + concurrency: 4 + # 自动提交关闭,需要设置手动消息确认 + ack-mode: manual_immediate + # 消费监听接口监听的主题不存在时,默认会报错,所以设置为false忽略错误 + missing-topics-fatal: false + # 两次poll之间的最大间隔,默认值为5分钟。如果超过这个间隔会触发reBalance + poll-timeout: 600000 + + # forest配置 forest: backend: okhttp3 # 后端HTTP框架(默认为 okhttp3)