From a9793136da61cb7de534f012b9a42770c5d02a45 Mon Sep 17 00:00:00 2001 From: Saisai Liu <1374434128@qq.com> Date: Tue, 18 Jun 2024 19:35:17 +0800 Subject: [PATCH] =?UTF-8?q?feat=EF=BC=88=EF=BC=89=EF=BC=9A=E5=88=86?= =?UTF-8?q?=E5=8C=BA=E7=9B=91=E5=90=AC=E5=99=A8=E5=88=9B=E5=BB=BA=E6=88=90?= =?UTF-8?q?=E5=8A=9F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ...java => KafkaConsumerListenerExample.java} | 13 +-- .../com/mobai/kafka/listener/VinConsumer.java | 86 +++++++++++-------- 2 files changed, 55 insertions(+), 44 deletions(-) rename mobai-event-service/src/main/java/com/mobai/kafka/listener/{kafkaConsumerListenerExample.java => KafkaConsumerListenerExample.java} (96%) diff --git a/mobai-event-service/src/main/java/com/mobai/kafka/listener/kafkaConsumerListenerExample.java b/mobai-event-service/src/main/java/com/mobai/kafka/listener/KafkaConsumerListenerExample.java similarity index 96% rename from mobai-event-service/src/main/java/com/mobai/kafka/listener/kafkaConsumerListenerExample.java rename to mobai-event-service/src/main/java/com/mobai/kafka/listener/KafkaConsumerListenerExample.java index 83d150a..661384e 100644 --- a/mobai-event-service/src/main/java/com/mobai/kafka/listener/kafkaConsumerListenerExample.java +++ b/mobai-event-service/src/main/java/com/mobai/kafka/listener/KafkaConsumerListenerExample.java @@ -12,8 +12,6 @@ import org.apache.iotdb.rpc.IoTDBConnectionException; import org.apache.iotdb.rpc.StatementExecutionException; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.TopicPartition; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; @@ -21,14 +19,11 @@ import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; -import java.lang.annotation.Annotation; import java.math.BigDecimal; import java.rmi.ServerException; -import java.time.Duration; import java.util.ArrayList; import java.util.Collection; import java.util.List; -import java.util.Properties; /** * @description: kafka 消费者 @@ -40,7 +35,7 @@ import java.util.Properties; */ @Component @Slf4j -public class kafkaConsumerListenerExample { +public class KafkaConsumerListenerExample { @Autowired @@ -69,9 +64,9 @@ public class kafkaConsumerListenerExample { try { iotDbServer.insertData(vehicle); log.info("添加成功"); - if (redisService.hasKey(vehicle.getVin())) { - redisService.setList(vehicle.getVin(), JSON.toJSONString(vehicle)); - } +// if (redisService.hasKey(vehicle.getVin())) { +// redisService.setList(vehicle.getVin(), JSON.toJSONString(vehicle)); +// } } catch (StatementExecutionException e) { throw new RuntimeException(e); } catch (ServerException e) { diff --git a/mobai-event-service/src/main/java/com/mobai/kafka/listener/VinConsumer.java b/mobai-event-service/src/main/java/com/mobai/kafka/listener/VinConsumer.java index 4442701..3b11c93 100644 --- a/mobai-event-service/src/main/java/com/mobai/kafka/listener/VinConsumer.java +++ b/mobai-event-service/src/main/java/com/mobai/kafka/listener/VinConsumer.java @@ -1,14 +1,19 @@ package com.mobai.kafka.listener; +import com.alibaba.fastjson2.JSON; import com.mobai.domain.MqttServerModel; import com.mobai.domain.Vehicle; import com.mobai.forest.ForestGet; +import com.mobai.utils.RedisService; +import lombok.extern.log4j.Log4j2; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.TopicPartition; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.ApplicationArguments; +import org.springframework.boot.ApplicationRunner; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.stereotype.Component; @@ -25,11 +30,21 @@ import java.util.Properties; * @date 2024/6/18 17:18 */ +@Log4j2 @Component -public class VinConsumer { +public class VinConsumer implements ApplicationRunner { - public static void main(String[] args) { + @Autowired + private ForestGet forestGet; + @Autowired + private KafkaConsumerListenerExample kafkaConsumerListenerExample; + + @Autowired + private RedisService redisService; + + @Override + public void run(ApplicationArguments args) throws Exception { ArrayList topicPartitions = new ArrayList<>(); List topics = forestGet.getIps().getData().stream().map(MqttServerModel::getTopic).toList(); for (String topic : topics) { @@ -39,19 +54,19 @@ public class VinConsumer { } } - new ConsumerRebalanceListener(){ - @Override - public void onPartitionsRevoked(Collection partitions) { - // 可以在这里处理分区被撤销前的逻辑 - System.out.println("Partitions revoked: " + partitions); - } - - @Override - public void onPartitionsAssigned(Collection partitions) { - // 可以在这里处理分区被分配后的逻辑 - System.out.println("Partitions assigned: " + partitions); - } - }; +// new ConsumerRebalanceListener(){ +// @Override +// public void onPartitionsRevoked(Collection partitions) { +// // 可以在这里处理分区被撤销前的逻辑 +// System.out.println("Partitions revoked: " + partitions); +// } +// +// @Override +// public void onPartitionsAssigned(Collection partitions) { +// // 可以在这里处理分区被分配后的逻辑 +// System.out.println("Partitions assigned: " + partitions); +// } +// }; // 1.参数配置:不是每一非得配置 Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); @@ -65,28 +80,29 @@ public class VinConsumer { KafkaConsumer consumer = new KafkaConsumer<>(props); consumer.assign(topicPartitions); try { -// while (true) { - ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); // 假设等待100毫秒获取消息 - if (!records.isEmpty()) { // 检查是否有消息到来 - // 创建线程异步执行 - new Thread(() -> { - for (TopicPartition partition : records.partitions()) { - List> partitionRecords = records.records(partition); - for (ConsumerRecord record : partitionRecords) { - // 处理每条消息 - log.info("Offset = {}, Key = {}, Value = {}", record.offset(), record.key(), record.value()); - Vehicle vehicle = getVehicle(record.value()); - if (redisService.hasKey(vehicle.getVin())){ - redisService.setList(vehicle.getVin(),JSON.toJSONString(vehicle)); - } + while (true) { + ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); // 假设等待100毫秒获取消息 + if (!records.isEmpty()) { // 检查是否有消息到来 + // 创建线程异步执行 + new Thread(() -> { + for (TopicPartition partition : records.partitions()) { + List> partitionRecords = records.records(partition); + for (ConsumerRecord record : partitionRecords) { + // 处理每条消息 + log.info("Offset = {}, Key = {}, Value = {}", record.offset(), record.key(), record.value()); + Vehicle vehicle = kafkaConsumerListenerExample.getVehicle(record.value()); + if (redisService.hasKey(vehicle.getVin())){ + redisService.setList(vehicle.getVin(), JSON.toJSONString(vehicle)); + log.info("添加实时数据成功"); } } - }).start(); - } else { - // 当没有消息时,选择休眠一小段时间避免过度占用CPU,或者执行其他逻辑 - Thread.sleep(10); - } -// } + } + }).start(); + } else { + // 当没有消息时,选择休眠一小段时间避免过度占用CPU,或者执行其他逻辑 + Thread.sleep(10); + } + } } catch (InterruptedException e) { // 处理解除阻塞时的中断异常,如Thread.sleep被中断 Thread.currentThread().interrupt(); // 重新设置中断标志