diff --git a/Dockerfile b/Dockerfile
index b75f75f..bbc9968 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -8,7 +8,7 @@ EXPOSE 9999
VOLUME /home/logs/god-data-server
# 复制jar文件到docker内部
-COPY /target/mqtt-kafkademo-1.0-SNAPSHOT.jar /home/app.jar
+COPY /target/mqtt-kafka-demo-1.0-SNAPSHOT.jar /home/app.jar
#工作目录 exec -it 进来默认就是这个目
WORKDIR /home
diff --git a/pom.xml b/pom.xml
index ef51b55..d636807 100644
--- a/pom.xml
+++ b/pom.xml
@@ -5,7 +5,7 @@
4.0.0
com.god
- mqtt-kafkademo
+ mqtt-kafka-demo
1.0-SNAPSHOT
diff --git a/src/main/java/com/god/kafka/config/KafkaProducerConfig.java b/src/main/java/com/god/kafka/config/KafkaProducerConfig.java
new file mode 100644
index 0000000..3756f56
--- /dev/null
+++ b/src/main/java/com/god/kafka/config/KafkaProducerConfig.java
@@ -0,0 +1,41 @@
+package com.god.kafka.config;
+
+
+
+import lombok.extern.log4j.Log4j2;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+import java.util.Properties;
+
+
+/**
+ * kafka生产者配置
+ * @author fst
+ * @date 2023/11/25 13:23
+ */
+@Configuration
+@Log4j2
+public class KafkaProducerConfig {
+
+ @Bean
+ public Producer<String, String> producerInit() {
+
+ // 创建一个KafkaProducer的配置对象
+ Properties properties = new Properties();
+ // 设置Kafka服务器的地址
+ properties.put("bootstrap.servers", "10.100.1.8:9092");
+ // 设置消息的key和value的序列化方式为StringSerializer
+ properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+ properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+ properties.put("partitioner.class", "com.god.kafka.partitioner.MyPartitioner");
+ // 输出日志信息
+ log.info("Kafka producer initialized, bootstrap.servers={}", properties.getProperty("bootstrap.servers"));
+ // 返回一个使用给定配置对象创建的KafkaProducer实例
+ return new KafkaProducer<>(properties);
+ }
+
+
+}
diff --git a/src/main/java/com/god/kafka/config/KafkaProviderConfig.java b/src/main/java/com/god/kafka/config/KafkaProviderConfig.java
deleted file mode 100644
index 1b10ee9..0000000
--- a/src/main/java/com/god/kafka/config/KafkaProviderConfig.java
+++ /dev/null
@@ -1,81 +0,0 @@
-package com.god.kafka.config;
-
-import lombok.AllArgsConstructor;
-import lombok.Builder;
-import lombok.Data;
-import lombok.NoArgsConstructor;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.boot.SpringBootConfiguration;
-import org.springframework.boot.context.properties.ConfigurationProperties;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.kafka.support.serializer.JsonSerializer;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
-
-/**
- * kafka 消息的提供者 配置类
- */
-@Data
-@Builder
-@Configuration
-@NoArgsConstructor
-@AllArgsConstructor
-@ConfigurationProperties(prefix = "spring.kafka.producer")
-public class KafkaProviderConfig {
-
- private String bootstrapServers;
-
- private String transactionIdPrefix;
-
- private String acks;
-
- private String retries;
-
- private String batchSize;
-
- private String bufferMemory;
-
- private String keySerializer;
-
- private String valueSerializer;
-
- /**
- * 构建了 Map 存放了 Kafka 生产者的 配置信息
- * @return
- */
- @Bean
- public KafkaProducer producerConfigs() {
- Properties props = new Properties();
- props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
- //acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。
- //acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。
- //acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。
- //开启事务必须设为all
- props.put(ProducerConfig.ACKS_CONFIG, acks);
- //发生错误后,消息重发的次数,开启事务必须大于0
- props.put(ProducerConfig.RETRIES_CONFIG, retries);
- //当多个消息发送到相同分区时,生产者会将消息打包到一起,以减少请求交互. 而不是一条条发送
- //批次的大小可以通过batch.size 参数设置.默认是16KB
- //较小的批次大小有可能降低吞吐量(批次大小为0则完全禁用批处理)。
- //比如说,kafka里的消息5秒钟Batch才凑满了16KB,才能发送出去。那这些消息的延迟就是5秒钟
- //实测batchSize这个参数没有用
- props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
- //有的时刻消息比较少,过了很久,比如5min也没有凑够16KB,这样延时就很大,所以需要一个参数. 再设置一个时间,到了这个时间,
- //即使数据没达到16KB,也将这个批次发送出去
- props.put(ProducerConfig.LINGER_MS_CONFIG, "5000");
- //生产者内存缓冲区的大小
- props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
- //反序列化,和生产者的序列化方式对应
- props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer);
- props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);
- props.put("partitioner.class", "com.god.kafka.partitioner.MyPartitioner");
- return new KafkaProducer<>(props);
- }
-
-
-}
diff --git a/src/main/java/com/god/kafka/config/KafkaSendResultHandler.java b/src/main/java/com/god/kafka/config/KafkaSendResultHandler.java
new file mode 100644
index 0000000..1f03cac
--- /dev/null
+++ b/src/main/java/com/god/kafka/config/KafkaSendResultHandler.java
@@ -0,0 +1,25 @@
+package com.god.kafka.config;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.springframework.kafka.support.ProducerListener;
+import org.springframework.lang.Nullable;
+import org.springframework.stereotype.Component;
+
+/**
+ * @author fst
+ * @date 2022/10/31 15:41
+ * kafka消息发送回调处理
+ */
+@Component
+public class KafkaSendResultHandler implements ProducerListener