diff --git a/src/main/java/com/mobai/MqttApplication.java b/src/main/java/com/mobai/MqttApplication.java index 3a48f45..6420f1d 100644 --- a/src/main/java/com/mobai/MqttApplication.java +++ b/src/main/java/com/mobai/MqttApplication.java @@ -1,20 +1,21 @@ package com.mobai; -import lombok.extern.java.Log; -import org.springframework.beans.factory.annotation.Autowired; +import lombok.extern.log4j.Log4j2; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; +import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory; /** * @ClassName MqttApplication * @Description 描述 - * @Author Mobai + * @author Mobai * @Date 2024/5/31 14:33 */ -@Log +@Log4j2 +@EnableKafka @SpringBootApplication public class MqttApplication { @@ -23,4 +24,5 @@ public class MqttApplication { SpringApplication.run(MqttApplication.class,args); } + } diff --git a/src/main/java/com/mobai/MqttRunner.java b/src/main/java/com/mobai/MqttRunner.java index 8abca84..e387062 100644 --- a/src/main/java/com/mobai/MqttRunner.java +++ b/src/main/java/com/mobai/MqttRunner.java @@ -5,6 +5,10 @@ import com.mobai.domian.Result; import com.mobai.remote.MyClient; import com.mobai.util.ConnectMqtt; import lombok.extern.log4j.Log4j2; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.ListTopicsOptions; +import org.apache.kafka.clients.admin.NewPartitions; +import org.apache.kafka.common.PartitionInfo; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; @@ -12,7 +16,9 @@ import org.springframework.stereotype.Component; import org.springframework.web.client.RestTemplate; import javax.servlet.ServletException; +import java.util.HashMap; import java.util.List; +import java.util.Properties; /** * @ClassName MqttRunner diff --git a/src/main/java/com/mobai/cofig/KafkaConfig.java b/src/main/java/com/mobai/cofig/KafkaConfig.java index bbb8711..69d9996 100644 --- a/src/main/java/com/mobai/cofig/KafkaConfig.java +++ b/src/main/java/com/mobai/cofig/KafkaConfig.java @@ -3,6 +3,7 @@ package com.mobai.cofig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafka; @@ -19,11 +20,9 @@ import org.springframework.lang.Nullable; @EnableKafka public class KafkaConfig { - @Autowired - ProducerFactory producerFactory; @Bean - public KafkaTemplate kafkaTemplate() { + public KafkaTemplate kafkaTemplate(ProducerFactory producerFactory) { KafkaTemplate kafkaTemplate = new KafkaTemplate(producerFactory); kafkaTemplate.setProducerListener(new ProducerListener() { @Override diff --git a/src/main/java/com/mobai/controller/KafkaController.java b/src/main/java/com/mobai/controller/KafkaController.java new file mode 100644 index 0000000..d9b3d50 --- /dev/null +++ b/src/main/java/com/mobai/controller/KafkaController.java @@ -0,0 +1,23 @@ +//package com.mobai.controller; +// +//import com.dtflys.forest.annotation.Post; +//import org.apache.kafka.clients.admin.NewTopic; +//import org.springframework.web.bind.annotation.PostMapping; +//import org.springframework.web.bind.annotation.RestController; +// +///** +// * @author Mobai +// * @className KafkaController +// * @description 描述 +// * @date 2024/6/10 11:22 +// */ +//@RestController +//public class KafkaController { +// +// +// @PostMapping("/set_topic_partition") +// public String setTopicPartition(String topicName, int partitionNum) { +// NewTopic newTopic = new NewTopic(topicName, partitionNum, (short) 1); +// return "set_topic_partition"; +// } +//} diff --git a/src/main/java/com/mobai/demo/KafkaSimpleProducer.java b/src/main/java/com/mobai/demo/KafkaSimpleProducer.java index 9ac656f..468d040 100644 --- a/src/main/java/com/mobai/demo/KafkaSimpleProducer.java +++ b/src/main/java/com/mobai/demo/KafkaSimpleProducer.java @@ -40,6 +40,7 @@ public class KafkaSimpleProducer { record.partition(); // 发送消息 producer.send(record); + // 关闭生产者 producer.close(); } diff --git a/src/main/java/com/mobai/kafka/CustomPartitioner.java b/src/main/java/com/mobai/kafka/CustomPartitioner.java index 4341758..26fa315 100644 --- a/src/main/java/com/mobai/kafka/CustomPartitioner.java +++ b/src/main/java/com/mobai/kafka/CustomPartitioner.java @@ -3,6 +3,7 @@ package com.mobai.kafka; import org.apache.kafka.clients.producer.Partitioner; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.PartitionInfo; +import org.springframework.stereotype.Component; import java.util.List; import java.util.Map; @@ -10,22 +11,42 @@ import java.util.Map; /** * 自定义分区策略 */ +@Component public class CustomPartitioner implements Partitioner { // Random random = new Random(); // topic = topic0 key = vin 轮询分区 + private int currentPartitionIndex = 0; + private final int[] partitionWeights = {1, 1, 1, 1, 1, 1, 1, 1}; // 分区权重 + public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { - //获取分区列表 List partitionInfos = cluster.partitionsForTopic(topic); - int partitionNum = partitionInfos.size(); - if (partitionNum!=8){ - partitionNum++; - }else { - partitionNum=1; + int numPartitions = partitionInfos.size(); + + int totalWeight = 0; + for (int weight : partitionWeights) { + totalWeight += weight; + } + + synchronized (this) { + int chosenPartition = currentPartitionIndex; + currentPartitionIndex = (currentPartitionIndex + 1) % numPartitions; + + // 根据权重选择分区 + int cumulativeWeight = 0; + for (int i = 0; i < numPartitions; i++) { + cumulativeWeight += partitionWeights[i]; + if (chosenPartition < cumulativeWeight) { + chosenPartition = i; + break; + } + } + + System.out.println("当前Key:" + key + "-----> 当前Value:" + value + "----->" + "当前存储分区:" + chosenPartition); + return chosenPartition; } - System.out.println("当前Key:" + key + "-----> 当前Value:" + value + "----->" + "当前存储分区:" + partitionNum); - return partitionNum; } + public void close() { } diff --git a/src/main/java/com/mobai/kafka/CustomizePartitioner.java b/src/main/java/com/mobai/kafka/CustomizePartitioner.java index 1620f35..c6b4dc8 100644 --- a/src/main/java/com/mobai/kafka/CustomizePartitioner.java +++ b/src/main/java/com/mobai/kafka/CustomizePartitioner.java @@ -4,24 +4,33 @@ import org.apache.kafka.clients.producer.Partitioner; import org.apache.kafka.common.Cluster; import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; /** * 分区器 */ public class CustomizePartitioner implements Partitioner { + private final AtomicInteger counter = new AtomicInteger(0); + @Override - public int partition(String s, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) { - //自定义分区规则,默认全部发送到0号分区 - return 0; + public void configure(Map configs) { + } + + @Override + public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { + // 获取主题的分区数 + int numPartitions = cluster.partitionCountForTopic(topic); + // 计算下一个分区 + int partition = counter.getAndIncrement() % numPartitions; + // 如果计数器溢出,重置为0 + if (counter.get() == Integer.MAX_VALUE) { + counter.set(0); + } + return partition; } @Override public void close() { - - } - - @Override - public void configure(Map map) { - } } + diff --git a/src/main/java/com/mobai/kafka/KafkaPCUtils.java b/src/main/java/com/mobai/kafka/KafkaPCUtils.java index 1af26c9..fb7454c 100644 --- a/src/main/java/com/mobai/kafka/KafkaPCUtils.java +++ b/src/main/java/com/mobai/kafka/KafkaPCUtils.java @@ -3,121 +3,98 @@ package com.mobai.kafka; import com.alibaba.fastjson.JSON; import com.mobai.domian.Vehicle; import lombok.extern.log4j.Log4j2; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.CreatePartitionsResult; +import org.apache.kafka.clients.admin.ListTopicsResult; +import org.apache.kafka.clients.admin.NewPartitions; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.header.internals.RecordHeader; import org.apache.kafka.common.serialization.StringSerializer; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.annotation.KafkaListener; -import org.springframework.kafka.core.KafkaTemplate; -import org.springframework.stereotype.Component; +import org.springframework.stereotype.Service; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; import java.util.Properties; /** * 生产者/消费者 + * * @author Mobai * @className KafkaPCUtils * @description 描述 * @date 2024/6/7 10:14 */ @Log4j2 -@Component +@Service public class KafkaPCUtils { - @Autowired - private KafkaTemplate kafkaTemplate; - public void sendCallbackOneMessage(String topic, Vehicle vehicle) { - log.info("向主题:[{}],发送消息:{}",topic,vehicle); + String vin = vehicle.getVin(); + String vehicleString = JSON.toJSONString(vehicle); + log.info("向主题:[{}],发送消息:{}", topic, vehicle); String bootstrapServers = "127.0.0.1:9092"; - String topicName = topic; // 设置生产者属性 Properties properties = new Properties(); properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); +// properties.setProperty(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomizePartitioner.class.getName()); // 创建生产者 KafkaProducer producer = new KafkaProducer<>(properties); - // 创建消息 -// ProducerRecord record = new ProducerRecord<>(topicName,vehicle.getVin(), vehicle.toString()); - ProducerRecord record = new ProducerRecord<>(topic, vehicle.getVin(), JSON.toJSONString(vehicle)); + // 创建分区 + AdminClient adminClient = AdminClient.create(properties); + CreatePartitionsResult partitions = adminClient.createPartitions(new HashMap() {{ + put(topic, NewPartitions.increaseTo(8, null)); + }}); + // 节点集合 + List list = new ArrayList<>(); + list.add(new Node(0, "127.0.0.1", 9092)); + // 节点分区列表 + List partitionInfos = new ArrayList<>(); + for (int i = 0; i < 8; i++) { + partitionInfos.add(new PartitionInfo(topic,i,null,null,null)); + } + + int partition = new CustomizePartitioner().partition( + topic, + vin, + vin.getBytes(), + vehicleString, + vehicleString.getBytes(), + new Cluster("iYl5vA6ESGaoH5veXYGroQ", list, partitionInfos, null, null) + ); + log.info("当前获取分区:[{}]",partition); + // 创建消息 主题 key 分区 值 + ProducerRecord record = new ProducerRecord<>(topic, vin, vehicleString); record.headers().add(new RecordHeader("type", "String".getBytes(StandardCharsets.UTF_8))); - // 只发送无回执 -// producer.send(record); -// record.partition(); + // 发送消息 有回执 - producer.send(record,(matadata,exception)->{ + producer.send(record, (matadata, exception) -> { if (exception == null) { - log.info("消息发送成功,topic:[{}]",topic); - }else { - log.info("消息发送失败,topic:[{}],异常信息:[{}]",topic,exception.getMessage()); +// int partition = matadata.partition(); + log.info("消息发送成功,topic:[{}],分区为:[{}]", topic, partition); + + } else { + log.info("消息发送失败,topic:[{}],异常信息:[{}]", topic, exception.getMessage()); } }); // 关闭生产者 producer.close(); } -// public void sendCallbackOneMessage(String topic, Vehicle vehicle) { -// kafkaTemplate.send(topic,vehicle.getVin(), vehicle).addCallback(new SuccessCallback>() { -// //成功的回调 -// @Override -// public void onSuccess(SendResult success) { -// // 消息发送到的topic -// String topic = success.getRecordMetadata().topic(); -// // 消息发送到的分区 -// int partition = success.getRecordMetadata().partition(); -// // 消息在分区内的offset -// long offset = success.getRecordMetadata().offset(); -// System.out.println("发送消息成功1:" + topic + "-" + partition + "-" + offset); -// } -// }, new FailureCallback() { -// //失败的回调 -// @Override -// public void onFailure(Throwable throwable) { -// System.out.println("发送消息失败1:" + throwable.getMessage()); -// } -// }); -// } - - -// public void onNormalMessage(MqttServerModel mqttServerModel, String vin) { -// String bootstrapServers = "127.0.0.1:9092"; -// String groupId = "test-group"; -// String topic = "topic0"; -// -// // 设置消费者属性 -// Properties properties = new Properties(); -// properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); -// properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); -// properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); -// properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId); -// properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); -// -// // 创建消费者 -// KafkaConsumer consumer = new KafkaConsumer<>(properties); -// -// // 订阅主题 -// consumer.subscribe(Collections.singleton(topic)); -// -// // 拉取并处理消息 -// while(true) { -// ConsumerRecords records = consumer.poll(2); -// for (ConsumerRecord record : records) { -// System.out.printf("Received message: key = %s, value = %s, offset = %d%n", -// record.key(), record.value(), record.offset()); -// } -// } -// -// // 注意:实际应用中需要合适的退出机制 -// } //监听消费 - @KafkaListener(topics = {"topic0","topic1"}) + @KafkaListener(topics = {"topic0", "topic1"}) public void onNormalMessage1(ConsumerRecord record) { String value = (String) record.value(); JSON.parseObject(value, Vehicle.class); @@ -125,7 +102,6 @@ public class KafkaPCUtils { record.value()); } -// // //批量消费 // @KafkaListener(id = "consumer2", topics = {"topic1"}, groupId = "sb_group") // public void onBatchMessage(List> records) { diff --git a/src/main/java/com/mobai/service/impl/MqttCallBackServiceImpl.java b/src/main/java/com/mobai/service/impl/MqttCallBackServiceImpl.java index 377f221..adcc088 100644 --- a/src/main/java/com/mobai/service/impl/MqttCallBackServiceImpl.java +++ b/src/main/java/com/mobai/service/impl/MqttCallBackServiceImpl.java @@ -4,10 +4,13 @@ import com.mobai.domian.Vehicle; //import com.mobai.kafka.KafkaPCUtils; import com.mobai.kafka.KafkaPCUtils; import lombok.extern.log4j.Log4j2; +import org.checkerframework.checker.units.qual.A; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.ProducerFactory; import org.springframework.stereotype.Service; import javax.servlet.ServletException; @@ -26,7 +29,6 @@ import static com.mobai.util.ConversionUtil.hexStringToString; @Service public class MqttCallBackServiceImpl implements MqttCallback { - @Override public void connectionLost(Throwable cause) { log.error(cause);