feat():开始存入kafka--topic--分区

master
Saisai Liu 2024-06-07 22:32:32 +08:00
parent c3cd07a1e4
commit 0a6476d715
21 changed files with 448 additions and 63 deletions

View File

@ -102,7 +102,6 @@
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.7.0</version>
</dependency>
<dependency>
@ -110,6 +109,11 @@
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -1,8 +1,12 @@
package com.mobai;
import lombok.extern.java.Log;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
/**
* @ClassName MqttApplication
@ -13,7 +17,10 @@ import org.springframework.boot.autoconfigure.SpringBootApplication;
@Log
@SpringBootApplication
public class MqttApplication {
public static void main(String[] args) {
SpringApplication.run(MqttApplication.class,args);
}
}

View File

@ -4,6 +4,7 @@ import com.mobai.domian.MqttServerModel;
import com.mobai.domian.Result;
import com.mobai.remote.MyClient;
import com.mobai.util.ConnectMqtt;
import lombok.extern.log4j.Log4j2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
@ -19,6 +20,7 @@ import java.util.List;
* @Author Mobai
* @Date 2024/6/5 19:09
*/
@Log4j2
@Component
public class MqttRunner implements ApplicationRunner {
@Autowired
@ -34,7 +36,7 @@ public class MqttRunner implements ApplicationRunner {
if (ips.getData()==null){
throw new ServletException("获取初始节点信息失败");
}
System.out.println(ips);
log.info(ips);
List<MqttServerModel> mqttServerModels = ips.getData();
mqttServerModels.forEach(mqttServerModel -> {
try {

View File

@ -0,0 +1,53 @@
package com.mobai.cofig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.ProducerListener;
import org.springframework.lang.Nullable;
/**
*
*/
@Configuration
@EnableKafka
public class KafkaConfig {
@Autowired
ProducerFactory producerFactory;
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
KafkaTemplate<String, Object> kafkaTemplate = new KafkaTemplate<String, Object>(producerFactory);
kafkaTemplate.setProducerListener(new ProducerListener<String, Object>() {
@Override
public void onSuccess(ProducerRecord<String, Object> producerRecord, RecordMetadata recordMetadata) {
System.out.println("发送成功 " + producerRecord.toString());
}
// @Override
// public void onSuccess(String topic, Integer partition, String key, Object value, RecordMetadata recordMetadata) {
// System.out.println("发送成功 topic = " + topic + " ; partion = " + partition + "; key = " + key + " ; value=" + value);
// }
@Override
public void onError(ProducerRecord<String, Object> producerRecord, @Nullable RecordMetadata recordMetadata, Exception exception) {
System.out.println("发送失败" + producerRecord.toString());
System.out.println(exception.getMessage());
}
// @Override
// public void onError(String topic, Integer partition, String key, Object value, Exception exception) {
// System.out.println("发送失败" + "topic = " + topic + " ; partion = " + partition + "; key = " + key + " ; value=" + value);
// System.out.println(exception.getMessage());
// }
});
return kafkaTemplate;
}
}

View File

@ -0,0 +1,37 @@
package com.mobai.cofig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class KafkaConsumerConfig {
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setErrorHandler(new SeekToCurrentErrorHandler());
return factory;
}
@Bean
public DefaultKafkaConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "your-bootstrap-server");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "your-group-id");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props, new ErrorHandlingDeserializer<>(new StringDeserializer()), new ErrorHandlingDeserializer<>(new StringDeserializer()));
}
}

View File

@ -1,7 +1,7 @@
package com.mobai.cofig;
import com.mobai.domian.GetOptions;
import com.mobai.domian.MqttCallBackServiceImpl;
import com.mobai.service.impl.MqttCallBackServiceImpl;
import lombok.AllArgsConstructor;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
@ -16,6 +16,9 @@ import org.springframework.stereotype.Service;
* @Date 2024/5/31 11:35
*/
/**
* mqtt
*/
@Service
@AllArgsConstructor
public class MqttFactory {

View File

@ -6,8 +6,8 @@ import org.springframework.context.annotation.Configuration;
/**
* @ClassName RabbitConfig
* @Description
* @Author Mobai
* @description
* @author Mobai
* @Date 2024/5/31 21:47
*/
@Configuration

View File

@ -1,32 +0,0 @@
//package com.mobai.demo;
//
//import org.apache.kafka.clients.consumer.KafkaConsumer;
//
///**
// * @author Mobai
// * @className KafkaListen
// * @description 描述
// * @date 2024/6/6 15:44
// */
//public class KafkaListen{
//
//
// /**
// * 1.使用 spring-kafka.jar包中的 KafkaTemplate 类型
// * 使用 @KafkaListener 注解方式
// * 如下说明消费的是名称为test的topic下,分区 1 中的消息
// */
// @KafkaListener(topicPartitions = {@TopicPartition(topic = "test", partitions = {"1"})})
// public void listen(String msg) {
//
// }
//
// /**
// * 2.使用kafka-clients.jar包中的 KafkaConsumer 类型
// * 如下说明消费的是名称为test的topic下,分区 1 中的消息
// */
// TopicPartition topicPartition = new TopicPartition("test", 1);
// KafkaConsumer consumer = new KafkaConsumer(props);
//consumer.assign(Arrays.asList(topicPartition));
//}
//

View File

@ -34,7 +34,7 @@ public class KafkaSimpleConsumer {
// 拉取并处理消息
while(true) {
ConsumerRecords<String, String> records = consumer.poll(100);
ConsumerRecords<String, String> records = consumer.poll(5);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("Received message: key = %s, value = %s, offset = %d%n",
record.key(), record.value(), record.offset());

View File

@ -4,14 +4,27 @@ import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.FailureCallback;
import org.springframework.util.concurrent.SuccessCallback;
import java.util.Properties;
import static io.lettuce.core.pubsub.PubSubOutput.Type.message;
/**
* demo
*/
@Component
public class KafkaSimpleProducer {
public static void main(String[] args) {
String bootstrapServers = "127.0.0.1:9092";
String topicName = "testTopic";
@ -20,16 +33,13 @@ public class KafkaSimpleProducer {
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 创建生产者
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
// 创建消息
ProducerRecord<String, String> record = new ProducerRecord<>(topicName, "Hello, Kafka!");
record.partition();
// 发送消息
producer.send(record);
// 关闭生产者
producer.close();
}

View File

@ -1,4 +1,4 @@
package com.mobai.demo;
package com.mobai.kafka;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
@ -6,21 +6,21 @@ import org.apache.kafka.common.PartitionInfo;
import java.util.List;
import java.util.Map;
import java.util.Random;
/**
* 1.
*
*/
public class MyPartition implements Partitioner {
Random random = new Random();
public class CustomPartitioner implements Partitioner {
// Random random = new Random();
// topic = topic0 key = vin 轮询分区
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
//获取分区列表
List<PartitionInfo> partitionInfos = cluster.partitionsForTopic(topic);
int partitionNum = 0;
if(key == null){
partitionNum = random.nextInt(partitionInfos.size());//随机分区
int partitionNum = partitionInfos.size();
if (partitionNum!=8){
partitionNum++;
}else {
partitionNum = Math.abs((key.hashCode())/partitionInfos.size());
partitionNum=1;
}
System.out.println("当前Key:" + key + "-----> 当前Value:" + value + "----->" + "当前存储分区:" + partitionNum);
return partitionNum;

View File

@ -0,0 +1,27 @@
package com.mobai.kafka;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;
/**
*
*/
public class CustomizePartitioner implements Partitioner {
@Override
public int partition(String s, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) {
//自定义分区规则默认全部发送到0号分区
return 0;
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> map) {
}
}

View File

@ -0,0 +1,32 @@
package com.mobai.kafka;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import java.util.List;
/**
*
*/
@Component
public class KafkaConsumer {
//监听消费
@KafkaListener(topics = {"sb_topic"})
public void onNormalMessage(ConsumerRecord<String, Object> record) {
System.out.println("简单消费:" + record.topic() + "-" + record.partition() + "=" +
record.value());
}
//批量消费
@KafkaListener(id = "consumer2", topics = {"sb_topic","mobai-mq"}, groupId = "sb_group")
public void onBatchMessage(List<ConsumerRecord<String, Object>> records) {
System.out.println(">>> 批量消费一次recoreds.size()=" + records.size());
for (ConsumerRecord<String, Object> record : records) {
System.out.println(record.value());
}
}
}

View File

@ -0,0 +1,135 @@
package com.mobai.kafka;
import com.mobai.domian.Vehicle;
import lombok.extern.log4j.Log4j2;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets;
import java.util.Properties;
/**
* /
* @author Mobai
* @className KafkaPCUtils
* @description
* @date 2024/6/7 10:14
*/
@Log4j2
@Component
public class KafkaPCUtils {
@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;
public void sendCallbackOneMessage(String topic, Vehicle vehicle) {
log.info("向主题:[{}],发送消息:{}",topic,vehicle);
String bootstrapServers = "127.0.0.1:9092";
String topicName = topic;
// 设置生产者属性
Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 创建生产者
KafkaProducer<String, Vehicle> producer = new KafkaProducer<>(properties);
// 创建消息
// ProducerRecord<String, String> record = new ProducerRecord<>(topicName,vehicle.getVin(), vehicle.toString());
ProducerRecord<String, Vehicle> record = new ProducerRecord<>("my_topic", "key", vehicle);
record.headers().add(new RecordHeader("type", "String".getBytes(StandardCharsets.UTF_8)));
producer.send(record);
// record.partition();
// 发送消息
producer.send(record,(matadata,exception)->{
if (exception == null) {
log.info("消息发送成功,topic:[{}]",topic);
}else {
log.info("消息发送失败,topic:[{}],异常信息:[{}]",topic,exception.getMessage());
}
});
// 关闭生产者
producer.close();
log.info("发送成功topic【{}】",topic);
}
// public void sendCallbackOneMessage(String topic, Vehicle vehicle) {
// kafkaTemplate.send(topic,vehicle.getVin(), vehicle).addCallback(new SuccessCallback<SendResult<String, Object>>() {
// //成功的回调
// @Override
// public void onSuccess(SendResult<String, Object> success) {
// // 消息发送到的topic
// String topic = success.getRecordMetadata().topic();
// // 消息发送到的分区
// int partition = success.getRecordMetadata().partition();
// // 消息在分区内的offset
// long offset = success.getRecordMetadata().offset();
// System.out.println("发送消息成功1:" + topic + "-" + partition + "-" + offset);
// }
// }, new FailureCallback() {
// //失败的回调
// @Override
// public void onFailure(Throwable throwable) {
// System.out.println("发送消息失败1:" + throwable.getMessage());
// }
// });
// }
// public void onNormalMessage(MqttServerModel mqttServerModel, String vin) {
// String bootstrapServers = "127.0.0.1:9092";
// String groupId = "test-group";
// String topic = "topic0";
//
// // 设置消费者属性
// Properties properties = new Properties();
// properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
// properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
// properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
//
// // 创建消费者
// KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
//
// // 订阅主题
// consumer.subscribe(Collections.singleton(topic));
//
// // 拉取并处理消息
// while(true) {
// ConsumerRecords<String, String> records = consumer.poll(2);
// for (ConsumerRecord<String, String> record : records) {
// System.out.printf("Received message: key = %s, value = %s, offset = %d%n",
// record.key(), record.value(), record.offset());
// }
// }
//
// // 注意:实际应用中需要合适的退出机制
// }
//监听消费
@KafkaListener(topics = {"topic0","topic1","sb_topic"})
public void onNormalMessage1(ConsumerRecord<String, Object> record) {
System.out.println("简单消费:" + record.topic() + "-" + record.partition() + "=" +
record.value());
}
//
// //批量消费
// @KafkaListener(id = "consumer2", topics = {"topic1"}, groupId = "sb_group")
// public void onBatchMessage(List<ConsumerRecord<String, Object>> records) {
// System.out.println(">>> 批量消费一次recoreds.size()=" + records.size());
// for (ConsumerRecord<String, Object> record : records) {
// System.out.println(record.value());
// }
// }
}

View File

@ -0,0 +1,76 @@
package com.mobai.kafka;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.FailureCallback;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.util.concurrent.SuccessCallback;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
/**
*
*/
@RestController
public class kafkaProducer {
@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;
@GetMapping("/kafka/normal/{message}")
public void sendNormalMessage(@PathVariable("message") String message) {
kafkaTemplate.send("sb_topic", message);
}
/**
*
* @param message
*/
@GetMapping("/kafka/callbackOne/{message}")
public void sendCallbackOneMessage(@PathVariable("message") String message) {
kafkaTemplate.send("sb_topic", message).addCallback(new SuccessCallback<SendResult<String, Object>>() {
//成功的回调
@Override
public void onSuccess(SendResult<String, Object> success) {
// 消息发送到的topic
String topic = success.getRecordMetadata().topic();
// 消息发送到的分区
int partition = success.getRecordMetadata().partition();
// 消息在分区内的offset
long offset = success.getRecordMetadata().offset();
System.out.println("发送消息成功1:" + topic + "-" + partition + "-" + offset);
}
}, new FailureCallback() {
//失败的回调
@Override
public void onFailure(Throwable throwable) {
System.out.println("发送消息失败1:" + throwable.getMessage());
}
});
}
/**
*
* @param message
*/
@GetMapping("/kafka/callbackTwo/{message}")
public void sendCallbackTwoMessage(@PathVariable("message") String message) {
kafkaTemplate.send("sb_topic", message).addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
@Override
public void onFailure(Throwable throwable) {
System.out.println("发送消息失败2"+throwable.getMessage());
}
@Override
public void onSuccess(SendResult<String, Object> result) {
System.out.println("发送消息成功2" + result.getRecordMetadata().topic() + "-"
+ result.getRecordMetadata().partition() + "-" + result.getRecordMetadata().offset());
}
});
}
}

View File

@ -9,7 +9,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
*
* create.topic
* @ClassName MessageHandler
* @Description
* @Author Mobai

View File

@ -7,6 +7,9 @@ import org.springframework.stereotype.Component;
import java.util.List;
/**
* ips
*/
@Component
public interface MyClient {

View File

@ -1,10 +1,15 @@
package com.mobai.domian;
package com.mobai.service.impl;
import com.mobai.domian.Vehicle;
import com.mobai.kafka.KafkaPCUtils;
import lombok.extern.log4j.Log4j2;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.servlet.ServletException;
import java.math.BigDecimal;
import static com.mobai.util.ConversionUtil.hexStringToString;
@ -16,10 +21,14 @@ import static com.mobai.util.ConversionUtil.hexStringToString;
* @Author Mobai
* @Date 2024/5/30 20:02
*/
@Log4j2
@Service
public class MqttCallBackServiceImpl implements MqttCallback {
@Override
public void connectionLost(Throwable cause) {
log.error(cause);
System.out.println("connectionLost: " + cause.getMessage());
}
@ -29,8 +38,12 @@ public class MqttCallBackServiceImpl implements MqttCallback {
System.out.println("topic: " + topic);
System.out.println("Qos: " + message.getQos());
System.out.println("message content: " + new String(message.getPayload()));
String hexStr = "7E 56 49 4e 31 32 33 34 35 36 37 38 39 44 49 4a 45 34 31 37 31 37 35 39 31 33 35 33 30 37 34 31 31 36 2e 37 32 31 34 31 37 30 33 39 2e 35 32 38 37 34 33 30 33 36 2e 30 30 30 32 36 2e 36 32 36 30 30 30 30 30 32 36 32 30 30 30 32 38 30 30 30 32 36 39 31 30 30 30 30 30 44 34 30 38 30 31 31 2e 32 30 37 30 30 30 30 30 37 36 32 32 32 35 37 37 30 31 33 39 30 30 30 31 35 31 30 30 31 33 37 38 35 30 30 30 34 34 37 37 30 2e 31 34 30 30 30 30 37 39 30 30 30 30 30 30 35 30 30 30 30 35 39 36 30 30 30 33 30 30 30 34 30 30 30 35 30 30 30 30 30 32 35 30 30 30 30 32 30 30 30 30 30 31 31 31 31 31 31 31 31 31 31 31 31 31 31 31 31 31 26 7E";
String hexStr = message.toString();
if (hexStr.length()<209) try {
throw new ServletException("报文信息位数不够");
} catch (ServletException e) {
throw new RuntimeException(e);
}
// 报文
String str = hexStringToString(hexStr);
System.out.println(hexStr);
@ -182,6 +195,8 @@ public class MqttCallBackServiceImpl implements MqttCallback {
"DCDC(电力交换系统)状态:" + dcdcStatus + "\n\t" +
"CHG(充电机)状态:" + chgStatus
);
KafkaPCUtils kafkaPCUtils = new KafkaPCUtils();
kafkaPCUtils.sendCallbackOneMessage(topic, vehicle);
}

View File

@ -2,7 +2,7 @@ package com.mobai.util;
import com.mobai.cofig.MqttFactory;
import com.mobai.cofig.MqttProperties;
import com.mobai.domian.MqttCallBackServiceImpl;
import com.mobai.service.impl.MqttCallBackServiceImpl;
import com.mobai.domian.MqttServerModel;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.springframework.beans.factory.annotation.Autowired;
@ -11,6 +11,7 @@ import org.springframework.stereotype.Component;
import javax.servlet.ServletException;
/**
* mqtt
* @ClassName ConnectMqtt
* @description mqtt
* @author Mobai

View File

@ -4,14 +4,19 @@ spring:
application:
name: mobai-mq
rabbitmq:
host: 127.0.0.1
host: 43.142.100.73
stream:
username: guest
password: guest
# kafka 配置
kafka:
producer:
# Kafka服务器
bootstrap-servers: 127.0.0.1:9092
bootstrap:
servers: 127.0.0.1:9092
# 开启事务,必须在开启了事务的方法中发送,否则报错
transaction-id-prefix: kafkaTx-
# transaction-id-prefix: kafkaTx-
# 发生错误后消息重发的次数开启事务必须设置大于0。
retries: 3
# acks=0 生产者在成功写入消息之前不会等待任何来自服务器的响应。
@ -27,18 +32,22 @@ spring:
key-serializer: org.springframework.kafka.support.serializer.JsonSerializer
# 值的序列化方式建议使用Json这种序列化方式可以无需额外配置传输实体类
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
properties:
linger:
ms: 2000 # 延迟提交
partitioner: #指定分区器
class: com.mobai.kafka.CustomPartitioner # 分区器路径
consumer:
# Kafka服务器
bootstrap-servers: 127.0.0.1:9092
group-id: firstGroup
group-id: firstGroup #默认消费者组ID
# 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式如1S,1M,2H,5D
#auto-commit-interval: 2s
# 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
# earliest当各分区下有已提交的offset时从提交的offset开始消费无提交的offset时从头开始消费分区的记录
# latest当各分区下有已提交的offset时从提交的offset开始消费无提交的offset时消费新产生的该分区下的数据在消费者启动之后生成的记录
# none当各分区都存在已提交的offset时从提交的offset开始消费只要有一个分区不存在已提交的offset则抛出异常
auto-offset-reset: latest
auto-offset-reset: earliest
# 是否自动提交偏移量默认值是true为了避免出现重复数据和数据丢失可以把它设置为false然后手动提交偏移量
enable-auto-commit: false
# 键的反序列化方式
@ -98,3 +107,6 @@ forest:
log-response-status: true # 打开/关闭Forest响应状态日志默认为 true
log-response-content: true # 打开/关闭Forest响应内容日志默认为 false
async-mode: platform # [自v1.5.27版本起可用] 异步模式(默认为 platform
logging:
level:
com.mobai.*: DEBUG