master
fst1996 2023-11-28 22:13:45 +08:00
parent 4ab4de8a9a
commit 263aaaa18b
10 changed files with 74 additions and 160 deletions

View File

@ -8,7 +8,7 @@ EXPOSE 9999
VOLUME /home/logs/god-data-server VOLUME /home/logs/god-data-server
# 复制jar文件到docker内部 # 复制jar文件到docker内部
COPY /target/mqtt-kafkademo-1.0-SNAPSHOT.jar /home/app.jar COPY /target/mqtt-kafka-demo-1.0-SNAPSHOT.jar /home/app.jar
#工作目录 exec -it 进来默认就是这个目 #工作目录 exec -it 进来默认就是这个目
WORKDIR /home WORKDIR /home

View File

@ -5,7 +5,7 @@
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
<groupId>com.god</groupId> <groupId>com.god</groupId>
<artifactId>mqtt-kafkademo</artifactId> <artifactId>mqtt-kafka-demo</artifactId>
<version>1.0-SNAPSHOT</version> <version>1.0-SNAPSHOT</version>
<properties> <properties>

View File

@ -0,0 +1,41 @@
package com.god.kafka.config;
import lombok.extern.log4j.Log4j2;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.Properties;
/**
* kafka
* @author fst
* @date 2023/11/25 13:23
*/
@Configuration
@Log4j2
public class KafkaProducerConfig {
@Bean
public Producer<String, String> producerInit() {
// 创建一个KafkaProducer的配置对象
Properties properties = new Properties();
// 设置Kafka服务器的地址
properties.put("bootstrap.servers", "10.100.1.8:9092");
// 设置消息的key和value的序列化方式为StringSerializer
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("partitioner.class", "com.god.kafka.partitioner.MyPartitioner");
// 输出日志信息
log.info("你好kafka");
// 返回一个使用给定配置对象创建的KafkaProducer实例
return new KafkaProducer<>(properties);
}
}

View File

@ -1,81 +0,0 @@
package com.god.kafka.config;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringBootConfiguration;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.support.serializer.JsonSerializer;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
/**
* kafka
*/
@Data
@Builder
@Configuration
@NoArgsConstructor
@AllArgsConstructor
@ConfigurationProperties(prefix = "spring.kafka.producer")
public class KafkaProviderConfig {
private String bootstrapServers;
private String transactionIdPrefix;
private String acks;
private String retries;
private String batchSize;
private String bufferMemory;
private String keySerializer;
private String valueSerializer;
/**
* Map Kafka
* @return
*/
@Bean
public KafkaProducer<String, String> producerConfigs() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
//acks=0 生产者在成功写入消息之前不会等待任何来自服务器的响应。
//acks=1 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。
//acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。
//开启事务必须设为all
props.put(ProducerConfig.ACKS_CONFIG, acks);
//发生错误后消息重发的次数开启事务必须大于0
props.put(ProducerConfig.RETRIES_CONFIG, retries);
//当多个消息发送到相同分区时,生产者会将消息打包到一起,以减少请求交互. 而不是一条条发送
//批次的大小可以通过batch.size 参数设置.默认是16KB
//较小的批次大小有可能降低吞吐量批次大小为0则完全禁用批处理
//比如说kafka里的消息5秒钟Batch才凑满了16KB才能发送出去。那这些消息的延迟就是5秒钟
//实测batchSize这个参数没有用
props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
//有的时刻消息比较少,过了很久,比如5min也没有凑够16KB,这样延时就很大,所以需要一个参数. 再设置一个时间,到了这个时间,
//即使数据没达到16KB,也将这个批次发送出去
props.put(ProducerConfig.LINGER_MS_CONFIG, "5000");
//生产者内存缓冲区的大小
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
//反序列化,和生产者的序列化方式对应
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);
props.put("partitioner.class", "com.god.kafka.partitioner.MyPartitioner");
return new KafkaProducer<>(props);
}
}

View File

@ -0,0 +1,25 @@
package com.god.kafka.config;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.kafka.support.ProducerListener;
import org.springframework.lang.Nullable;
import org.springframework.stereotype.Component;
/**
* @author fst
* @date 2022/10/31 15:41
* kafka
*/
@Component
public class KafkaSendResultHandler implements ProducerListener<Object, Object> {
@Override
public void onSuccess(ProducerRecord producerRecord, RecordMetadata recordMetadata) {
System.out.println("消息发送成功:" + producerRecord.toString());
}
@Override
public void onError(ProducerRecord producerRecord, @Nullable RecordMetadata recordMetadata, Exception exception) {
System.out.println("消息发送失败:" + producerRecord.toString() + exception.getMessage());
}
}

View File

@ -7,7 +7,7 @@ package com.god.kafka.contents;
*/ */
public class KafkaContent { public class KafkaContent {
public static final String TOPIC = "top"; public static final String TOPIC = "test";
public static final String KAFKA_CON = "117.72.43.22:9092"; public static final String KAFKA_CON = "10.100.1.8:9092";
} }

View File

@ -1,69 +0,0 @@
package com.god.kafka.producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import static com.god.kafka.contents.KafkaContent.KAFKA_CON;
import static com.god.kafka.contents.KafkaContent.TOPIC;
/**
* @author DongZl
* @description:
* @Date 2023/8/25 18:50
*/
public class ProducerTest {
private static Producer<String, String> producer;
public static void KfkProducer() {
Properties props = new Properties();
props.put("bootstrap.servers", KAFKA_CON);
// props.put("linger.ms", 1);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producer = new KafkaProducer<>(props);
}
private static void close() {
producer.close();
}
private static RecordMetadata send(String topic, String key, String value) {
Future<RecordMetadata> result = producer.send(new ProducerRecord<>(topic, key, value));
RecordMetadata meta = null;
try {
meta = result.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
return meta;
}
public static void main (String[] args) {
KfkProducer();
new Thread(() -> {
int i = 0;
do {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
String format = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
send(TOPIC, UUID.randomUUID().toString(), format);
}while (i++ < 1000);
close();
}).start();
}
}

View File

@ -8,9 +8,8 @@ import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
/** /**
* @author DongZl * mqtt
* @description: mqtt * @author fst
* @Date 2023-11-24 02:04
*/ */
@Data @Data
@Builder @Builder

View File

@ -49,7 +49,7 @@ public class MqttService implements MqttCallback {
public void messageArrived(String topic, MqttMessage message) throws Exception { public void messageArrived(String topic, MqttMessage message) throws Exception {
String msg = new String(message.getPayload()); String msg = new String(message.getPayload());
log.info("topic: [{}], Qos: [{}], message content: [{}]", topic, message.getQos(), msg); log.info("topic: [{}], Qos: [{}], message content: [{}]", topic, message.getQos(), msg);
kafkaProducerService.send(KafkaContent.TOPIC, String.valueOf(msg.hashCode()), msg); kafkaProducerService.send(KafkaContent.TOPIC, String.valueOf(msg.hashCode()), "如果是这个错,我一头撞死");
} }
@Override @Override

View File

@ -1,6 +1,6 @@
mqtt: mqtt:
config: config:
broker: tcp://47.115.225.229:1883 broker: tcp://10.100.1.8:1883
topic: test topic: test
# Tomcat # Tomcat
@ -22,7 +22,6 @@ spring:
bootstrap-servers: 10.100.1.8:9092 bootstrap-servers: 10.100.1.8:9092
transaction-id-prefix: kafkaTx- transaction-id-prefix: kafkaTx-
retries: 3 retries: 3
# acks=0 : 生产者成功写入消息不会等待任何来自服务器的相应 # acks=0 : 生产者成功写入消息不会等待任何来自服务器的相应
# acks=1 : 只要集群的master收到消息生产者就会收到一个来自服务器的响应。 # acks=1 : 只要集群的master收到消息生产者就会收到一个来自服务器的响应。
# acks=all : 只有当所有参与者的复制节点全部收到消息时,生产者才会收到一个来自服务器的响应 # acks=all : 只有当所有参与者的复制节点全部收到消息时,生产者才会收到一个来自服务器的响应