feat():分区监听器创建成功

master
Saisai Liu 2024-06-18 19:35:17 +08:00
parent 71539e9660
commit a9793136da
2 changed files with 55 additions and 44 deletions

View File

@ -12,8 +12,6 @@ import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException; import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartition;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
@ -21,14 +19,11 @@ import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.lang.annotation.Annotation;
import java.math.BigDecimal; import java.math.BigDecimal;
import java.rmi.ServerException; import java.rmi.ServerException;
import java.time.Duration;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.Properties;
/** /**
* @description: kafka * @description: kafka
@ -40,7 +35,7 @@ import java.util.Properties;
*/ */
@Component @Component
@Slf4j @Slf4j
public class kafkaConsumerListenerExample { public class KafkaConsumerListenerExample {
@Autowired @Autowired
@ -69,9 +64,9 @@ public class kafkaConsumerListenerExample {
try { try {
iotDbServer.insertData(vehicle); iotDbServer.insertData(vehicle);
log.info("添加成功"); log.info("添加成功");
if (redisService.hasKey(vehicle.getVin())) { // if (redisService.hasKey(vehicle.getVin())) {
redisService.setList(vehicle.getVin(), JSON.toJSONString(vehicle)); // redisService.setList(vehicle.getVin(), JSON.toJSONString(vehicle));
} // }
} catch (StatementExecutionException e) { } catch (StatementExecutionException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} catch (ServerException e) { } catch (ServerException e) {

View File

@ -1,14 +1,19 @@
package com.mobai.kafka.listener; package com.mobai.kafka.listener;
import com.alibaba.fastjson2.JSON;
import com.mobai.domain.MqttServerModel; import com.mobai.domain.MqttServerModel;
import com.mobai.domain.Vehicle; import com.mobai.domain.Vehicle;
import com.mobai.forest.ForestGet; import com.mobai.forest.ForestGet;
import com.mobai.utils.RedisService;
import lombok.extern.log4j.Log4j2;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartition;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
@ -25,11 +30,21 @@ import java.util.Properties;
* @date 2024/6/18 17:18 * @date 2024/6/18 17:18
*/ */
@Log4j2
@Component @Component
public class VinConsumer { public class VinConsumer implements ApplicationRunner {
public static void main(String[] args) { @Autowired
private ForestGet forestGet;
@Autowired
private KafkaConsumerListenerExample kafkaConsumerListenerExample;
@Autowired
private RedisService redisService;
@Override
public void run(ApplicationArguments args) throws Exception {
ArrayList<TopicPartition> topicPartitions = new ArrayList<>(); ArrayList<TopicPartition> topicPartitions = new ArrayList<>();
List<String> topics = forestGet.getIps().getData().stream().map(MqttServerModel::getTopic).toList(); List<String> topics = forestGet.getIps().getData().stream().map(MqttServerModel::getTopic).toList();
for (String topic : topics) { for (String topic : topics) {
@ -39,19 +54,19 @@ public class VinConsumer {
} }
} }
new ConsumerRebalanceListener(){ // new ConsumerRebalanceListener(){
@Override // @Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) { // public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
// 可以在这里处理分区被撤销前的逻辑 // // 可以在这里处理分区被撤销前的逻辑
System.out.println("Partitions revoked: " + partitions); // System.out.println("Partitions revoked: " + partitions);
} // }
//
@Override // @Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) { // public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
// 可以在这里处理分区被分配后的逻辑 // // 可以在这里处理分区被分配后的逻辑
System.out.println("Partitions assigned: " + partitions); // System.out.println("Partitions assigned: " + partitions);
} // }
}; // };
// 1.参数配置:不是每一非得配置 // 1.参数配置:不是每一非得配置
Properties props = new Properties(); Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092"); props.put("bootstrap.servers", "localhost:9092");
@ -65,28 +80,29 @@ public class VinConsumer {
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.assign(topicPartitions); consumer.assign(topicPartitions);
try { try {
// while (true) { while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); // 假设等待100毫秒获取消息 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); // 假设等待100毫秒获取消息
if (!records.isEmpty()) { // 检查是否有消息到来 if (!records.isEmpty()) { // 检查是否有消息到来
// 创建线程异步执行 // 创建线程异步执行
new Thread(() -> { new Thread(() -> {
for (TopicPartition partition : records.partitions()) { for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> partitionRecords = records.records(partition); List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
for (ConsumerRecord<String, String> record : partitionRecords) { for (ConsumerRecord<String, String> record : partitionRecords) {
// 处理每条消息 // 处理每条消息
log.info("Offset = {}, Key = {}, Value = {}", record.offset(), record.key(), record.value()); log.info("Offset = {}, Key = {}, Value = {}", record.offset(), record.key(), record.value());
Vehicle vehicle = getVehicle(record.value()); Vehicle vehicle = kafkaConsumerListenerExample.getVehicle(record.value());
if (redisService.hasKey(vehicle.getVin())){ if (redisService.hasKey(vehicle.getVin())){
redisService.setList(vehicle.getVin(),JSON.toJSONString(vehicle)); redisService.setList(vehicle.getVin(), JSON.toJSONString(vehicle));
} log.info("添加实时数据成功");
} }
} }
}).start(); }
} else { }).start();
// 当没有消息时选择休眠一小段时间避免过度占用CPU或者执行其他逻辑 } else {
Thread.sleep(10); // 当没有消息时选择休眠一小段时间避免过度占用CPU或者执行其他逻辑
} Thread.sleep(10);
// } }
}
} catch (InterruptedException e) { } catch (InterruptedException e) {
// 处理解除阻塞时的中断异常如Thread.sleep被中断 // 处理解除阻塞时的中断异常如Thread.sleep被中断
Thread.currentThread().interrupt(); // 重新设置中断标志 Thread.currentThread().interrupt(); // 重新设置中断标志