diff --git a/couplet-modules/couplet-analyze/couplet-analyze-msg/pom.xml b/couplet-modules/couplet-analyze/couplet-analyze-msg/pom.xml index ebf05ed..be2c65b 100644 --- a/couplet-modules/couplet-analyze/couplet-analyze-msg/pom.xml +++ b/couplet-modules/couplet-analyze/couplet-analyze-msg/pom.xml @@ -81,32 +81,41 @@ couplet-common-swagger - - - org.eclipse.paho - org.eclipse.paho.client.mqttv3 - 1.2.5 - + + + + + + com.couplet couplet-common-event + org.springframework.kafka spring-kafka + --> + + + + org.apache.kafka + kafka-clients + 2.8.0 + com.couplet couplet-common-business - - - org.springframework.boot - spring-boot-starter-amqp - + + + + + diff --git a/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/CoupletMsgApplication.java b/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/CoupletMsgApplication.java index 1000f15..7608a0d 100644 --- a/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/CoupletMsgApplication.java +++ b/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/CoupletMsgApplication.java @@ -1,5 +1,7 @@ package com.couplet.analyze.msg; +import com.couplet.analyze.msg.model.ModelsKafkaMessage; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.openfeign.EnableFeignClients; @@ -15,8 +17,13 @@ import org.springframework.scheduling.annotation.EnableScheduling; @EnableScheduling @EnableFeignClients(basePackages = "com.couplet") public class CoupletMsgApplication { + + public static void main(String[] args) { SpringApplication.run(CoupletMsgApplication.class); System.out.println("解析系统启动成功"); +// new ModelsKafkaMessage().initKafkaConsumer(); } + + } diff --git a/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/model/ModelsKafkaMessage.java b/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/model/ModelsKafkaMessage.java index 9536426..4c82aa5 100644 --- a/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/model/ModelsKafkaMessage.java +++ b/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/model/ModelsKafkaMessage.java @@ -1,5 +1,6 @@ package com.couplet.analyze.msg.model; + import com.couplet.analyze.common.event.AnalyzeEventCache; import com.couplet.analyze.msg.domain.CoupletMsgData; import com.couplet.analyze.msg.service.IncidentService; @@ -51,6 +52,9 @@ public class ModelsKafkaMessage { private AnalyzeEventCache analyzeEventCache; + + + //kafka消费者初始化 @PostConstruct public void initKafkaConsumer() { @@ -79,6 +83,7 @@ public class ModelsKafkaMessage { public void consumerMessages() { executorService.execute(this::consumer); } + public void consumer() { try { diff --git a/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/resources/bootstrap.yml b/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/resources/bootstrap.yml index b2e9ca1..9b0c2eb 100644 --- a/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/resources/bootstrap.yml +++ b/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/resources/bootstrap.yml @@ -23,7 +23,6 @@ spring: # 共享配置 shared-configs: - application-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension} - main: allow-bean-definition-overriding: true rabbitmq: @@ -43,15 +42,9 @@ spring: template: # 只要消息抵达Queue,就会异步发送优先回调return firm mandatory: true - kafka: - bootstrap-servers: 115.159.47.13:9092 - consumer: - group-id: group - auto-offset-reset: earliest logging: level: com.couplet.analyze.msg.mapper: DEBUG mybatis-plus: configuration: map-underscore-to-camel-case: true - diff --git a/couplet-modules/couplet-modules-onLine/src/main/java/com/couplet/online/utils/MqttMonitor.java b/couplet-modules/couplet-modules-onLine/src/main/java/com/couplet/online/utils/MqttMonitor.java index 6f4bfc9..68ac0dd 100644 --- a/couplet-modules/couplet-modules-onLine/src/main/java/com/couplet/online/utils/MqttMonitor.java +++ b/couplet-modules/couplet-modules-onLine/src/main/java/com/couplet/online/utils/MqttMonitor.java @@ -76,7 +76,7 @@ public class MqttMonitor { //Kafka生产者配置 - private static final String TOPIC_NAME = "online"; + private static final String TOPIC_NAME = "dong"; private static final String BOOTSTRAP_SERVERS = "39.103.133.136:9092"; //线程池,用于异步处理消息到来时的业务逻辑 diff --git a/couplet-modules/couplet-modules-onLine/src/main/resources/bootstrap.yml b/couplet-modules/couplet-modules-onLine/src/main/resources/bootstrap.yml index ddf4716..fc1f9a1 100644 --- a/couplet-modules/couplet-modules-onLine/src/main/resources/bootstrap.yml +++ b/couplet-modules/couplet-modules-onLine/src/main/resources/bootstrap.yml @@ -28,6 +28,7 @@ spring: logging: level: com.couplet.system.mapper: DEBUG + # 订阅端配置 mqtt: server: