From 143176fd5a030f5c07d6121b531b808ccf679367 Mon Sep 17 00:00:00 2001 From: dongxiaodong <13970843+dxdwork@user.noreply.gitee.com> Date: Mon, 1 Apr 2024 20:48:43 +0800 Subject: [PATCH] =?UTF-8?q?Kafka=E6=B5=8B=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- couplet-modules/couplet-modules-mq/pom.xml | 6 +++ .../com/couplet/mq/service/MqttListen.java | 18 +++++++ .../src/main/resources/bootstrap.yml | 3 +- couplet-modules/couplet-msg/pom.xml | 5 ++ .../msg/consumer/KafkaConsumerQuickStart.java | 47 +++++++++++++++++++ .../msg/producer/KafkaProducerQuickStart.java | 28 +++++++++++ .../producer/KafkaProducerQuickStart.java | 37 +++++++++++++++ .../trouble/domain/CoupletTroubleCode.java | 6 +++ 8 files changed, 149 insertions(+), 1 deletion(-) create mode 100644 couplet-modules/couplet-msg/src/main/java/com/couplet/msg/consumer/KafkaConsumerQuickStart.java create mode 100644 couplet-modules/couplet-msg/src/main/java/com/couplet/msg/producer/KafkaProducerQuickStart.java create mode 100644 couplet-modules/couplet-msg/src/test/java/com/couplet/producer/KafkaProducerQuickStart.java diff --git a/couplet-modules/couplet-modules-mq/pom.xml b/couplet-modules/couplet-modules-mq/pom.xml index 7b1e760..dded87b 100644 --- a/couplet-modules/couplet-modules-mq/pom.xml +++ b/couplet-modules/couplet-modules-mq/pom.xml @@ -96,6 +96,12 @@ org.springframework.boot spring-boot-starter-amqp + + + + org.springframework.kafka + spring-kafka + diff --git a/couplet-modules/couplet-modules-mq/src/main/java/com/couplet/mq/service/MqttListen.java b/couplet-modules/couplet-modules-mq/src/main/java/com/couplet/mq/service/MqttListen.java index 8b914e9..6ddb9ee 100644 --- a/couplet-modules/couplet-modules-mq/src/main/java/com/couplet/mq/service/MqttListen.java +++ b/couplet-modules/couplet-modules-mq/src/main/java/com/couplet/mq/service/MqttListen.java @@ -1,12 +1,17 @@ package com.couplet.mq.service; import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.StringSerializer; import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; +import java.util.Properties; /** * @ProjectName: five-groups-couplet @@ -90,6 +95,19 @@ public class MqttListen { log.info("接收消息Qos:" + message.getQos()); log.info("接收消息内容:" + new String(message.getPayload())); + //配置Kafka信息 + Properties properties = new Properties(); + properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "8.130.181.16:9092"); + properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName()); + //创建消费生产者对象 + KafkaProducer kafkaProducer = new KafkaProducer<>(properties); + //通过Kafka将接收到的消息内容发送给解析系统 + ProducerRecord producerRecord = new ProducerRecord<>("couplet", new String(message.getPayload())); + kafkaProducer.send(producerRecord); + + //关闭 Kafka 生产者 + kafkaProducer.close(); } @Override diff --git a/couplet-modules/couplet-modules-mq/src/main/resources/bootstrap.yml b/couplet-modules/couplet-modules-mq/src/main/resources/bootstrap.yml index 29958e8..5149608 100644 --- a/couplet-modules/couplet-modules-mq/src/main/resources/bootstrap.yml +++ b/couplet-modules/couplet-modules-mq/src/main/resources/bootstrap.yml @@ -34,7 +34,8 @@ logging: # 订阅端配置 mqtt: server: - broker: tcp://115.159.47.13:1883 + broker: tcp://8.130.181.16:1883 +# broker: tcp://115.159.47.13:1883 username: password: clientid: mqttx diff --git a/couplet-modules/couplet-msg/pom.xml b/couplet-modules/couplet-msg/pom.xml index d826790..6b6ccc3 100644 --- a/couplet-modules/couplet-msg/pom.xml +++ b/couplet-modules/couplet-msg/pom.xml @@ -80,5 +80,10 @@ couplet-common-swagger + + org.springframework.kafka + spring-kafka + + diff --git a/couplet-modules/couplet-msg/src/main/java/com/couplet/msg/consumer/KafkaConsumerQuickStart.java b/couplet-modules/couplet-msg/src/main/java/com/couplet/msg/consumer/KafkaConsumerQuickStart.java new file mode 100644 index 0000000..68e6705 --- /dev/null +++ b/couplet-modules/couplet-msg/src/main/java/com/couplet/msg/consumer/KafkaConsumerQuickStart.java @@ -0,0 +1,47 @@ +package com.couplet.msg.consumer; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; + +import java.time.Duration; +import java.util.Collections; +import java.util.Properties; + +/** + * @author DongXiaoDong + * @version 1.0 + * @date 2024/4/1 19:04 + * @description + */ +public class KafkaConsumerQuickStart { + public static void main(String[] args) { + //创建 properties 对象配置 kafka消费者的配置信息 + Properties properties = new Properties(); + properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"8.130.181.16:9092"); + //设置键值的反序列化方式 + properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + //分组 + properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "couplet-kafka-group"); + //创建Kafka消费者对象 + KafkaConsumer kafkaConsumer = new KafkaConsumer<>(properties); + + //设置订阅主题 + kafkaConsumer.subscribe(Collections.singleton("couplet-kafka")); + //拉取消息 + while (true){ + ConsumerRecords records = kafkaConsumer.poll(Duration.ofMillis(2000)); + + //遍历 + records.forEach(record ->{ + String key = record.key(); + String value = record.value(); + System.out.println("消息接收key成功:" + key + "消息接收value成功:" + value); + }); + } + } + +} diff --git a/couplet-modules/couplet-msg/src/main/java/com/couplet/msg/producer/KafkaProducerQuickStart.java b/couplet-modules/couplet-msg/src/main/java/com/couplet/msg/producer/KafkaProducerQuickStart.java new file mode 100644 index 0000000..f7653f6 --- /dev/null +++ b/couplet-modules/couplet-msg/src/main/java/com/couplet/msg/producer/KafkaProducerQuickStart.java @@ -0,0 +1,28 @@ +package com.couplet.msg.producer; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.StringSerializer; + +import java.util.Properties; + +/** + * @author DongXiaoDong + * @version 1.0 + * @date 2024/4/1 20:07 + * @description + */ +public class KafkaProducerQuickStart { + public static void main(String[] args) { + Properties properties = new Properties(); + properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"8.130.181.16:9092"); + properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName()); + KafkaProducer kafkaProducer = new KafkaProducer<>(properties); + ProducerRecord producerRecord = new ProducerRecord<>("couplet-kafka", "key", "helloWord"); + kafkaProducer.send(producerRecord); + kafkaProducer.close(); + } + +} diff --git a/couplet-modules/couplet-msg/src/test/java/com/couplet/producer/KafkaProducerQuickStart.java b/couplet-modules/couplet-msg/src/test/java/com/couplet/producer/KafkaProducerQuickStart.java new file mode 100644 index 0000000..f9b9d4f --- /dev/null +++ b/couplet-modules/couplet-msg/src/test/java/com/couplet/producer/KafkaProducerQuickStart.java @@ -0,0 +1,37 @@ +package com.couplet.producer; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.StringSerializer; + +import java.util.Properties; + +/** + * @author DongXiaoDong + * @version 1.0 + * @date 2024/4/1 11:12 + * @description + */ +public class KafkaProducerQuickStart { + public static void main(String[] args) { + // 发送消息Kafka + // 用来配置kafka 消息生产者对象的配置信息 + Properties properties = new Properties(); + //配置host + properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "39.103.129.53:9092"); + //配置键值的序列方式 + properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName()); + //创建消息生产者对象 + KafkaProducer kafkaProducer = new KafkaProducer<>(properties); + + //发送消息 + //创建消息记录 + ProducerRecord record = new ProducerRecord<>("couplet-kafka", "key", "helloWord"); + kafkaProducer.send(record); + + //关闭 kafkaProducer + kafkaProducer.close(); + } +} diff --git a/couplet-modules/couplet-trouble/src/main/java/com/couplet/trouble/domain/CoupletTroubleCode.java b/couplet-modules/couplet-trouble/src/main/java/com/couplet/trouble/domain/CoupletTroubleCode.java index 22e5fc5..a4c166c 100644 --- a/couplet-modules/couplet-trouble/src/main/java/com/couplet/trouble/domain/CoupletTroubleCode.java +++ b/couplet-modules/couplet-trouble/src/main/java/com/couplet/trouble/domain/CoupletTroubleCode.java @@ -6,6 +6,8 @@ import com.couplet.common.core.annotation.Excel; import lombok.*; import lombok.experimental.SuperBuilder; +import javax.validation.constraints.NotEmpty; + /** * @author DongXiaoDong * @version 1.0 @@ -23,12 +25,14 @@ public class CoupletTroubleCode { */ @TableId(value = "trouble_id",type = IdType.AUTO) @Excel(name = "故障码主键", cellType = Excel.ColumnType.NUMERIC) + @NotEmpty(message = "故障码主键不能为空") private Integer troubleId; /** * 故障码 */ @Excel(name = "故障码") + @NotEmpty(message = "故障码不能为空") private String troubleCode; /** @@ -53,11 +57,13 @@ public class CoupletTroubleCode { * 故障类型Id */ @Excel(name = "故障类型Id") + @NotEmpty(message = "故障类型Id不能为空") private Integer typeId; /** * 故障等级Id */ @Excel(name = "故障等级Id") + @NotEmpty(message = "故障等级Id不能为空") private Integer gradeId; }