kafka修改bug

server_five_dongxiaodong
dongxiaodong 2024-04-09 18:44:54 +08:00
parent 9bb5cf3435
commit 70f3d0ecc9
2 changed files with 5 additions and 24 deletions

View File

@ -81,24 +81,12 @@
<artifactId>couplet-common-swagger</artifactId>
</dependency>
<!-- &lt;!&ndash; mqttx依赖 &ndash;&gt;-->
<!-- <dependency>-->
<!-- <groupId>org.eclipse.paho</groupId>-->
<!-- <artifactId>org.eclipse.paho.client.mqttv3</artifactId>-->
<!-- <version>1.2.5</version>-->
<!-- </dependency>-->
<!-- 事件核心配置 -->
<dependency>
<groupId>com.couplet</groupId>
<artifactId>couplet-common-event</artifactId>
</dependency>
<!-- <dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>-->
<!-- Kafka依赖-->
<dependency>
<groupId>org.apache.kafka</groupId>
@ -111,12 +99,6 @@
<artifactId>couplet-common-business</artifactId>
</dependency>
<!-- &lt;!&ndash; RabbitMQ依赖&ndash;&gt;-->
<!-- <dependency>-->
<!-- <groupId>org.springframework.boot</groupId>-->
<!-- <artifactId>spring-boot-starter-amqp</artifactId>-->
<!-- </dependency>-->
</dependencies>
</project>

View File

@ -49,9 +49,6 @@ public class ModelsKafkaMessage {
private AnalyzeEventCache analyzeEventCache;
//kafka消费者初始化
@PostConstruct
public void initKafkaConsumer() {
@ -64,8 +61,8 @@ public class ModelsKafkaMessage {
//消费者
consumer = new KafkaConsumer<>(props);
//订阅主题
consumer.subscribe(Collections.singletonList(TOPIC_NAME));
this.consumerMessages();
}
@ -76,13 +73,15 @@ public class ModelsKafkaMessage {
* @return
*/
// @Scheduled(fixedDelay = 50)
@PostConstruct
// @PostConstruct
public void consumerMessages() {
executorService.execute(this::consumer);
}
public void consumer() {
try {
//订阅主题
consumer.subscribe(Collections.singletonList(TOPIC_NAME));
//持续消费消息
while (true) {