package com.mobai.kafka;

import com.alibaba.fastjson.JSON;
import com.mobai.domian.Vehicle;
import lombok.extern.log4j.Log4j2;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;
import java.util.Properties;

/**
 * Kafka producer/consumer helper.
 *
 * <p>Publishes {@link Vehicle} payloads as JSON strings and listens on the
 * {@code topic0} and {@code topic1} topics.
 *
 * @author Mobai
 * @date 2024/6/7 10:14
 */
@Log4j2
@Component
public class KafkaPCUtils {

    /**
     * Broker address used by the hand-built producer.
     * NOTE(review): hard-coded; consider sourcing it from application configuration.
     */
    private static final String BOOTSTRAP_SERVERS = "127.0.0.1:9092";

    /** Injected by Spring; not referenced by any code currently active in this class. */
    @Autowired
    private KafkaTemplate<?, ?> kafkaTemplate;

    /**
     * Sends one vehicle to {@code topic} and logs the broker acknowledgement.
     *
     * <p>The record key is the vehicle's VIN, the value is the vehicle serialized
     * to JSON, and a {@code type=String} header is attached. A dedicated producer
     * is created for the call and closed before the method returns; closing waits
     * for the pending send, so the callback has run by then.
     *
     * @param topic   destination topic
     * @param vehicle payload to publish; must not be {@code null}
     * @throws NullPointerException if {@code vehicle} is {@code null}
     */
    public void sendCallbackOneMessage(String topic, Vehicle vehicle) {
        log.info("向主题:[{}],发送消息:{}", topic, vehicle);

        // Build the record before opening the producer so that a bad argument
        // cannot leave a producer (and its I/O thread) dangling.
        ProducerRecord<String, String> record =
                new ProducerRecord<>(topic, vehicle.getVin(), JSON.toJSONString(vehicle));
        record.headers().add(new RecordHeader("type", "String".getBytes(StandardCharsets.UTF_8)));

        // try-with-resources guarantees close() even when send() throws.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProperties())) {
            producer.send(record, (metadata, exception) -> {
                if (exception == null) {
                    log.info("消息发送成功,topic:[{}]", topic);
                } else {
                    // Trailing throwable keeps the stack trace in the log output.
                    log.error("消息发送失败,topic:[{}],异常信息:[{}]", topic, exception.getMessage(), exception);
                }
            });
        }
    }

    /** Builds the configuration for a String-key / String-value producer. */
    private static Properties producerProperties() {
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        return properties;
    }

    /**
     * Listener for {@code topic0} and {@code topic1}: parses the record value as a
     * {@link Vehicle} and logs the topic, partition and raw value.
     *
     * @param record the consumed record; its value is expected to be a JSON string
     */
    @KafkaListener(topics = {"topic0", "topic1"})
    public void onNormalMessage1(ConsumerRecord<?, ?> record) {
        String value = (String) record.value();
        // The parsed result is currently unused.
        // NOTE(review): presumably kept so malformed payloads fail here — confirm intent.
        JSON.parseObject(value, Vehicle.class);
        log.info("简单消费:{}-{}={}", record.topic(), record.partition(), value);
    }
}