From 9bb5cf34354c4a33f83e8a281613a8577c40192d Mon Sep 17 00:00:00 2001 From: dongxiaodong <13970843+dxdwork@user.noreply.gitee.com> Date: Tue, 9 Apr 2024 11:33:51 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E5=88=A4=E6=96=AD=E8=AF=AD?= =?UTF-8?q?=E5=8F=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../couplet-analyze-msg/pom.xml | 32 ++++++++++++------- .../analyze/msg/CoupletMsgApplication.java | 7 ++++ .../analyze/msg/model/ModelsKafkaMessage.java | 7 ++-- .../src/main/resources/bootstrap.yml | 6 ---- .../com/couplet/online/utils/MqttMonitor.java | 2 +- .../src/main/resources/bootstrap.yml | 6 ++-- 6 files changed, 34 insertions(+), 26 deletions(-) diff --git a/couplet-modules/couplet-analyze/couplet-analyze-msg/pom.xml b/couplet-modules/couplet-analyze/couplet-analyze-msg/pom.xml index fd7ec92..00c4073 100644 --- a/couplet-modules/couplet-analyze/couplet-analyze-msg/pom.xml +++ b/couplet-modules/couplet-analyze/couplet-analyze-msg/pom.xml @@ -81,12 +81,12 @@ couplet-common-swagger - - - org.eclipse.paho - org.eclipse.paho.client.mqttv3 - 1.2.5 - + + + + + + com.couplet @@ -94,20 +94,28 @@ - + + + + + org.apache.kafka + kafka-clients + 2.8.0 + com.couplet couplet-common-business - - - org.springframework.boot - spring-boot-starter-amqp - + + + + + diff --git a/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/CoupletMsgApplication.java b/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/CoupletMsgApplication.java index 43a0618..0cf32fb 100644 --- a/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/CoupletMsgApplication.java +++ b/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/CoupletMsgApplication.java @@ -1,5 +1,7 @@ package com.couplet.analyze.msg; +import com.couplet.analyze.msg.model.ModelsKafkaMessage; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.openfeign.EnableFeignClients; @@ -15,8 +17,13 @@ import org.springframework.scheduling.annotation.EnableScheduling; @EnableScheduling @EnableFeignClients(basePackages = "com.couplet.**") public class CoupletMsgApplication { + + public static void main(String[] args) { SpringApplication.run(CoupletMsgApplication.class); System.out.println("解析系统启动成功"); +// new ModelsKafkaMessage().initKafkaConsumer(); } + + } diff --git a/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/model/ModelsKafkaMessage.java b/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/model/ModelsKafkaMessage.java index d9782d4..93be367 100644 --- a/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/model/ModelsKafkaMessage.java +++ b/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/model/ModelsKafkaMessage.java @@ -1,5 +1,6 @@ package com.couplet.analyze.msg.model; + import com.couplet.analyze.common.event.AnalyzeEventCache; import com.couplet.analyze.msg.domain.CoupletMsgData; import com.couplet.analyze.msg.service.IncidentService; @@ -10,7 +11,6 @@ import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; @@ -26,7 +26,6 @@ import static com.couplet.analyze.msg.utils.MsgUtils.hexToString; import static com.couplet.analyze.msg.utils.MsgUtils.sendMsg; import static java.lang.Thread.sleep; - /** * @author DongXiaoDong * @version 1.0 @@ -36,7 +35,7 @@ import static java.lang.Thread.sleep; @Component @Slf4j public class ModelsKafkaMessage { - private static final String TOPIC_NAME = "online"; + private static final String TOPIC_NAME = "dong"; private static final String BOOTSTRAP_SERVERS = "39.103.133.136:9092"; @@ -89,7 +88,7 @@ public class ModelsKafkaMessage { while (true) { ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); records.forEach(record -> { - System.out.println("接收到的数据:" + record.value()); + log.info("接收到的数据:" + record.value()); String str = hexToString(record.value()); List coupletMsgDataList = sendMsg(str); for (CoupletMsgData msgData : coupletMsgDataList) { diff --git a/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/resources/bootstrap.yml b/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/resources/bootstrap.yml index 1c4ab93..e506271 100644 --- a/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/resources/bootstrap.yml +++ b/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/resources/bootstrap.yml @@ -50,9 +50,3 @@ logging: mybatis-plus: configuration: map-underscore-to-camel-case: true - -# RabbitMQ配置 -mq: - queueName: queue - exchangeName: exchange - routingKey: routingKey diff --git a/couplet-modules/couplet-modules-onLine/src/main/java/com/couplet/online/utils/MqttMonitor.java b/couplet-modules/couplet-modules-onLine/src/main/java/com/couplet/online/utils/MqttMonitor.java index 6f4bfc9..68ac0dd 100644 --- a/couplet-modules/couplet-modules-onLine/src/main/java/com/couplet/online/utils/MqttMonitor.java +++ b/couplet-modules/couplet-modules-onLine/src/main/java/com/couplet/online/utils/MqttMonitor.java @@ -76,7 +76,7 @@ public class MqttMonitor { //Kafka生产者配置 - private static final String TOPIC_NAME = "online"; + private static final String TOPIC_NAME = "dong"; private static final String BOOTSTRAP_SERVERS = "39.103.133.136:9092"; //线程池,用于异步处理消息到来时的业务逻辑 diff --git a/couplet-modules/couplet-modules-onLine/src/main/resources/bootstrap.yml b/couplet-modules/couplet-modules-onLine/src/main/resources/bootstrap.yml index 150ec78..d7abfa2 100644 --- a/couplet-modules/couplet-modules-onLine/src/main/resources/bootstrap.yml +++ b/couplet-modules/couplet-modules-onLine/src/main/resources/bootstrap.yml @@ -34,11 +34,11 @@ logging: # 订阅端配置 mqtt: server: - broker: tcp://115.159.47.13:1883 -# broker: mqtt://115.159.47.13:1883 +# broker: tcp://115.159.47.13:1883 + broker: tcp://8.130.181.16:1883 username: password: - clientId: mq + clientId: aaa qos: 0 topic: dxd