From 263aaaa18b6b152f7b17ab360a29c4308d4ec2ec Mon Sep 17 00:00:00 2001 From: fst1996 <2411194573@qq.com> Date: Tue, 28 Nov 2023 22:13:45 +0800 Subject: [PATCH] =?UTF-8?q?=E6=94=B9=E5=8A=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Dockerfile | 2 +- pom.xml | 2 +- .../god/kafka/config/KafkaProducerConfig.java | 41 ++++++++++ .../god/kafka/config/KafkaProviderConfig.java | 81 ------------------- .../kafka/config/KafkaSendResultHandler.java | 25 ++++++ .../com/god/kafka/contents/KafkaContent.java | 4 +- .../com/god/kafka/producer/ProducerTest.java | 69 ---------------- .../java/com/god/mqtt/config/MqttProper.java | 5 +- .../com/god/mqtt/service/MqttService.java | 2 +- src/main/resources/application.yml | 3 +- 10 files changed, 74 insertions(+), 160 deletions(-) create mode 100644 src/main/java/com/god/kafka/config/KafkaProducerConfig.java delete mode 100644 src/main/java/com/god/kafka/config/KafkaProviderConfig.java create mode 100644 src/main/java/com/god/kafka/config/KafkaSendResultHandler.java delete mode 100644 src/main/java/com/god/kafka/producer/ProducerTest.java diff --git a/Dockerfile b/Dockerfile index b75f75f..bbc9968 100644 --- a/Dockerfile +++ b/Dockerfile @@ -8,7 +8,7 @@ EXPOSE 9999 VOLUME /home/logs/god-data-server # 复制jar文件到docker内部 -COPY /target/mqtt-kafkademo-1.0-SNAPSHOT.jar /home/app.jar +COPY /target/mqtt-kafka-demo-1.0-SNAPSHOT.jar /home/app.jar #工作目录 exec -it 进来默认就是这个目 WORKDIR /home diff --git a/pom.xml b/pom.xml index ef51b55..d636807 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ 4.0.0 com.god - mqtt-kafkademo + mqtt-kafka-demo 1.0-SNAPSHOT diff --git a/src/main/java/com/god/kafka/config/KafkaProducerConfig.java b/src/main/java/com/god/kafka/config/KafkaProducerConfig.java new file mode 100644 index 0000000..3756f56 --- /dev/null +++ b/src/main/java/com/god/kafka/config/KafkaProducerConfig.java @@ -0,0 +1,41 @@ +package com.god.kafka.config; + + + +import lombok.extern.log4j.Log4j2; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import java.util.Properties; + + +/** + * kafka生产者配置 + * @author fst + * @date 2023/11/25 13:23 + */ +@Configuration +@Log4j2 +public class KafkaProducerConfig { + + @Bean + public Producer producerInit() { + + // 创建一个KafkaProducer的配置对象 + Properties properties = new Properties(); + // 设置Kafka服务器的地址 + properties.put("bootstrap.servers", "10.100.1.8:9092"); + // 设置消息的key和value的序列化方式为StringSerializer + properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + properties.put("partitioner.class", "com.god.kafka.partitioner.MyPartitioner"); + // 输出日志信息 + log.info("你好kafka"); + // 返回一个使用给定配置对象创建的KafkaProducer实例 + return new KafkaProducer<>(properties); + } + + +} diff --git a/src/main/java/com/god/kafka/config/KafkaProviderConfig.java b/src/main/java/com/god/kafka/config/KafkaProviderConfig.java deleted file mode 100644 index 1b10ee9..0000000 --- a/src/main/java/com/god/kafka/config/KafkaProviderConfig.java +++ /dev/null @@ -1,81 +0,0 @@ -package com.god.kafka.config; - -import lombok.AllArgsConstructor; -import lombok.Builder; -import lombok.Data; -import lombok.NoArgsConstructor; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.boot.SpringBootConfiguration; -import org.springframework.boot.context.properties.ConfigurationProperties; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.kafka.support.serializer.JsonSerializer; - -import java.util.HashMap; -import java.util.Map; -import java.util.Properties; - -/** - * kafka 消息的提供者 配置类 - */ -@Data -@Builder -@Configuration -@NoArgsConstructor -@AllArgsConstructor -@ConfigurationProperties(prefix = "spring.kafka.producer") -public class KafkaProviderConfig { - - private String bootstrapServers; - - private String transactionIdPrefix; - - private String acks; - - private String retries; - - private String batchSize; - - private String bufferMemory; - - private String keySerializer; - - private String valueSerializer; - - /** - * 构建了 Map 存放了 Kafka 生产者的 配置信息 - * @return - */ - @Bean - public KafkaProducer producerConfigs() { - Properties props = new Properties(); - props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); - //acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。 - //acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。 - //acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。 - //开启事务必须设为all - props.put(ProducerConfig.ACKS_CONFIG, acks); - //发生错误后,消息重发的次数,开启事务必须大于0 - props.put(ProducerConfig.RETRIES_CONFIG, retries); - //当多个消息发送到相同分区时,生产者会将消息打包到一起,以减少请求交互. 而不是一条条发送 - //批次的大小可以通过batch.size 参数设置.默认是16KB - //较小的批次大小有可能降低吞吐量(批次大小为0则完全禁用批处理)。 - //比如说,kafka里的消息5秒钟Batch才凑满了16KB,才能发送出去。那这些消息的延迟就是5秒钟 - //实测batchSize这个参数没有用 - props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize); - //有的时刻消息比较少,过了很久,比如5min也没有凑够16KB,这样延时就很大,所以需要一个参数. 再设置一个时间,到了这个时间, - //即使数据没达到16KB,也将这个批次发送出去 - props.put(ProducerConfig.LINGER_MS_CONFIG, "5000"); - //生产者内存缓冲区的大小 - props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory); - //反序列化,和生产者的序列化方式对应 - props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer); - props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer); - props.put("partitioner.class", "com.god.kafka.partitioner.MyPartitioner"); - return new KafkaProducer<>(props); - } - - -} diff --git a/src/main/java/com/god/kafka/config/KafkaSendResultHandler.java b/src/main/java/com/god/kafka/config/KafkaSendResultHandler.java new file mode 100644 index 0000000..1f03cac --- /dev/null +++ b/src/main/java/com/god/kafka/config/KafkaSendResultHandler.java @@ -0,0 +1,25 @@ +package com.god.kafka.config; + +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.springframework.kafka.support.ProducerListener; +import org.springframework.lang.Nullable; +import org.springframework.stereotype.Component; + +/** + * @author fst + * @date 2022/10/31 15:41 + * kafka消息发送回调处理 + */ +@Component +public class KafkaSendResultHandler implements ProducerListener { + @Override + public void onSuccess(ProducerRecord producerRecord, RecordMetadata recordMetadata) { + System.out.println("消息发送成功:" + producerRecord.toString()); + } + + @Override + public void onError(ProducerRecord producerRecord, @Nullable RecordMetadata recordMetadata, Exception exception) { + System.out.println("消息发送失败:" + producerRecord.toString() + exception.getMessage()); + } +} diff --git a/src/main/java/com/god/kafka/contents/KafkaContent.java b/src/main/java/com/god/kafka/contents/KafkaContent.java index 44a0097..d91c4e5 100644 --- a/src/main/java/com/god/kafka/contents/KafkaContent.java +++ b/src/main/java/com/god/kafka/contents/KafkaContent.java @@ -7,7 +7,7 @@ package com.god.kafka.contents; */ public class KafkaContent { - public static final String TOPIC = "top"; + public static final String TOPIC = "test"; - public static final String KAFKA_CON = "117.72.43.22:9092"; + public static final String KAFKA_CON = "10.100.1.8:9092"; } diff --git a/src/main/java/com/god/kafka/producer/ProducerTest.java b/src/main/java/com/god/kafka/producer/ProducerTest.java deleted file mode 100644 index bcefb81..0000000 --- a/src/main/java/com/god/kafka/producer/ProducerTest.java +++ /dev/null @@ -1,69 +0,0 @@ -package com.god.kafka.producer; - -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.clients.producer.RecordMetadata; - -import java.time.LocalDateTime; -import java.time.format.DateTimeFormatter; -import java.util.Properties; -import java.util.UUID; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; - -import static com.god.kafka.contents.KafkaContent.KAFKA_CON; -import static com.god.kafka.contents.KafkaContent.TOPIC; - - -/** - * @author DongZl - * @description: 生产者测试 - * @Date 2023/8/25 18:50 - */ -public class ProducerTest { - private static Producer producer; - - public static void KfkProducer() { - Properties props = new Properties(); - props.put("bootstrap.servers", KAFKA_CON); -// props.put("linger.ms", 1); - props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); - props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); - producer = new KafkaProducer<>(props); - } - - private static void close() { - producer.close(); - } - - private static RecordMetadata send(String topic, String key, String value) { - Future result = producer.send(new ProducerRecord<>(topic, key, value)); - RecordMetadata meta = null; - try { - meta = result.get(); - } catch (InterruptedException | ExecutionException e) { - e.printStackTrace(); - } - return meta; - } - - public static void main (String[] args) { - KfkProducer(); - - new Thread(() -> { - int i = 0; - do { - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - String format = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); - send(TOPIC, UUID.randomUUID().toString(), format); - }while (i++ < 1000); - close(); - }).start(); - - } -} diff --git a/src/main/java/com/god/mqtt/config/MqttProper.java b/src/main/java/com/god/mqtt/config/MqttProper.java index 3b9bab3..748571f 100644 --- a/src/main/java/com/god/mqtt/config/MqttProper.java +++ b/src/main/java/com/god/mqtt/config/MqttProper.java @@ -8,9 +8,8 @@ import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Configuration; /** - * @author DongZl - * @description: mqtt配置 - * @Date 2023-11-24 下午 02:04 + * 读取mqtt配置 + * @author fst */ @Data @Builder diff --git a/src/main/java/com/god/mqtt/service/MqttService.java b/src/main/java/com/god/mqtt/service/MqttService.java index 8c0be5c..fe0317f 100644 --- a/src/main/java/com/god/mqtt/service/MqttService.java +++ b/src/main/java/com/god/mqtt/service/MqttService.java @@ -49,7 +49,7 @@ public class MqttService implements MqttCallback { public void messageArrived(String topic, MqttMessage message) throws Exception { String msg = new String(message.getPayload()); log.info("topic: [{}], Qos: [{}], message content: [{}]", topic, message.getQos(), msg); - kafkaProducerService.send(KafkaContent.TOPIC, String.valueOf(msg.hashCode()), msg); + kafkaProducerService.send(KafkaContent.TOPIC, String.valueOf(msg.hashCode()), "如果是这个错,我一头撞死"); } @Override diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 40b7576..248a6dd 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -1,6 +1,6 @@ mqtt: config: - broker: tcp://47.115.225.229:1883 + broker: tcp://10.100.1.8:1883 topic: test # Tomcat @@ -22,7 +22,6 @@ spring: bootstrap-servers: 10.100.1.8:9092 transaction-id-prefix: kafkaTx- retries: 3 - # acks=0 : 生产者成功写入消息不会等待任何来自服务器的相应 # acks=1 : 只要集群的master收到消息,生产者就会收到一个来自服务器的响应。 # acks=all : 只有当所有参与者的复制节点全部收到消息时,生产者才会收到一个来自服务器的响应