diff --git a/couplet-modules/couplet-modules-mq/src/main/java/com/couplet/mq/service/MqttListen.java b/couplet-modules/couplet-modules-mq/src/main/java/com/couplet/mq/service/MqttListen.java index 6ddb9ee..84c2a8a 100644 --- a/couplet-modules/couplet-modules-mq/src/main/java/com/couplet/mq/service/MqttListen.java +++ b/couplet-modules/couplet-modules-mq/src/main/java/com/couplet/mq/service/MqttListen.java @@ -95,18 +95,23 @@ public class MqttListen { log.info("接收消息Qos:" + message.getQos()); log.info("接收消息内容:" + new String(message.getPayload())); - //配置Kafka信息 + // 用来配置kafka 消息生产者对象的配置信息 Properties properties = new Properties(); - properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "8.130.181.16:9092"); + //配置host + properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "39.103.133.136:9092"); + //配置键值的序列方式 properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName()); - //创建消费生产者对象 + //创建消息生产者对象 KafkaProducer kafkaProducer = new KafkaProducer<>(properties); - //通过Kafka将接收到的消息内容发送给解析系统 - ProducerRecord producerRecord = new ProducerRecord<>("couplet", new String(message.getPayload())); + + //发送消息 + //创建消息记录 + ProducerRecord producerRecord = new ProducerRecord<>("couplet-kafka",topic,new String(message.getPayload())); kafkaProducer.send(producerRecord); - //关闭 Kafka 生产者 + System.out.println("发送消息成功:" +producerRecord); + //关闭 kafkaProducer kafkaProducer.close(); } diff --git a/couplet-modules/couplet-msg/src/main/java/com/couplet/msg/CoupletMsgApplication.java b/couplet-modules/couplet-msg/src/main/java/com/couplet/msg/CoupletMsgApplication.java new file mode 100644 index 0000000..9814995 --- /dev/null +++ b/couplet-modules/couplet-msg/src/main/java/com/couplet/msg/CoupletMsgApplication.java @@ -0,0 +1,19 @@ +package com.couplet.msg; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.cloud.openfeign.EnableFeignClients; + +/** + * @author DongXiaoDong + * @version 1.0 + * @date 2024/4/2 8:39 + * @description + */ +@SpringBootApplication +@EnableFeignClients(basePackages = "com.couplet.**") +public class CoupletMsgApplication { + public static void main(String[] args) { + SpringApplication.run(CoupletMsgApplication.class); + } +} diff --git a/couplet-modules/couplet-msg/src/main/java/com/couplet/msg/consumer/KafkaConsumerQuickStart.java b/couplet-modules/couplet-msg/src/main/java/com/couplet/msg/consumer/KafkaConsumerQuickStart.java index 68e6705..93575ed 100644 --- a/couplet-modules/couplet-msg/src/main/java/com/couplet/msg/consumer/KafkaConsumerQuickStart.java +++ b/couplet-modules/couplet-msg/src/main/java/com/couplet/msg/consumer/KafkaConsumerQuickStart.java @@ -20,27 +20,32 @@ public class KafkaConsumerQuickStart { public static void main(String[] args) { //创建 properties 对象配置 kafka消费者的配置信息 Properties properties = new Properties(); - properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"8.130.181.16:9092"); + properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"39.103.133.136:9092"); //设置键值的反序列化方式 properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); //分组 - properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "couplet-kafka-group"); + properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group"); //创建Kafka消费者对象 KafkaConsumer kafkaConsumer = new KafkaConsumer<>(properties); //设置订阅主题 kafkaConsumer.subscribe(Collections.singleton("couplet-kafka")); //拉取消息 - while (true){ + while (true) { ConsumerRecords records = kafkaConsumer.poll(Duration.ofMillis(2000)); - //遍历 - records.forEach(record ->{ - String key = record.key(); - String value = record.value(); - System.out.println("消息接收key成功:" + key + "消息接收value成功:" + value); - }); + if (!records.isEmpty()) { + // 遍历 + records.forEach(record -> { + String key = record.key(); + String value = record.value(); + System.out.println("消息接收key成功:" + key + "消息接收value成功:" + value); + }); + } else { + System.out.println("未拉取到消息,等待中..."); + } + break; } } diff --git a/couplet-modules/couplet-msg/src/main/java/com/couplet/msg/contents/KafkaContents.java b/couplet-modules/couplet-msg/src/main/java/com/couplet/msg/contents/KafkaContents.java new file mode 100644 index 0000000..41eb6e2 --- /dev/null +++ b/couplet-modules/couplet-msg/src/main/java/com/couplet/msg/contents/KafkaContents.java @@ -0,0 +1,13 @@ +package com.couplet.msg.contents; + +/** + * @author DongXiaoDong + * @version 1.0 + * @date 2024/4/2 14:05 + * @description + */ +public class KafkaContents { + public static final String TOPIC = "couplet-top"; + + public static final String KAFKA_CON = "39.103.133.136:39092,39.103.133.136:29092,39.103.133.136:19092"; +} diff --git a/couplet-modules/couplet-msg/src/main/java/com/couplet/msg/producer/KafkaProducerQuickStart.java b/couplet-modules/couplet-msg/src/main/java/com/couplet/msg/producer/KafkaProducerQuickStart.java index f7653f6..ad5e284 100644 --- a/couplet-modules/couplet-msg/src/main/java/com/couplet/msg/producer/KafkaProducerQuickStart.java +++ b/couplet-modules/couplet-msg/src/main/java/com/couplet/msg/producer/KafkaProducerQuickStart.java @@ -1,6 +1,7 @@ package com.couplet.msg.producer; import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer; @@ -14,15 +15,11 @@ import java.util.Properties; * @description */ public class KafkaProducerQuickStart { - public static void main(String[] args) { - Properties properties = new Properties(); - properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"8.130.181.16:9092"); - properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); - properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName()); - KafkaProducer kafkaProducer = new KafkaProducer<>(properties); - ProducerRecord producerRecord = new ProducerRecord<>("couplet-kafka", "key", "helloWord"); - kafkaProducer.send(producerRecord); - kafkaProducer.close(); + + private static Producer producer; + + public static void KafkaProducer() { + Properties props = new Properties(); } } diff --git a/couplet-modules/couplet-msg/src/main/resources/bootstrap.yml b/couplet-modules/couplet-msg/src/main/resources/bootstrap.yml new file mode 100644 index 0000000..ab9685a --- /dev/null +++ b/couplet-modules/couplet-msg/src/main/resources/bootstrap.yml @@ -0,0 +1,32 @@ +# Tomcat +server: + port: 9617 + +# Spring +spring: + application: + # 应用名称 + name: couplet-msg + profiles: + # 环境配置 + active: dev + cloud: + nacos: + discovery: + # 服务注册地址 + server-addr: 121.89.211.230:8848 + namespace: a439ce3f-2c42-4b4c-9c4d-c8db49933c15 + config: + # 配置中心地址 + server-addr: 121.89.211.230:8848 + namespace: a439ce3f-2c42-4b4c-9c4d-c8db49933c15 + # 配置文件格式 + file-extension: yml + # 共享配置 + shared-configs: + - application-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension} + main: + allow-bean-definition-overriding: true +logging: + level: + com.couplet.system.mapper: DEBUG diff --git a/couplet-modules/couplet-msg/src/test/java/com/couplet/producer/KafkaProducerQuickStart.java b/couplet-modules/couplet-msg/src/test/java/com/couplet/producer/KafkaProducerQuickStart.java deleted file mode 100644 index f9b9d4f..0000000 --- a/couplet-modules/couplet-msg/src/test/java/com/couplet/producer/KafkaProducerQuickStart.java +++ /dev/null @@ -1,37 +0,0 @@ -package com.couplet.producer; - -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.serialization.StringSerializer; - -import java.util.Properties; - -/** - * @author DongXiaoDong - * @version 1.0 - * @date 2024/4/1 11:12 - * @description - */ -public class KafkaProducerQuickStart { - public static void main(String[] args) { - // 发送消息Kafka - // 用来配置kafka 消息生产者对象的配置信息 - Properties properties = new Properties(); - //配置host - properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "39.103.129.53:9092"); - //配置键值的序列方式 - properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); - properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName()); - //创建消息生产者对象 - KafkaProducer kafkaProducer = new KafkaProducer<>(properties); - - //发送消息 - //创建消息记录 - ProducerRecord record = new ProducerRecord<>("couplet-kafka", "key", "helloWord"); - kafkaProducer.send(record); - - //关闭 kafkaProducer - kafkaProducer.close(); - } -}