diff --git a/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/model/ModelsKafkaMessage.java b/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/model/ModelsKafkaMessage.java index 8e64ae3..0a40e9a 100644 --- a/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/model/ModelsKafkaMessage.java +++ b/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/model/ModelsKafkaMessage.java @@ -36,7 +36,7 @@ import static java.lang.Thread.sleep; @Component @Slf4j public class ModelsKafkaMessage { - private static final String TOPIC_NAME = "xiaoYao"; + private static final String TOPIC_NAME = "online"; private static final String BOOTSTRAP_SERVERS = "39.103.133.136:9092"; @@ -56,7 +56,7 @@ public class ModelsKafkaMessage { Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); // props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group"); - props.put(ConsumerConfig.GROUP_ID_CONFIG, "xiaoYao"); + props.put(ConsumerConfig.GROUP_ID_CONFIG, "ddd"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); @@ -76,8 +76,7 @@ public class ModelsKafkaMessage { // @Scheduled(fixedDelay = 50) @PostConstruct public void consumerMessages() { -// executorService.execute(this::consumer); - this.consumer(); + executorService.execute(this::consumer); } public void consumer() { diff --git a/couplet-modules/couplet-modules-mq/src/main/java/com/couplet/mq/controller/KafkaTest.java b/couplet-modules/couplet-modules-mq/src/main/java/com/couplet/mq/controller/KafkaTest.java index a0f255a..5ff0ec6 100644 --- a/couplet-modules/couplet-modules-mq/src/main/java/com/couplet/mq/controller/KafkaTest.java +++ b/couplet-modules/couplet-modules-mq/src/main/java/com/couplet/mq/controller/KafkaTest.java @@ -1,93 +1,93 @@ -package com.couplet.mq.controller; - -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.clients.producer.ProducerRecord; - -import javax.annotation.PostConstruct; -import java.time.Duration; -import java.util.Collections; -import java.util.Properties; - -/** - * @ProjectName: five-groups-couplet - * @Author: LiuYunHu - * @CreateTime: 2024/4/4 - * @Description: kafka测试类 - */ - -@Slf4j -public class KafkaTest { - private static final String TOPIC_NAME = "online"; - private static final String BOOTSTRAP_SERVERS = "39.103.133.136:9092"; - - public static void main(String[] args) { - //生产者示例 - produceMessage(); - - //消费者示例 - consumerMessages(); - - } - - //生产者 - @PostConstruct - private static void produceMessage() { - Properties props = new Properties(); - props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); - props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); - props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); - - KafkaProducer producer = new KafkaProducer<>(props); - //创建生产者 - try { - - //发送消息 - for (int i = 0; i < 10000; i++) { - String message = "佳佳来喽" + (i + 1); - producer.send(new ProducerRecord<>(TOPIC_NAME, message)); - - System.out.println("发送消息:" + message); - } - } catch (Exception e) { - e.printStackTrace(); - } finally { - producer.close(); - } - } - - //消费者 - private static void consumerMessages() { - Properties props = new Properties(); - props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); - props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group"); - props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); - props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); - - //创建消费者 - KafkaConsumer consumer = new KafkaConsumer<>(props); - - try { - - //订阅主题 - consumer.subscribe(Collections.singletonList(TOPIC_NAME)); - - //持续消费消息 - while (true) { - ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); - records.forEach(record -> { - System.out.println("消费者接受到的消息值:" + record.value()); - }); - } - } catch (Exception e) { - e.printStackTrace(); - } finally { - consumer.close(); - } - } - -} +//package com.couplet.mq.controller; +// +//import lombok.extern.slf4j.Slf4j; +//import org.apache.kafka.clients.consumer.ConsumerConfig; +//import org.apache.kafka.clients.consumer.ConsumerRecords; +//import org.apache.kafka.clients.consumer.KafkaConsumer; +//import org.apache.kafka.clients.producer.KafkaProducer; +//import org.apache.kafka.clients.producer.ProducerConfig; +//import org.apache.kafka.clients.producer.ProducerRecord; +// +//import javax.annotation.PostConstruct; +//import java.time.Duration; +//import java.util.Collections; +//import java.util.Properties; +// +///** +// * @ProjectName: five-groups-couplet +// * @Author: LiuYunHu +// * @CreateTime: 2024/4/4 +// * @Description: kafka测试类 +// */ +// +//@Slf4j +//public class KafkaTest { +// private static final String TOPIC_NAME = "online"; +// private static final String BOOTSTRAP_SERVERS = "39.103.133.136:9092"; +// +// public static void main(String[] args) { +// //生产者示例 +//// produceMessage(); +// +// //消费者示例 +//// consumerMessages(); +// +// } +// +// //生产者 +// @PostConstruct +// private static void produceMessage() { +// Properties props = new Properties(); +// props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); +// props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); +// props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); +// +// KafkaProducer producer = new KafkaProducer<>(props); +// //创建生产者 +// try { +// +// //发送消息 +// for (int i = 0; i < 10000; i++) { +// String message = "佳佳来喽" + (i + 1); +// producer.send(new ProducerRecord<>(TOPIC_NAME, message)); +// +// System.out.println("发送消息:" + message); +// } +// } catch (Exception e) { +// e.printStackTrace(); +// } finally { +// producer.close(); +// } +// } +// +// //消费者 +//// private static void consumerMessages() { +//// Properties props = new Properties(); +//// props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); +//// props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group"); +//// props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); +//// props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); +//// +//// //创建消费者 +//// KafkaConsumer consumer = new KafkaConsumer<>(props); +//// +//// try { +//// +//// //订阅主题 +//// consumer.subscribe(Collections.singletonList(TOPIC_NAME)); +//// +//// //持续消费消息 +//// while (true) { +//// ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); +//// records.forEach(record -> { +//// System.out.println("消费者接受到的消息值:" + record.value()); +//// }); +//// } +//// } catch (Exception e) { +//// e.printStackTrace(); +//// } finally { +//// consumer.close(); +//// } +//// } +// +//}