feat 重构kafka的分区

master
rouchen 2024-06-18 21:32:10 +08:00
parent 669acc5537
commit 8493f8b817
10 changed files with 7776 additions and 45734 deletions

File diff suppressed because it is too large Load Diff

Binary file not shown.

Binary file not shown.

View File

@ -0,0 +1,55 @@
package com.muyu.kafka;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Map;
/**
*
*/
@Component
public class CustomPartitioner implements Partitioner {
// Random random = new Random();
// topic = topic0 key = vin 轮询分区
private int currentPartitionIndex = 0;
private final int[] partitionWeights = {1, 1, 1, 1, 1, 1, 1, 1}; // 分区权重
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitionInfos = cluster.partitionsForTopic(topic);
int numPartitions = partitionInfos.size();
int totalWeight = 0;
for (int weight : partitionWeights) {
totalWeight += weight;
}
synchronized (this) {
int chosenPartition = currentPartitionIndex;
currentPartitionIndex = (currentPartitionIndex + 1) % numPartitions;
// 根据权重选择分区
int cumulativeWeight = 0;
for (int i = 0; i < numPartitions; i++) {
cumulativeWeight += partitionWeights[i];
if (chosenPartition < cumulativeWeight) {
chosenPartition = i;
break;
}
}
System.out.println("当前Key:" + key + "-----> 当前Value:" + value + "----->" + "当前存储分区:" + chosenPartition);
return chosenPartition;
}
}
public void close() {
}
public void configure(Map<String, ?> map) {
}
}

View File

@ -0,0 +1,98 @@
package com.muyu.kafka;
import com.alibaba.fastjson.JSON;
import com.muyu.mqtt.dao.MessageData;
import lombok.extern.log4j.Log4j2;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreatePartitionsResult;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
import java.nio.charset.StandardCharsets;
import java.util.*;
@Log4j2
@Service
public class KafkaPCUtils {
public void sendCallbackOneMessage(String topic, MessageData vehicle) {
String vin = vehicle.getVin();
String vehicleString = JSON.toJSONString(vehicle);
log.info("向主题:[{}],发送消息:{}", topic, vehicle);
String bootstrapServers = "127.0.0.1:9092";
// 设置生产者属性
Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 创建生产者
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
// 创建分区
AdminClient adminClient = AdminClient.create(properties);
CreatePartitionsResult partitions = adminClient.createPartitions(new HashMap() {{
put(topic, NewPartitions.increaseTo(8, null));
}});
// 节点集合
List<Node> list = new ArrayList<>();
list.add(new Node(0, "127.0.0.1", 9092));
// 节点分区列表
List<PartitionInfo> partitionInfos = new ArrayList<>();
for (int i = 0; i < 8; i++) {
partitionInfos.add(new PartitionInfo(topic,i,null,null,null));
}
int partition = new CustomPartitioner().partition(
topic,
vin,
vin.getBytes(),
vehicleString,
vehicleString.getBytes(),
new Cluster(
"uqU0Vo8_TiaV2Xp0kzczaA",
list,
partitionInfos,
new HashSet<String>() {},
new HashSet<>())
);
log.info("当前获取分区:[{}]",partition);
// 创建消息 主题 key 分区 值
ProducerRecord<String, String> record = new ProducerRecord<>(topic,partition, vin, vehicleString);
record.headers().add(new RecordHeader("type", "String".getBytes(StandardCharsets.UTF_8)));
// 发送消息 有回执
producer.send(record, (matadata, exception) -> {
if (exception == null) {
// int partition = matadata.partition();
log.info("消息发送成功,topic:[{}],分区为:[{}]", topic, partition);
} else {
log.info("消息发送失败,topic:[{}],异常信息:[{}]", topic, exception.getMessage());
}
});
// 关闭生产者
producer.close();
}
//监听消费
@KafkaListener(topics = {"test1"},groupId = "Topics")
public void onNormalMessage1(ConsumerRecord<String, Object> record) {
String value = (String) record.value();
JSON.parseObject(value, MessageData.class);
System.out.println("简单消费:" + record.topic() + "-" + record.partition() + "=" +
record.value());
}
}

View File

@ -54,6 +54,23 @@ public class SimpleKafkaConsumer {
}
}
@KafkaListener(topics = "test2", groupId = "Topics")
public void consume2(ConsumerRecord<String, String> record) {
log.info("开始消费");
try {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
String value = record.value();
log.info("value:{}", value);
MessageData messageData1 = com.alibaba.fastjson2.JSONObject.parseObject(value, MessageData.class);
log.info("messageData1:{}", messageData1);
iotDbServer.add(messageData1);
redisTemplate.opsForList().rightPush(messageData1.getVin(), JSON.toJSONString(messageData1));
} catch (Exception e) {
log.error("Error consuming Kafka message", e);
// 处理异常,可能需要重试或其他逻辑
}
}
}

View File

@ -0,0 +1,57 @@
package com.muyu.mqtt;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import java.util.List;
import java.util.Map;
public class CustomWeightedRoundRobinPartitioner implements Partitioner {
private int currentPartitionIndex = 0;
private final int[] partitionWeights;
public CustomWeightedRoundRobinPartitioner(int[] partitionWeights) {
this.partitionWeights = partitionWeights;
}
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.availablePartitionsForTopic(topic);
int numPartitions = partitions.size();
if (numPartitions == 0) {
throw new IllegalStateException("No partitions available for topic " + topic);
}
// 确保权重数组长度与分区数匹配
if (partitionWeights.length != numPartitions) {
throw new IllegalArgumentException("Partition weights length does not match number of partitions");
}
synchronized (CustomWeightedRoundRobinPartitioner.class) {
int totalWeight = 0;
for (int weight : partitionWeights) {
totalWeight += weight;
}
currentPartitionIndex = (currentPartitionIndex + 1) % totalWeight; // 轮询总权重
int chosenPartition = 0;
for (int i = 0; i < numPartitions; i++) {
if (currentPartitionIndex < partitionWeights[i]) {
chosenPartition = i;
break;
} else {
currentPartitionIndex -= partitionWeights[i];
}
}
return chosenPartition;
}
}
@Override
public void close() {}
@Override
public void configure(Map<String, ?> configs) {}
}

View File

@ -6,7 +6,6 @@ import com.alibaba.fastjson.JSON;
import com.muyu.mqtt.dao.MessageData;
import com.muyu.utils.ConversionUtil;
import lombok.extern.log4j.Log4j2;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
@ -15,9 +14,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
/**
@ -40,28 +37,28 @@ public class MessageCallbackService implements MqttCallback {
System.out.println("connectionLost:"+cause.getMessage());
}
// AtomicInteger partitionCounter = new AtomicInteger(-1);
private static final AtomicInteger PARTITION_COUNTER = new AtomicInteger(0); // 假设NUM_PARTITIONS是已知的分区数量
private static final int NUM_PARTITIONS = 8; // 请根据实际情况设置分区数量
@Override
public void messageArrived(String topic, MqttMessage mqttMessage) {
log.info("topic:{}", topic);
log.info("Qos:{}", mqttMessage.getQos());
log.info("message content:{}", new String(mqttMessage.getPayload()));
String s = new String(mqttMessage.getPayload());
MessageData main = ConversionUtil.main(s);
// int numPartitions = 8;
// NewTopic newTopic = new NewTopic(topic, numPartitions, (short) 1);
// // 使用轮询计数器来选择分区,并确保它在分区数范围内
// int partition = partitionCounter.getAndUpdate(prev -> (prev + 1) % numPartitions);
// 准备ProducerRecord并发送到Kafka
String kafkaTopic = topic; // 假设MQTT主题与Kafka主题相同
String key = main.getVin(); // 使用vin作为key如果适用
String value = JSON.toJSONString(main);
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(kafkaTopic, key, value);
kafkaTemplate.send(producerRecord);// 注意这里使用get()会阻塞直到发送完成,实际应用中可能需要异步处理
// 创建ProducerRecord并指定分区
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, null, main.getVin(), JSON.toJSONString(main));
// 发送消息
kafkaTemplate.send(producerRecord);
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
System.out.println("deliveryComplete---------" + token.isComplete());

View File

@ -33,5 +33,23 @@ public class MsgHandler {
}
}
@RabbitListener(queues = "ip")
public void msg1(String msg){
log.info("接收到消息:{}",msg);
String[] split = msg.split(",");
for (String s : split) {
MqttProperties mqttProperties = MqttProperties.configBuild(
s,
"test2"
);
log.error("接收到消息初始化信息:{}",mqttProperties);
MqttClient mqttClient = mqttFactory.creatClient(mqttProperties);
log.error("client创建成功:{}",mqttClient.getClientId());
}
}
}

View File

@ -6,7 +6,6 @@ spring:
redis:
host: 101.34.243.166
port: 6379
password: yl030509
rabbitmq:
host: 101.34.243.166
port: 5672