From e6e7e58f953d943c46ac7b18ca85e4010c2ca3bb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=B0=A2=E6=AC=A3=E6=82=A6?= <2289014031@qq.com> Date: Mon, 7 Oct 2024 14:22:46 +0800 Subject: [PATCH] =?UTF-8?q?=E7=A7=BB=E9=99=A4=E5=86=97=E4=BD=99=E4=BB=A3?= =?UTF-8?q?=E7=A0=81=E5=B9=B6=E4=BC=98=E5=8C=96Kafka=E9=85=8D=E7=BD=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 删除了KafkaConstants、KafkaConsumerConfig和KafkaProviderConfig类,这些类已被移动到其他项目中 -简化了KafkaConfig类,移除了其中的kafkaProducer方法,该方法已被移动到其他类中 - 在MqttTest类中更新了Kafka主题的硬编码值 - 移除了oneMse类,该类用于测试,目前不再需要 - 在ProtocolApplication的main方法中添加了一个空的System.out.println语句,用于调试或日志记录 --- .../kafka/config/KafkaConsumerConfig.java | 54 -------------- .../kafka/config/KafkaProviderConfig.java | 45 ------------ .../kafka/constants/KafkaConstants.java | 9 --- .../com/muyu/kafkaconfig/KafkaConfig.java | 3 +- .../java/com/muyu/ProtocolApplication.java | 4 + .../main/java/com/muyu/cache/bean/oneMse.java | 73 ------------------- .../src/main/java/com/muyu/mqtt/MqttTest.java | 4 +- 7 files changed, 8 insertions(+), 184 deletions(-) delete mode 100644 cloud-common/cloud-common-kafka/src/main/java/com/muyu/common/kafka/config/KafkaConsumerConfig.java delete mode 100644 cloud-common/cloud-common-kafka/src/main/java/com/muyu/common/kafka/config/KafkaProviderConfig.java delete mode 100644 cloud-common/cloud-common-kafka/src/main/java/com/muyu/common/kafka/constants/KafkaConstants.java delete mode 100644 cloud-modules/cloud-modules-protocolparsing/src/main/java/com/muyu/cache/bean/oneMse.java diff --git a/cloud-common/cloud-common-kafka/src/main/java/com/muyu/common/kafka/config/KafkaConsumerConfig.java b/cloud-common/cloud-common-kafka/src/main/java/com/muyu/common/kafka/config/KafkaConsumerConfig.java deleted file mode 100644 index 8055e42..0000000 --- a/cloud-common/cloud-common-kafka/src/main/java/com/muyu/common/kafka/config/KafkaConsumerConfig.java +++ /dev/null @@ -1,54 +0,0 @@ -package com.muyu.common.kafka.config; - -import com.muyu.common.kafka.constants.KafkaConstants; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.common.serialization.Deserializer; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.boot.SpringBootConfiguration; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; - -import java.util.HashMap; -import java.util.Map; - -/** - * kafka 消息的消费者 配置类 - */ -@Configuration -public class KafkaConsumerConfig { - - @Bean - public KafkaConsumer kafkaConsumer() { - Map configs = new HashMap<>(); - //kafka服务端的IP和端口,格式:(ip:port) - configs.put("bootstrap.servers", "60.204.221.52:9092"); - //开启consumer的偏移量(offset)自动提交到Kafka - configs.put("enable.auto.commit", true); - //consumer的偏移量(offset) 自动提交的时间间隔,单位毫秒 - configs.put("auto.commit.interval", 5000); - //在Kafka中没有初始化偏移量或者当前偏移量不存在情况 - //earliest, 在偏移量无效的情况下, 自动重置为最早的偏移量 - //latest, 在偏移量无效的情况下, 自动重置为最新的偏移量 - //none, 在偏移量无效的情况下, 抛出异常. - configs.put("auto.offset.reset", "latest"); - //请求阻塞的最大时间(毫秒) - configs.put("fetch.max.wait", 500); - //请求应答的最小字节数 - configs.put("fetch.min.size", 1); - //心跳间隔时间(毫秒) - configs.put("heartbeat-interval", 3000); - //一次调用poll返回的最大记录条数 - configs.put("max.poll.records", 500); - //指定消费组 - configs.put("group.id", KafkaConstants.KafkaGrop); - //指定key使用的反序列化类 - Deserializer keyDeserializer = new StringDeserializer(); - //指定value使用的反序列化类 - Deserializer valueDeserializer = new StringDeserializer(); - //创建Kafka消费者 - KafkaConsumer kafkaConsumer = new KafkaConsumer(configs, keyDeserializer, valueDeserializer); - return kafkaConsumer; - } - -} diff --git a/cloud-common/cloud-common-kafka/src/main/java/com/muyu/common/kafka/config/KafkaProviderConfig.java b/cloud-common/cloud-common-kafka/src/main/java/com/muyu/common/kafka/config/KafkaProviderConfig.java deleted file mode 100644 index 07b56d3..0000000 --- a/cloud-common/cloud-common-kafka/src/main/java/com/muyu/common/kafka/config/KafkaProviderConfig.java +++ /dev/null @@ -1,45 +0,0 @@ -package com.muyu.common.kafka.config; - -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.common.serialization.Serializer; -import org.apache.kafka.common.serialization.StringSerializer; -import org.springframework.boot.SpringBootConfiguration; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; - -import java.util.HashMap; -import java.util.Map; - -/** - * kafka 消息的生产者 配置类 - */ -@Configuration -public class KafkaProviderConfig { - - @Bean - public KafkaProducer kafkaProducer() { - Map configs = new HashMap<>(); - //#kafka服务端的IP和端口,格式:(ip:port) - configs.put("bootstrap.servers", "47.116.173.119:9092"); - //客户端发送服务端失败的重试次数 - configs.put("retries", 2); - //多个记录被发送到同一个分区时,生产者将尝试将记录一起批处理成更少的请求. - //此设置有助于提高客户端和服务器的性能,配置控制默认批量大小(以字节为单位) - configs.put("batch.size", 16384); - //生产者可用于缓冲等待发送到服务器的记录的总内存字节数(以字节为单位) - configs.put("buffer-memory", 33554432); - //生产者producer要求leader节点在考虑完成请求之前收到的确认数,用于控制发送记录在服务端的持久化 - //acks=0,设置为0,则生产者producer将不会等待来自服务器的任何确认.该记录将立即添加到套接字(socket)缓冲区并视为已发送.在这种情况下,无法保证服务器已收到记录,并且重试配置(retries)将不会生效(因为客户端通常不会知道任何故障),每条记录返回的偏移量始终设置为-1. - //acks=1,设置为1,leader节点会把记录写入本地日志,不需要等待所有follower节点完全确认就会立即应答producer.在这种情况下,在follower节点复制前,leader节点确认记录后立即失败的话,记录将会丢失. - //acks=all,acks=-1,leader节点将等待所有同步复制副本完成再确认记录,这保证了只要至少有一个同步复制副本存活,记录就不会丢失. - configs.put("acks", "-1"); - //指定key使用的序列化类 - Serializer keySerializer = new StringSerializer(); - //指定value使用的序列化类 - Serializer valueSerializer = new StringSerializer(); - //创建Kafka生产者 - KafkaProducer kafkaProducer = new KafkaProducer(configs, keySerializer, valueSerializer); - return kafkaProducer; - } - -} diff --git a/cloud-common/cloud-common-kafka/src/main/java/com/muyu/common/kafka/constants/KafkaConstants.java b/cloud-common/cloud-common-kafka/src/main/java/com/muyu/common/kafka/constants/KafkaConstants.java deleted file mode 100644 index b1b7180..0000000 --- a/cloud-common/cloud-common-kafka/src/main/java/com/muyu/common/kafka/constants/KafkaConstants.java +++ /dev/null @@ -1,9 +0,0 @@ -package com.muyu.common.kafka.constants; - - -public class KafkaConstants { - - public final static String KafkaTopic = "carJsons"; - -// public final static String KafkaGrop = "kafka_grop"; -} diff --git a/cloud-common/cloud-common-kafka/src/main/java/com/muyu/kafkaconfig/KafkaConfig.java b/cloud-common/cloud-common-kafka/src/main/java/com/muyu/kafkaconfig/KafkaConfig.java index 393ccbb..6ea8be4 100644 --- a/cloud-common/cloud-common-kafka/src/main/java/com/muyu/kafkaconfig/KafkaConfig.java +++ b/cloud-common/cloud-common-kafka/src/main/java/com/muyu/kafkaconfig/KafkaConfig.java @@ -17,7 +17,8 @@ import java.util.HashMap; * @Date:2024/9/28 12:19 */ @Configuration -public class KafkaConfig { +public class +KafkaConfig { @Bean public KafkaProducer kafkaProducer(){ diff --git a/cloud-modules/cloud-modules-protocolparsing/src/main/java/com/muyu/ProtocolApplication.java b/cloud-modules/cloud-modules-protocolparsing/src/main/java/com/muyu/ProtocolApplication.java index ce6cf59..2486fe9 100644 --- a/cloud-modules/cloud-modules-protocolparsing/src/main/java/com/muyu/ProtocolApplication.java +++ b/cloud-modules/cloud-modules-protocolparsing/src/main/java/com/muyu/ProtocolApplication.java @@ -1,5 +1,6 @@ package com.muyu; +import cn.hutool.json.JSONUtil; import com.muyu.common.security.annotation.EnableMyFeignClients; import org.mybatis.spring.annotation.MapperScan; import org.springframework.boot.SpringApplication; @@ -9,7 +10,10 @@ import org.springframework.boot.autoconfigure.SpringBootApplication; @EnableMyFeignClients @SpringBootApplication public class ProtocolApplication { + public static void main(String[] args) { + System.out.println(""); SpringApplication.run(ProtocolApplication.class, args); + } } diff --git a/cloud-modules/cloud-modules-protocolparsing/src/main/java/com/muyu/cache/bean/oneMse.java b/cloud-modules/cloud-modules-protocolparsing/src/main/java/com/muyu/cache/bean/oneMse.java deleted file mode 100644 index da479ba..0000000 --- a/cloud-modules/cloud-modules-protocolparsing/src/main/java/com/muyu/cache/bean/oneMse.java +++ /dev/null @@ -1,73 +0,0 @@ -package com.muyu.cache.bean; - -import lombok.AllArgsConstructor; -import lombok.Builder; -import lombok.Data; -import lombok.NoArgsConstructor; - -@Data -@AllArgsConstructor -@NoArgsConstructor -@Builder -public class oneMse { - - - /** - * 测试类 - */ - private Long numther; - /** - * 测试类 - */ - private Integer numthonre; - /** - * 测试类 - */ - private Integer numtweh; - /** - * 测试类 - */ - private String numtrwh; - /** - * 测试类 - */ - private Integer numrrth; - /** - * 测试类 - */ - private Integer numereth; - /** - * 测试类 - */ - private String numth; - /** - * 测试类 - */ - private Integer numttruh; - /** - * 测试类 - */ - private Integer numtrert; - /** - * 测试类 - */ - private Integer erg; - /** - * 测试类 - */ - private Integer numtgreh; - /** - * 测试类 - */ - private Integer rtetg; - /** - * 测试类 - */ - private Integer geewr; - /** - * 测试类 - */ - private Integer heertherh; - - -} diff --git a/cloud-modules/cloud-modules-protocolparsing/src/main/java/com/muyu/mqtt/MqttTest.java b/cloud-modules/cloud-modules-protocolparsing/src/main/java/com/muyu/mqtt/MqttTest.java index 01ab058..1e49f43 100644 --- a/cloud-modules/cloud-modules-protocolparsing/src/main/java/com/muyu/mqtt/MqttTest.java +++ b/cloud-modules/cloud-modules-protocolparsing/src/main/java/com/muyu/mqtt/MqttTest.java @@ -1,7 +1,7 @@ package com.muyu.mqtt; import com.alibaba.fastjson2.JSONObject; -import com.muyu.common.kafka.constants.KafkaConstants; + import com.muyu.domain.CarMessage; import com.muyu.domain.KafKaData; @@ -88,7 +88,7 @@ public class MqttTest { .build()); } String jsonString = JSONObject.toJSONString(kafKaDataList); - ProducerRecord producerRecord = new ProducerRecord<>(KafkaConstants.KafkaTopic, jsonString); + ProducerRecord producerRecord = new ProducerRecord<>("carJsons",jsonString); kafkaProducer.send(producerRecord); log.info("kafka投产:{}", jsonString);