From 54a091e3b6dae2db2cd950192e2c3494427a5f27 Mon Sep 17 00:00:00 2001 From: dongxiaodong <13970843+dxdwork@user.noreply.gitee.com> Date: Tue, 9 Apr 2024 09:57:28 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E5=88=A4=E6=96=AD=E8=AF=AD?= =?UTF-8?q?=E5=8F=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../analyze/msg/model/ModelsKafkaMessage.java | 5 +- .../com/couplet/mq/controller/KafkaTest.java | 164 +++++++++--------- 2 files changed, 84 insertions(+), 85 deletions(-) diff --git a/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/model/ModelsKafkaMessage.java b/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/model/ModelsKafkaMessage.java index 934e710..d9782d4 100644 --- a/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/model/ModelsKafkaMessage.java +++ b/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/model/ModelsKafkaMessage.java @@ -76,11 +76,10 @@ public class ModelsKafkaMessage { * * @return */ - @Scheduled(fixedDelay = 50) +// @Scheduled(fixedDelay = 50) + @PostConstruct public void consumerMessages() { executorService.execute(this::consumer); - - } public void consumer() { diff --git a/couplet-modules/couplet-modules-mq/src/main/java/com/couplet/mq/controller/KafkaTest.java b/couplet-modules/couplet-modules-mq/src/main/java/com/couplet/mq/controller/KafkaTest.java index d078386..5ff0ec6 100644 --- a/couplet-modules/couplet-modules-mq/src/main/java/com/couplet/mq/controller/KafkaTest.java +++ b/couplet-modules/couplet-modules-mq/src/main/java/com/couplet/mq/controller/KafkaTest.java @@ -1,93 +1,93 @@ -package com.couplet.mq.controller; - -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.clients.producer.ProducerRecord; - -import javax.annotation.PostConstruct; -import java.time.Duration; -import java.util.Collections; -import java.util.Properties; - -/** - * @ProjectName: five-groups-couplet - * @Author: LiuYunHu - * @CreateTime: 2024/4/4 - * @Description: kafka测试类 - */ - -@Slf4j -public class KafkaTest { - private static final String TOPIC_NAME = "online"; - private static final String BOOTSTRAP_SERVERS = "39.103.133.136:9092"; - - public static void main(String[] args) { - //生产者示例 -// produceMessage(); - - //消费者示例 -// consumerMessages(); - - } - - //生产者 - @PostConstruct - private static void produceMessage() { - Properties props = new Properties(); - props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); - props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); - props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); - - KafkaProducer producer = new KafkaProducer<>(props); - //创建生产者 - try { - - //发送消息 - for (int i = 0; i < 10000; i++) { - String message = "佳佳来喽" + (i + 1); - producer.send(new ProducerRecord<>(TOPIC_NAME, message)); - - System.out.println("发送消息:" + message); - } - } catch (Exception e) { - e.printStackTrace(); - } finally { - producer.close(); - } - } - - //消费者 -// private static void consumerMessages() { +//package com.couplet.mq.controller; +// +//import lombok.extern.slf4j.Slf4j; +//import org.apache.kafka.clients.consumer.ConsumerConfig; +//import org.apache.kafka.clients.consumer.ConsumerRecords; +//import org.apache.kafka.clients.consumer.KafkaConsumer; +//import org.apache.kafka.clients.producer.KafkaProducer; +//import org.apache.kafka.clients.producer.ProducerConfig; +//import org.apache.kafka.clients.producer.ProducerRecord; +// +//import javax.annotation.PostConstruct; +//import java.time.Duration; +//import java.util.Collections; +//import java.util.Properties; +// +///** +// * @ProjectName: five-groups-couplet +// * @Author: LiuYunHu +// * @CreateTime: 2024/4/4 +// * @Description: kafka测试类 +// */ +// +//@Slf4j +//public class KafkaTest { +// private static final String TOPIC_NAME = "online"; +// private static final String BOOTSTRAP_SERVERS = "39.103.133.136:9092"; +// +// public static void main(String[] args) { +// //生产者示例 +//// produceMessage(); +// +// //消费者示例 +//// consumerMessages(); +// +// } +// +// //生产者 +// @PostConstruct +// private static void produceMessage() { // Properties props = new Properties(); -// props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); -// props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group"); -// props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); -// props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); -// -// //创建消费者 -// KafkaConsumer consumer = new KafkaConsumer<>(props); +// props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); +// props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); +// props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); // +// KafkaProducer producer = new KafkaProducer<>(props); +// //创建生产者 // try { // -// //订阅主题 -// consumer.subscribe(Collections.singletonList(TOPIC_NAME)); +// //发送消息 +// for (int i = 0; i < 10000; i++) { +// String message = "佳佳来喽" + (i + 1); +// producer.send(new ProducerRecord<>(TOPIC_NAME, message)); // -// //持续消费消息 -// while (true) { -// ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); -// records.forEach(record -> { -// System.out.println("消费者接受到的消息值:" + record.value()); -// }); +// System.out.println("发送消息:" + message); // } // } catch (Exception e) { // e.printStackTrace(); // } finally { -// consumer.close(); +// producer.close(); // } // } - -} +// +// //消费者 +//// private static void consumerMessages() { +//// Properties props = new Properties(); +//// props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); +//// props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group"); +//// props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); +//// props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); +//// +//// //创建消费者 +//// KafkaConsumer consumer = new KafkaConsumer<>(props); +//// +//// try { +//// +//// //订阅主题 +//// consumer.subscribe(Collections.singletonList(TOPIC_NAME)); +//// +//// //持续消费消息 +//// while (true) { +//// ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); +//// records.forEach(record -> { +//// System.out.println("消费者接受到的消息值:" + record.value()); +//// }); +//// } +//// } catch (Exception e) { +//// e.printStackTrace(); +//// } finally { +//// consumer.close(); +//// } +//// } +// +//}