Kafka测试

server_five_liuyunhu
dongxiaodong 2024-04-01 20:48:43 +08:00
parent a20dc82049
commit 143176fd5a
8 changed files with 149 additions and 1 deletions

View File

@ -96,6 +96,12 @@
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- Kafka依赖-->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
</dependencies>
<build>

View File

@ -1,12 +1,17 @@
package com.couplet.mq.service;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.Properties;
/**
* @ProjectName: five-groups-couplet
@ -90,6 +95,19 @@ public class MqttListen {
log.info("接收消息Qos" + message.getQos());
log.info("接收消息内容:" + new String(message.getPayload()));
//配置Kafka信息
Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "8.130.181.16:9092");
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
//创建消费生产者对象
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
//通过Kafka将接收到的消息内容发送给解析系统
ProducerRecord<String, String> producerRecord = new ProducerRecord<>("couplet", new String(message.getPayload()));
kafkaProducer.send(producerRecord);
//关闭 Kafka 生产者
kafkaProducer.close();
}
@Override

View File

@ -34,7 +34,8 @@ logging:
# 订阅端配置
mqtt:
server:
broker: tcp://115.159.47.13:1883
broker: tcp://8.130.181.16:1883
# broker: tcp://115.159.47.13:1883
username:
password:
clientid: mqttx

View File

@ -80,5 +80,10 @@
<artifactId>couplet-common-swagger</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,47 @@
package com.couplet.msg.consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
/**
* @author DongXiaoDong
* @version 1.0
* @date 2024/4/1 19:04
* @description
*/
public class KafkaConsumerQuickStart {
public static void main(String[] args) {
//创建 properties 对象配置 kafka消费者的配置信息
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"8.130.181.16:9092");
//设置键值的反序列化方式
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
//分组
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "couplet-kafka-group");
//创建Kafka消费者对象
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
//设置订阅主题
kafkaConsumer.subscribe(Collections.singleton("couplet-kafka"));
//拉取消息
while (true){
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(2000));
//遍历
records.forEach(record ->{
String key = record.key();
String value = record.value();
System.out.println("消息接收key成功" + key + "消息接收value成功" + value);
});
}
}
}

View File

@ -0,0 +1,28 @@
package com.couplet.msg.producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
/**
* @author DongXiaoDong
* @version 1.0
* @date 2024/4/1 20:07
* @description
*/
public class KafkaProducerQuickStart {
public static void main(String[] args) {
Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"8.130.181.16:9092");
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
ProducerRecord<String, String> producerRecord = new ProducerRecord<>("couplet-kafka", "key", "helloWord");
kafkaProducer.send(producerRecord);
kafkaProducer.close();
}
}

View File

@ -0,0 +1,37 @@
package com.couplet.producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
/**
* @author DongXiaoDong
* @version 1.0
* @date 2024/4/1 11:12
* @description
*/
public class KafkaProducerQuickStart {
public static void main(String[] args) {
// 发送消息Kafka
// 用来配置kafka 消息生产者对象的配置信息
Properties properties = new Properties();
//配置host
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "39.103.129.53:9092");
//配置键值的序列方式
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
//创建消息生产者对象
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
//发送消息
//创建消息记录
ProducerRecord<String, String> record = new ProducerRecord<>("couplet-kafka", "key", "helloWord");
kafkaProducer.send(record);
//关闭 kafkaProducer
kafkaProducer.close();
}
}

View File

@ -6,6 +6,8 @@ import com.couplet.common.core.annotation.Excel;
import lombok.*;
import lombok.experimental.SuperBuilder;
import javax.validation.constraints.NotEmpty;
/**
* @author DongXiaoDong
* @version 1.0
@ -23,12 +25,14 @@ public class CoupletTroubleCode {
*/
@TableId(value = "trouble_id",type = IdType.AUTO)
@Excel(name = "故障码主键", cellType = Excel.ColumnType.NUMERIC)
@NotEmpty(message = "故障码主键不能为空")
private Integer troubleId;
/**
*
*/
@Excel(name = "故障码")
@NotEmpty(message = "故障码不能为空")
private String troubleCode;
/**
@ -53,11 +57,13 @@ public class CoupletTroubleCode {
* Id
*/
@Excel(name = "故障类型Id")
@NotEmpty(message = "故障类型Id不能为空")
private Integer typeId;
/**
* Id
*/
@Excel(name = "故障等级Id")
@NotEmpty(message = "故障等级Id不能为空")
private Integer gradeId;
}