cloud-vehicles/muyu-analyze/src/main/java/com/muyu/analyze/utils/Test.java

97 lines
3.4 KiB
Java

//package com.muyu.analyze.utils;
//
//import lombok.extern.slf4j.Slf4j;
//import org.apache.kafka.clients.consumer.ConsumerConfig;
//import org.apache.kafka.clients.consumer.ConsumerRecords;
//import org.apache.kafka.clients.consumer.KafkaConsumer;
//import org.apache.kafka.clients.producer.KafkaProducer;
//import org.apache.kafka.clients.producer.ProducerConfig;
//import org.apache.kafka.clients.producer.ProducerRecord;
//import org.springframework.stereotype.Component;
//
//import javax.annotation.PostConstruct;
//import java.time.Duration;
//import java.util.Collections;
//import java.util.Properties;
//
///**
// * @ProjectName: cloud-vehicles
// * @PackageName: com.muyu.analyze.utils
// * @Description TODO
// * @Author HuangDaJu
// * @Date 2024/4/8 12:41
// * @Version 1.0
// */
//@Component
//@Slf4j
//public class Test {
//
// private static final String TOPIC_NAME = "online";
// private static final String BOOTSTRAP_SERVERS = "10.10.26.4:9092";
//
// public static void main(String[] args) {
// //生产者示例
// produceMessage();
//
// //消费者示例
// consumerMessages();
//
// }
//
// //生产者
// @PostConstruct
// private static void produceMessage() {
// Properties props = new Properties();
// props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
// props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
// props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
//
// KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// //创建生产者
// try {
//
// //发送消息
// for (int i = 0; i < 10000; i++) {
// String message = "佳佳来喽" + (i + 1);
// producer.send(new ProducerRecord<>(TOPIC_NAME, message));
//
// System.out.println("发送消息:" + message);
// }
// } catch (Exception e) {
// e.printStackTrace();
// } finally {
// producer.close();
// }
// }
//
// //消费者
// private static void consumerMessages() {
// Properties props = new Properties();
// props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
// props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group");
// props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
// props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
//
// //创建消费者
// KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
//
// try {
//
// //订阅主题
// consumer.subscribe(Collections.singletonList(TOPIC_NAME));
//
// //持续消费消息
// while (true) {
// ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
// records.forEach(record -> {
// System.out.println("消费者接受到的消息值:" + record.value());
// });
// }
// } catch (Exception e) {
// e.printStackTrace();
// } finally {
// consumer.close();
// }
// }
//}