diff --git a/couplet-modules/couplet-analyze/couplet-analyze-msg/pom.xml b/couplet-modules/couplet-analyze/couplet-analyze-msg/pom.xml index 00c4073..4c842e8 100644 --- a/couplet-modules/couplet-analyze/couplet-analyze-msg/pom.xml +++ b/couplet-modules/couplet-analyze/couplet-analyze-msg/pom.xml @@ -81,24 +81,12 @@ couplet-common-swagger - - - - - - com.couplet couplet-common-event - - - org.apache.kafka @@ -111,12 +99,6 @@ couplet-common-business - - - - - - diff --git a/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/model/ModelsKafkaMessage.java b/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/model/ModelsKafkaMessage.java index 93be367..89b30ee 100644 --- a/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/model/ModelsKafkaMessage.java +++ b/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/model/ModelsKafkaMessage.java @@ -49,9 +49,6 @@ public class ModelsKafkaMessage { private AnalyzeEventCache analyzeEventCache; - - - //kafka消费者初始化 @PostConstruct public void initKafkaConsumer() { @@ -64,8 +61,8 @@ public class ModelsKafkaMessage { //消费者 consumer = new KafkaConsumer<>(props); - //订阅主题 - consumer.subscribe(Collections.singletonList(TOPIC_NAME)); + + this.consumerMessages(); } @@ -76,13 +73,15 @@ public class ModelsKafkaMessage { * @return */ // @Scheduled(fixedDelay = 50) - @PostConstruct +// @PostConstruct public void consumerMessages() { executorService.execute(this::consumer); } public void consumer() { try { + //订阅主题 + consumer.subscribe(Collections.singletonList(TOPIC_NAME)); //持续消费消息 while (true) {