From 70f3d0ecc9961ccc7e96f987b40eec1ce523167f Mon Sep 17 00:00:00 2001
From: dongxiaodong <13970843+dxdwork@user.noreply.gitee.com>
Date: Tue, 9 Apr 2024 18:44:54 +0800
Subject: [PATCH] =?UTF-8?q?kafka=E4=BF=AE=E6=94=B9bug?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
.../couplet-analyze-msg/pom.xml | 18 ------------------
.../analyze/msg/model/ModelsKafkaMessage.java | 11 +++++------
2 files changed, 5 insertions(+), 24 deletions(-)
diff --git a/couplet-modules/couplet-analyze/couplet-analyze-msg/pom.xml b/couplet-modules/couplet-analyze/couplet-analyze-msg/pom.xml
index 00c4073..4c842e8 100644
--- a/couplet-modules/couplet-analyze/couplet-analyze-msg/pom.xml
+++ b/couplet-modules/couplet-analyze/couplet-analyze-msg/pom.xml
@@ -81,24 +81,12 @@
couplet-common-swagger
-
-
-
-
-
-
com.couplet
couplet-common-event
-
-
-
org.apache.kafka
@@ -111,12 +99,6 @@
couplet-common-business
-
-
-
-
-
-
diff --git a/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/model/ModelsKafkaMessage.java b/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/model/ModelsKafkaMessage.java
index 93be367..89b30ee 100644
--- a/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/model/ModelsKafkaMessage.java
+++ b/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/model/ModelsKafkaMessage.java
@@ -49,9 +49,6 @@ public class ModelsKafkaMessage {
private AnalyzeEventCache analyzeEventCache;
-
-
-
//kafka消费者初始化
@PostConstruct
public void initKafkaConsumer() {
@@ -64,8 +61,8 @@ public class ModelsKafkaMessage {
//消费者
consumer = new KafkaConsumer<>(props);
- //订阅主题
- consumer.subscribe(Collections.singletonList(TOPIC_NAME));
+
+ this.consumerMessages();
}
@@ -76,13 +73,15 @@ public class ModelsKafkaMessage {
* @return
*/
// @Scheduled(fixedDelay = 50)
- @PostConstruct
+// @PostConstruct
public void consumerMessages() {
executorService.execute(this::consumer);
}
public void consumer() {
try {
+ //订阅主题
+ consumer.subscribe(Collections.singletonList(TOPIC_NAME));
//持续消费消息
while (true) {