Kafka测试

server_five_liuyunhu
dongxiaodong 2024-04-02 14:22:55 +08:00
parent 143176fd5a
commit 5b499eb8c3
7 changed files with 95 additions and 61 deletions

View File

@ -95,18 +95,23 @@ public class MqttListen {
log.info("接收消息Qos" + message.getQos());
log.info("接收消息内容:" + new String(message.getPayload()));
//配置Kafka信息
// 用来配置kafka 消息生产者对象的配置信息
Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "8.130.181.16:9092");
//配置host
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "39.103.133.136:9092");
//配置键值的序列方式
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
//创建消息生产者对象
//创建消息生产者对象
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
//通过Kafka将接收到的消息内容发送给解析系统
ProducerRecord<String, String> producerRecord = new ProducerRecord<>("couplet", new String(message.getPayload()));
//发送消息
//创建消息记录
ProducerRecord<String, String> producerRecord = new ProducerRecord<>("couplet-kafka",topic,new String(message.getPayload()));
kafkaProducer.send(producerRecord);
//关闭 Kafka 生产者
System.out.println("发送消息成功:" +producerRecord);
//关闭 kafkaProducer
kafkaProducer.close();
}

View File

@ -0,0 +1,19 @@
package com.couplet.msg;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.openfeign.EnableFeignClients;
/**
 * Boot entry point for the couplet-msg service.
 *
 * @author DongXiaoDong
 * @version 1.0
 * @date 2024/4/2 8:39
 * @description Starts the Spring context and enables OpenFeign clients
 *              under the com.couplet.** packages.
 */
@SpringBootApplication
@EnableFeignClients(basePackages = "com.couplet.**")
public class CoupletMsgApplication {
    public static void main(String[] args) {
        // Forward command-line args so overrides such as --spring.profiles.active
        // reach Spring; the original dropped them.
        SpringApplication.run(CoupletMsgApplication.class, args);
    }
}

View File

@ -20,12 +20,12 @@ public class KafkaConsumerQuickStart {
public static void main(String[] args) {
//创建 properties 对象配置 kafka消费者的配置信息
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"8.130.181.16:9092");
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"39.103.133.136:9092");
//设置键值的反序列化方式
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
//分组
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "couplet-kafka-group");
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group");
//创建Kafka消费者对象
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
@ -35,12 +35,17 @@ public class KafkaConsumerQuickStart {
while (true) {
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(2000));
if (!records.isEmpty()) {
// 遍历
records.forEach(record -> {
String key = record.key();
String value = record.value();
System.out.println("消息接收key成功" + key + "消息接收value成功" + value);
});
} else {
System.out.println("未拉取到消息,等待中...");
}
break;
}
}

View File

@ -0,0 +1,13 @@
package com.couplet.msg.contents;
/**
* @author DongXiaoDong
* @version 1.0
* @date 2024/4/2 14:05
* @description
*/
public class KafkaContents {
public static final String TOPIC = "couplet-top";
public static final String KAFKA_CON = "39.103.133.136:39092,39.103.133.136:29092,39.103.133.136:19092";
}

View File

@ -1,6 +1,7 @@
package com.couplet.msg.producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
@ -14,15 +15,11 @@ import java.util.Properties;
* @description
*/
public class KafkaProducerQuickStart {
public static void main(String[] args) {
Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"8.130.181.16:9092");
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
ProducerRecord<String, String> producerRecord = new ProducerRecord<>("couplet-kafka", "key", "helloWord");
kafkaProducer.send(producerRecord);
kafkaProducer.close();
private static Producer<String,String> producer;
public static void KafkaProducer() {
Properties props = new Properties();
}
}

View File

@ -0,0 +1,32 @@
# Tomcat
server:
  port: 9617

# Spring
spring:
  application:
    # Application name
    name: couplet-msg
  profiles:
    # Active environment profile
    active: dev
  cloud:
    nacos:
      discovery:
        # Service registry address
        server-addr: 121.89.211.230:8848
        namespace: a439ce3f-2c42-4b4c-9c4d-c8db49933c15
      config:
        # Config center address
        server-addr: 121.89.211.230:8848
        namespace: a439ce3f-2c42-4b4c-9c4d-c8db49933c15
        # Config file format
        file-extension: yml
        # Shared configuration pulled from the config center
        shared-configs:
          - application-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension}
  main:
    allow-bean-definition-overriding: true

# Log MyBatis mapper SQL at DEBUG level
logging:
  level:
    com.couplet.system.mapper: DEBUG

View File

@ -1,37 +0,0 @@
package com.couplet.producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
/**
 * Minimal Kafka producer quick-start: sends a single message and exits.
 *
 * @author DongXiaoDong
 * @version 1.0
 * @date 2024/4/1 11:12
 * @description Demo producer for the couplet-kafka topic.
 */
public class KafkaProducerQuickStart {
    public static void main(String[] args) {
        // Producer configuration.
        Properties properties = new Properties();
        // Broker address — NOTE(review): other classes in this project use
        // different broker hosts; confirm which one is live.
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "39.103.129.53:9092");
        // Key/value serializers (String for both).
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // try-with-resources guarantees close() — and thus a flush of buffered
        // records — even if send() throws; the original leaked the producer on
        // any exception before the explicit close() call.
        try (KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties)) {
            // Build and send one record to the demo topic.
            ProducerRecord<String, String> record = new ProducerRecord<>("couplet-kafka", "key", "helloWord");
            kafkaProducer.send(record);
        }
    }
}