feat():开始存入kafka--topic--分区

master
Saisai Liu 2024-06-10 15:57:04 +08:00
parent a5ec532f81
commit ae1a66ce02
9 changed files with 138 additions and 99 deletions

View File

@ -1,20 +1,21 @@
package com.mobai; package com.mobai;
import lombok.extern.java.Log; import lombok.extern.log4j.Log4j2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication; import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory; import org.springframework.kafka.core.ProducerFactory;
/** /**
* @ClassName MqttApplication * @ClassName MqttApplication
* @Description * @Description
* @Author Mobai * @author Mobai
* @Date 2024/5/31 14:33 * @Date 2024/5/31 14:33
*/ */
@Log @Log4j2
@EnableKafka
@SpringBootApplication @SpringBootApplication
public class MqttApplication { public class MqttApplication {
@ -23,4 +24,5 @@ public class MqttApplication {
SpringApplication.run(MqttApplication.class,args); SpringApplication.run(MqttApplication.class,args);
} }
} }

View File

@ -5,6 +5,10 @@ import com.mobai.domian.Result;
import com.mobai.remote.MyClient; import com.mobai.remote.MyClient;
import com.mobai.util.ConnectMqtt; import com.mobai.util.ConnectMqtt;
import lombok.extern.log4j.Log4j2; import lombok.extern.log4j.Log4j2;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ListTopicsOptions;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.common.PartitionInfo;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner; import org.springframework.boot.ApplicationRunner;
@ -12,7 +16,9 @@ import org.springframework.stereotype.Component;
import org.springframework.web.client.RestTemplate; import org.springframework.web.client.RestTemplate;
import javax.servlet.ServletException; import javax.servlet.ServletException;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Properties;
/** /**
* @ClassName MqttRunner * @ClassName MqttRunner

View File

@ -3,6 +3,7 @@ package com.mobai.cofig;
import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.annotation.EnableKafka;
@ -19,11 +20,9 @@ import org.springframework.lang.Nullable;
@EnableKafka @EnableKafka
public class KafkaConfig { public class KafkaConfig {
@Autowired
ProducerFactory producerFactory;
@Bean @Bean
public KafkaTemplate<String, Object> kafkaTemplate() { public KafkaTemplate<String, Object> kafkaTemplate(ProducerFactory producerFactory) {
KafkaTemplate<String, Object> kafkaTemplate = new KafkaTemplate<String, Object>(producerFactory); KafkaTemplate<String, Object> kafkaTemplate = new KafkaTemplate<String, Object>(producerFactory);
kafkaTemplate.setProducerListener(new ProducerListener<String, Object>() { kafkaTemplate.setProducerListener(new ProducerListener<String, Object>() {
@Override @Override

View File

@ -0,0 +1,23 @@
//package com.mobai.controller;
//
//import com.dtflys.forest.annotation.Post;
//import org.apache.kafka.clients.admin.NewTopic;
//import org.springframework.web.bind.annotation.PostMapping;
//import org.springframework.web.bind.annotation.RestController;
//
///**
// * @author Mobai
// * @className KafkaController
// * @description 描述
// * @date 2024/6/10 11:22
// */
//@RestController
//public class KafkaController {
//
//
// @PostMapping("/set_topic_partition")
// public String setTopicPartition(String topicName, int partitionNum) {
// NewTopic newTopic = new NewTopic(topicName, partitionNum, (short) 1);
// return "set_topic_partition";
// }
//}

View File

@ -40,6 +40,7 @@ public class KafkaSimpleProducer {
record.partition(); record.partition();
// 发送消息 // 发送消息
producer.send(record); producer.send(record);
// 关闭生产者 // 关闭生产者
producer.close(); producer.close();
} }

View File

@ -3,6 +3,7 @@ package com.mobai.kafka;
import org.apache.kafka.clients.producer.Partitioner; import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster; import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.PartitionInfo;
import org.springframework.stereotype.Component;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -10,21 +11,41 @@ import java.util.Map;
/** /**
* *
*/ */
@Component
public class CustomPartitioner implements Partitioner { public class CustomPartitioner implements Partitioner {
// Random random = new Random(); // Random random = new Random();
// topic = topic0 key = vin 轮询分区 // topic = topic0 key = vin 轮询分区
private int currentPartitionIndex = 0;
private final int[] partitionWeights = {1, 1, 1, 1, 1, 1, 1, 1}; // 分区权重
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
//获取分区列表
List<PartitionInfo> partitionInfos = cluster.partitionsForTopic(topic); List<PartitionInfo> partitionInfos = cluster.partitionsForTopic(topic);
int partitionNum = partitionInfos.size(); int numPartitions = partitionInfos.size();
if (partitionNum!=8){
partitionNum++; int totalWeight = 0;
}else { for (int weight : partitionWeights) {
partitionNum=1; totalWeight += weight;
} }
System.out.println("当前Key:" + key + "-----> 当前Value:" + value + "----->" + "当前存储分区:" + partitionNum);
return partitionNum; synchronized (this) {
int chosenPartition = currentPartitionIndex;
currentPartitionIndex = (currentPartitionIndex + 1) % numPartitions;
// 根据权重选择分区
int cumulativeWeight = 0;
for (int i = 0; i < numPartitions; i++) {
cumulativeWeight += partitionWeights[i];
if (chosenPartition < cumulativeWeight) {
chosenPartition = i;
break;
} }
}
System.out.println("当前Key:" + key + "-----> 当前Value:" + value + "----->" + "当前存储分区:" + chosenPartition);
return chosenPartition;
}
}
public void close() { public void close() {

View File

@ -4,24 +4,33 @@ import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster; import org.apache.kafka.common.Cluster;
import java.util.Map; import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
/** /**
* *
*/ */
public class CustomizePartitioner implements Partitioner { public class CustomizePartitioner implements Partitioner {
private final AtomicInteger counter = new AtomicInteger(0);
@Override @Override
public int partition(String s, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) { public void configure(Map<String, ?> configs) {
//自定义分区规则默认全部发送到0号分区 }
return 0;
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
// 获取主题的分区数
int numPartitions = cluster.partitionCountForTopic(topic);
// 计算下一个分区
int partition = counter.getAndIncrement() % numPartitions;
// 如果计数器溢出重置为0
if (counter.get() == Integer.MAX_VALUE) {
counter.set(0);
}
return partition;
} }
@Override @Override
public void close() { public void close() {
}
@Override
public void configure(Map<String, ?> map) {
} }
} }

View File

@ -3,57 +3,87 @@ package com.mobai.kafka;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.mobai.domian.Vehicle; import com.mobai.domian.Vehicle;
import lombok.extern.log4j.Log4j2; import lombok.extern.log4j.Log4j2;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreatePartitionsResult;
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.header.internals.RecordHeader; import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Service;
import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Properties; import java.util.Properties;
/** /**
* / * /
*
* @author Mobai * @author Mobai
* @className KafkaPCUtils * @className KafkaPCUtils
* @description * @description
* @date 2024/6/7 10:14 * @date 2024/6/7 10:14
*/ */
@Log4j2 @Log4j2
@Component @Service
public class KafkaPCUtils { public class KafkaPCUtils {
@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;
public void sendCallbackOneMessage(String topic, Vehicle vehicle) { public void sendCallbackOneMessage(String topic, Vehicle vehicle) {
String vin = vehicle.getVin();
String vehicleString = JSON.toJSONString(vehicle);
log.info("向主题:[{}],发送消息:{}", topic, vehicle); log.info("向主题:[{}],发送消息:{}", topic, vehicle);
String bootstrapServers = "127.0.0.1:9092"; String bootstrapServers = "127.0.0.1:9092";
String topicName = topic;
// 设置生产者属性 // 设置生产者属性
Properties properties = new Properties(); Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// properties.setProperty(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomizePartitioner.class.getName());
// 创建生产者 // 创建生产者
KafkaProducer<String, String> producer = new KafkaProducer<>(properties); KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
// 创建消息 // 创建分区
// ProducerRecord<String, String> record = new ProducerRecord<>(topicName,vehicle.getVin(), vehicle.toString()); AdminClient adminClient = AdminClient.create(properties);
ProducerRecord<String, String> record = new ProducerRecord<>(topic, vehicle.getVin(), JSON.toJSONString(vehicle)); CreatePartitionsResult partitions = adminClient.createPartitions(new HashMap() {{
put(topic, NewPartitions.increaseTo(8, null));
}});
// 节点集合
List<Node> list = new ArrayList<>();
list.add(new Node(0, "127.0.0.1", 9092));
// 节点分区列表
List<PartitionInfo> partitionInfos = new ArrayList<>();
for (int i = 0; i < 8; i++) {
partitionInfos.add(new PartitionInfo(topic,i,null,null,null));
}
int partition = new CustomizePartitioner().partition(
topic,
vin,
vin.getBytes(),
vehicleString,
vehicleString.getBytes(),
new Cluster("iYl5vA6ESGaoH5veXYGroQ", list, partitionInfos, null, null)
);
log.info("当前获取分区:[{}]",partition);
// 创建消息 主题 key 分区 值
ProducerRecord<String, String> record = new ProducerRecord<>(topic, vin, vehicleString);
record.headers().add(new RecordHeader("type", "String".getBytes(StandardCharsets.UTF_8))); record.headers().add(new RecordHeader("type", "String".getBytes(StandardCharsets.UTF_8)));
// 只发送无回执
// producer.send(record);
// record.partition();
// 发送消息 有回执 // 发送消息 有回执
producer.send(record, (matadata, exception) -> { producer.send(record, (matadata, exception) -> {
if (exception == null) { if (exception == null) {
log.info("消息发送成功,topic:[{}]",topic); // int partition = matadata.partition();
log.info("消息发送成功,topic:[{}],分区为:[{}]", topic, partition);
} else { } else {
log.info("消息发送失败,topic:[{}],异常信息:[{}]", topic, exception.getMessage()); log.info("消息发送失败,topic:[{}],异常信息:[{}]", topic, exception.getMessage());
} }
@ -62,59 +92,6 @@ public class KafkaPCUtils {
producer.close(); producer.close();
} }
// public void sendCallbackOneMessage(String topic, Vehicle vehicle) {
// kafkaTemplate.send(topic,vehicle.getVin(), vehicle).addCallback(new SuccessCallback<SendResult<String, Object>>() {
// //成功的回调
// @Override
// public void onSuccess(SendResult<String, Object> success) {
// // 消息发送到的topic
// String topic = success.getRecordMetadata().topic();
// // 消息发送到的分区
// int partition = success.getRecordMetadata().partition();
// // 消息在分区内的offset
// long offset = success.getRecordMetadata().offset();
// System.out.println("发送消息成功1:" + topic + "-" + partition + "-" + offset);
// }
// }, new FailureCallback() {
// //失败的回调
// @Override
// public void onFailure(Throwable throwable) {
// System.out.println("发送消息失败1:" + throwable.getMessage());
// }
// });
// }
// public void onNormalMessage(MqttServerModel mqttServerModel, String vin) {
// String bootstrapServers = "127.0.0.1:9092";
// String groupId = "test-group";
// String topic = "topic0";
//
// // 设置消费者属性
// Properties properties = new Properties();
// properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
// properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
// properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
//
// // 创建消费者
// KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
//
// // 订阅主题
// consumer.subscribe(Collections.singleton(topic));
//
// // 拉取并处理消息
// while(true) {
// ConsumerRecords<String, String> records = consumer.poll(2);
// for (ConsumerRecord<String, String> record : records) {
// System.out.printf("Received message: key = %s, value = %s, offset = %d%n",
// record.key(), record.value(), record.offset());
// }
// }
//
// // 注意:实际应用中需要合适的退出机制
// }
//监听消费 //监听消费
@KafkaListener(topics = {"topic0", "topic1"}) @KafkaListener(topics = {"topic0", "topic1"})
@ -125,7 +102,6 @@ public class KafkaPCUtils {
record.value()); record.value());
} }
//
// //批量消费 // //批量消费
// @KafkaListener(id = "consumer2", topics = {"topic1"}, groupId = "sb_group") // @KafkaListener(id = "consumer2", topics = {"topic1"}, groupId = "sb_group")
// public void onBatchMessage(List<ConsumerRecord<String, Object>> records) { // public void onBatchMessage(List<ConsumerRecord<String, Object>> records) {

View File

@ -4,10 +4,13 @@ import com.mobai.domian.Vehicle;
//import com.mobai.kafka.KafkaPCUtils; //import com.mobai.kafka.KafkaPCUtils;
import com.mobai.kafka.KafkaPCUtils; import com.mobai.kafka.KafkaPCUtils;
import lombok.extern.log4j.Log4j2; import lombok.extern.log4j.Log4j2;
import org.checkerframework.checker.units.qual.A;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import javax.servlet.ServletException; import javax.servlet.ServletException;
@ -26,7 +29,6 @@ import static com.mobai.util.ConversionUtil.hexStringToString;
@Service @Service
public class MqttCallBackServiceImpl implements MqttCallback { public class MqttCallBackServiceImpl implements MqttCallback {
@Override @Override
public void connectionLost(Throwable cause) { public void connectionLost(Throwable cause) {
log.error(cause); log.error(cause);