二次更新

master
gukaixuan 2023-09-16 14:52:39 +08:00
parent 9fe0c50af2
commit 4afd656bc5
4 changed files with 60 additions and 11 deletions

View File

@ -1,16 +1,16 @@
package com.ruoyi.receive.kafka.config;
import com.sun.corba.se.internal.CosNaming.BootstrapServer;
import lombok.AllArgsConstructor;
import net.sf.jsqlparser.statement.select.Top;
import com.ruoyi.receive.netty.server.KafkaConsumerService;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.Collection;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
@ -25,14 +25,15 @@ public class Consumer {
private final ConsumerCustomConfig consumerCustomConfig;
public Consumer(KafkaConsumerConfig kafkaConsumerConfig, ConsumerCustomConfig consumerCustomConfig) {
private final KafkaConsumerService kafkaConsumerService;
public Consumer(KafkaConsumerConfig kafkaConsumerConfig, ConsumerCustomConfig consumerCustomConfig, KafkaConsumerService kafkaConsumerService) {
this.kafkaConsumerConfig = kafkaConsumerConfig;
this.consumerCustomConfig = consumerCustomConfig;
this.kafkaConsumerService = kafkaConsumerService;
}
@Bean
public KafkaConsumer<String,String> consumerInit(){
//创建kafka消费者配置
@ -43,8 +44,21 @@ public class Consumer {
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
//创建kafka消费者
KafkaConsumer<String,String> consumer= new KafkaConsumer<>(properties);
//订阅主题
consumer.subscribe(Collections.singletonList(consumerCustomConfig.getTopic()));
//分配特定的分区
TopicPartition topicPartition = new TopicPartition(consumerCustomConfig.getTopic(), consumerCustomConfig.getPartitions());
consumer.assign(Collections.singletonList(topicPartition));
new Thread(()->{
//消费信息
while (true){
ConsumerRecords<String,String> records =
consumer.poll(Duration.ofMillis(consumerCustomConfig.getOfMillis()));
kafkaConsumerService.service(records);
}
}).start();
return consumer;
}

View File

@ -18,8 +18,12 @@ public class ConsumerCustomConfig {
private String topic;
private String partitions;
private Integer partitions;
/**
*
*/
private long ofMillis = 1000;
}

View File

@ -0,0 +1,12 @@
package com.ruoyi.receive.netty.server;
import org.apache.kafka.clients.consumer.ConsumerRecords;
/**
* kafka
*/
public interface KafkaConsumerService {
public void service(ConsumerRecords<String,String> records);
}

View File

@ -0,0 +1,19 @@
package com.ruoyi.receive.netty.server.impl;
import com.ruoyi.receive.netty.server.KafkaConsumerService;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.springframework.stereotype.Service;
@Service
public class KafkaConsumerServiceImpl implements KafkaConsumerService {
@Override
public void service(ConsumerRecords<String, String> records) {
for (ConsumerRecord<String,String> record : records){
System.out.println("消息key = " + record.key() + ",value = " + record.value());
}
}
}