更新重构代码

master
KillBinBin 2023-09-15 20:21:41 +08:00
parent e5f10c7780
commit 67579c2b4f
6 changed files with 123 additions and 2 deletions

View File

@ -74,6 +74,12 @@
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>2.4.7</version>
<exclusions>
<exclusion>
<artifactId>slf4j-reload4j</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<!-- kafka-->

View File

@ -10,7 +10,7 @@ import org.springframework.context.annotation.Configuration;
@Data
@Configuration
@ConfigurationProperties(prefix = "kafka.consumer")
public class ConsumerConfig {
public class ConsumerCustomConfig {
/**
*
*/

View File

@ -0,0 +1,88 @@
package com.ruoyi.analysis.kafka.consumer;
import com.ruoyi.analysis.kafka.config.ConsumerCustomConfig;
import com.ruoyi.analysis.kafka.config.KafkaConfig;
import com.ruoyi.analysis.kafka.service.InKafkaConsumer;
import lombok.extern.log4j.Log4j2;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.codehaus.jackson.map.deser.std.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
/**
*
*/
@Component
@Log4j2
public class Consumer{
/**
*
*/
private final ConsumerCustomConfig consumerCustomConfig;
/**
* kafka
*/
private final KafkaConfig kafkaConfig;
/**
*
*/
private InKafkaConsumer inKafkaConsumer;
public Consumer(ConsumerCustomConfig consumerCustomConfig, KafkaConfig kafkaConfig, InKafkaConsumer inKafkaConsumer) {
this.consumerCustomConfig = consumerCustomConfig;
this.kafkaConfig = kafkaConfig;
this.inKafkaConsumer = inKafkaConsumer;
}
//@PostConstruct 是一个注解,它被用于标记一个方法,在对象被创建后立即执行该方法
//@PostConstruct
@Bean
public KafkaConsumer<String, String> init(){
//消费者配置
Properties properties = new Properties();
//服务地址
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,kafkaConfig.getHosts());
//消费者
properties.put(ConsumerConfig.GROUP_ID_CONFIG,consumerCustomConfig.getGroup());
//反序列化key
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
//反序列化value
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
//创建kafka消费者
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
//订阅主题
consumer.subscribe(Collections.singleton(consumerCustomConfig.getTopic()));
//分配分区
TopicPartition topicPartition = new TopicPartition(consumerCustomConfig.topic, consumerCustomConfig.getPartitions());
consumer.assign(Collections.singletonList(topicPartition));
//把循环放在另外一个线程执行 否则不会返回数据
new Thread(()->{
//消费信息
while (true){
//拉取主题中的消息
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(consumerCustomConfig.getWaitingTime()));
//接口传入参数
inKafkaConsumer.kafkaConsumerInterface(records);
for (ConsumerRecord<String, String> record : records) {
log.info("接收到的消息 key:"+record.key()+"value:"+record.value());
}
}
}).start();
return consumer;
}
}

View File

@ -0,0 +1,11 @@
package com.ruoyi.analysis.kafka.service;
import org.apache.kafka.clients.consumer.ConsumerRecords;
/**
* kafka
*/
public interface InKafkaConsumer {
public void kafkaConsumerInterface(ConsumerRecords<String, String> records);
}

View File

@ -0,0 +1,16 @@
package com.ruoyi.analysis.kafka.service.impl;
import com.ruoyi.analysis.kafka.service.InKafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.springframework.stereotype.Service;
/**
*
*/
@Service
public class InKafkaConsumerimpl implements InKafkaConsumer {
@Override
public void kafkaConsumerInterface(ConsumerRecords<String, String> records) {
}
}

View File

@ -1,6 +1,6 @@
# Tomcat
server:
port: 9456
port: 9466
netty:
port: 8081
boss-group-thread-number: 5