From 3ce339dbf97663b1de0ffe08489bd0325c7f6ad3 Mon Sep 17 00:00:00 2001
From: Liu Wu <2780205363@qq.com>
Date: Thu, 26 Sep 2024 21:23:38 +0800
Subject: [PATCH] Add the data processing module
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 cloud-modules/cloud-data/pom.xml              | 112 +++++++++++++++
 .../java/com/muyu/data/DataApplication.java   |  13 ++
 .../muyu/data/config/KafkaConsumerConfig.java | 129 ++++++++++++++++++
 .../muyu/data/config/KafkaProviderConfig.java | 127 +++++++++++++++++
 .../data/config/KafkaSendResultHandler.java   |  65 +++++++++
 .../CloudElectronicFenceApplication.java      |   1 +
 cloud-modules/pom.xml                         |   1 +
 7 files changed, 448 insertions(+)
 create mode 100644 cloud-modules/cloud-data/pom.xml
 create mode 100644 cloud-modules/cloud-data/src/main/java/com/muyu/data/DataApplication.java
 create mode 100644 cloud-modules/cloud-data/src/main/java/com/muyu/data/config/KafkaConsumerConfig.java
 create mode 100644 cloud-modules/cloud-data/src/main/java/com/muyu/data/config/KafkaProviderConfig.java
 create mode 100644 cloud-modules/cloud-data/src/main/java/com/muyu/data/config/KafkaSendResultHandler.java

diff --git a/cloud-modules/cloud-data/pom.xml b/cloud-modules/cloud-data/pom.xml
new file mode 100644
index 0000000..1a7cdb5
--- /dev/null
+++ b/cloud-modules/cloud-data/pom.xml
@@ -0,0 +1,112 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>com.muyu</groupId>
+        <artifactId>cloud-modules</artifactId>
+        <version>3.6.3</version>
+    </parent>
+
+    <artifactId>cloud-data</artifactId>
+
+    <properties>
+        <maven.compiler.source>17</maven.compiler.source>
+        <maven.compiler.target>17</maven.compiler.target>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    </properties>
+
+    <dependencies>
+        <!-- SpringCloud Bootstrap -->
+        <dependency>
+            <groupId>org.springframework.cloud</groupId>
+            <artifactId>spring-cloud-starter-bootstrap</artifactId>
+            <version>4.1.2</version>
+        </dependency>
+
+        <!-- SpringCloud Alibaba Nacos Discovery -->
+        <dependency>
+            <groupId>com.alibaba.cloud</groupId>
+            <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
+        </dependency>
+
+        <!-- SpringCloud Alibaba Nacos Config -->
+        <dependency>
+            <groupId>com.alibaba.cloud</groupId>
+            <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
+        </dependency>
+
+        <!-- SpringCloud Alibaba Sentinel -->
+        <dependency>
+            <groupId>com.alibaba.cloud</groupId>
+            <artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>
+        </dependency>
+
+        <!-- SpringBoot Actuator -->
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-actuator</artifactId>
+        </dependency>
+
+        <!-- Spring Kafka -->
+        <dependency>
+            <groupId>org.springframework.kafka</groupId>
+            <artifactId>spring-kafka</artifactId>
+        </dependency>
+
+        <!-- Mysql Connector -->
+        <dependency>
+            <groupId>com.mysql</groupId>
+            <artifactId>mysql-connector-j</artifactId>
+        </dependency>
+
+        <!-- MuYu Common DataSource -->
+        <dependency>
+            <groupId>com.muyu</groupId>
+            <artifactId>cloud-common-datasource</artifactId>
+        </dependency>
+
+        <!-- MuYu Common DataScope -->
+        <dependency>
+            <groupId>com.muyu</groupId>
+            <artifactId>cloud-common-datascope</artifactId>
+        </dependency>
+
+        <!-- MuYu Common Log -->
+        <dependency>
+            <groupId>com.muyu</groupId>
+            <artifactId>cloud-common-log</artifactId>
+        </dependency>
+
+        <!-- MuYu Common Api Doc -->
+        <dependency>
+            <groupId>com.muyu</groupId>
+            <artifactId>cloud-common-api-doc</artifactId>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <finalName>${project.artifactId}</finalName>
+        <plugins>
+            <plugin>
+                <groupId>org.springframework.boot</groupId>
+                <artifactId>spring-boot-maven-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>repackage</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/cloud-modules/cloud-data/src/main/java/com/muyu/data/DataApplication.java b/cloud-modules/cloud-data/src/main/java/com/muyu/data/DataApplication.java
new file mode 100644
index 0000000..8a5c578
--- /dev/null
+++ b/cloud-modules/cloud-data/src/main/java/com/muyu/data/DataApplication.java
@@ -0,0 +1,13 @@
+package com.muyu.data;
+
+import com.muyu.common.security.annotation.EnableMyFeignClients;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+/**
+ * Entry point for the data processing service.
+ */
+@SpringBootApplication
+@EnableMyFeignClients
+public class DataApplication {
+    public static void main(String[] args) {
+        SpringApplication.run(DataApplication.class, args);
+    }
+}
diff --git a/cloud-modules/cloud-data/src/main/java/com/muyu/data/config/KafkaConsumerConfig.java b/cloud-modules/cloud-data/src/main/java/com/muyu/data/config/KafkaConsumerConfig.java
new file mode 100644
index 0000000..094e7ed
--- /dev/null
+++ b/cloud-modules/cloud-data/src/main/java/com/muyu/data/config/KafkaConsumerConfig.java
@@ -0,0 +1,129 @@
+package com.muyu.data.config;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.SpringBootConfiguration;
+import org.springframework.context.annotation.Bean;
+import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
+import org.springframework.kafka.config.KafkaListenerContainerFactory;
+import org.springframework.kafka.core.ConsumerFactory;
+import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
+import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
+import org.springframework.kafka.listener.ContainerProperties;
+import org.springframework.kafka.support.serializer.JsonDeserializer;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * @author 徐一杰
+ * @date 2022/10/31 18:05
+ * Kafka consumer configuration. These settings could also live in the yml file;
+ * values defined here override the yml.
+ */
+@SpringBootConfiguration
+public class KafkaConsumerConfig {
+
+    /**
+     * Kafka broker addresses
+     */
+    @Value("${spring.kafka.consumer.bootstrap-servers}")
+    private String bootstrapServers;
+    /**
+     * Consumer group id
+     */
+    @Value("${spring.kafka.consumer.group-id}")
+    private String groupId;
+    /**
+     * Whether offsets are committed automatically
+     */
+    @Value("${spring.kafka.consumer.enable-auto-commit}")
+    private boolean enableAutoCommit;
+    /**
+     * Session timeout for the consumer's heartbeat lease with Kafka
+     */
+    @Value("${spring.kafka.properties.session.timeout.ms}")
+    private String sessionTimeout;
+    /**
+     * Maximum interval between two polls, 5 minutes by default; exceeding it triggers a rebalance
+     */
+    @Value("${spring.kafka.properties.max.poll.interval.ms}")
+    private String maxPollIntervalTime;
+
+    /**
+     * Maximum number of records returned by a single poll
+     */
+    @Value("${spring.kafka.consumer.max-poll-records}")
+    private String maxPollRecords;
+
+    /**
+     * What to do when no committed offset exists (earliest/latest/none)
+     */
+    @Value("${spring.kafka.consumer.auto-offset-reset}")
+    private String autoOffsetReset;
+
+    /**
+     * Number of listener threads per container
+     */
+    @Value("${spring.kafka.listener.concurrency}")
+    private Integer concurrency;
+
+    /**
+     * Whether a missing topic is fatal for the listener
+     */
+    @Value("${spring.kafka.listener.missing-topics-fatal}")
+    private boolean missingTopicsFatal;
+
+    /**
+     * Poll timeout for the listener container
+     */
+    @Value("${spring.kafka.listener.poll-timeout}")
+    private long pollTimeout;
+
+    @Bean
+    public Map<String, Object> consumerConfigs() {
+        Map<String, Object> propsMap = new HashMap<>(16);
+        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+        // Whether to auto-commit offsets. Defaults to true; to avoid duplicated or lost
+        // records, set it to false and commit offsets manually.
+        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
+        // Auto-commit interval, effective only when auto-commit is enabled
+        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "2000");
+        // What the consumer does when reading a partition that has no committed offset, or an invalid one:
+        // earliest: resume from the committed offset if one exists; otherwise consume the partition from the beginning
+        // latest: resume from the committed offset if one exists; otherwise consume only records produced after the consumer started
+        // none: resume from committed offsets when every partition has one; throw an exception if any partition lacks one
+        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
+        // Maximum interval between two polls, 5 minutes by default; exceeding it triggers a rebalance
+        propsMap.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollIntervalTime);
+        // Maximum number of records one poll may return, 500 by default; if fewer are available,
+        // whatever is there is returned. 500 can be too many for some workloads: a consumer that
+        // cannot finish them within max.poll.interval.ms triggers a rebalance, the batch is handed
+        // to another consumer that cannot finish either, and the batch never completes.
+        // To avoid this, estimate the worst-case time per record and size max.poll.records accordingly.
+        // Note: this matters once BatchListener is enabled; without batch listening the rebalance issue does not arise this way.
+        propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
+        // How long the broker waits without a consumer heartbeat before triggering a rebalance, 10s by default
+        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
+        // Deserialization (JSON is recommended: entity classes travel without extra configuration)
+        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
+        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
+        return propsMap;
+    }
+
+    @Bean
+    public ConsumerFactory<Object, Object> consumerFactory() {
+        // Trust every package for JSON deserialization; required to deserialize entity classes
+        try (JsonDeserializer<Object> deserializer = new JsonDeserializer<>()) {
+            deserializer.trustedPackages("*");
+            return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new JsonDeserializer<>(), deserializer);
+        }
+    }
+
+    /**
+     * Kafka listener container factory: pulls messages from topics and hands them to consumers;
+     * consumer behavior is tuned here.
+     *
+     * @return the configured listener container factory
+     */
+    @Bean
+    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Object, Object>> kafkaListenerContainerFactory() {
+        ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
+        factory.setConsumerFactory(consumerFactory());
+        // Number of threads running in the listener container, commonly machines * partitions
+        factory.setConcurrency(concurrency);
+        // By default the container fails when a listened-to topic does not exist; set false to ignore that
+        factory.setMissingTopicsFatal(missingTopicsFatal);
+        // Auto-commit is off, so messages must be acknowledged manually
+        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
+        factory.getContainerProperties().setPollTimeout(pollTimeout);
+        // Enable batch listening (the listener must then receive a List)
+        // factory.setBatchListener(true);
+        return factory;
+    }
+
+}
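Note: for reference, a listener wired against this factory might look like the minimal sketch below. It is not part of the patch; the class, package, and topic name "cloud-data-demo" are illustrative assumptions. Because the container factory sets AckMode.MANUAL_IMMEDIATE, the offset is committed only when acknowledge() is called.

package com.muyu.data.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Component
public class DemoDataListener {

    // containerFactory must reference the bean defined above so that the manual
    // ack mode and the JSON deserializer actually apply to this listener.
    @KafkaListener(topics = "cloud-data-demo", containerFactory = "kafkaListenerContainerFactory")
    public void onMessage(ConsumerRecord<Object, Object> record, Acknowledgment ack) {
        System.out.println("Received: " + record.value());
        // Commit the offset immediately once the record has been processed
        ack.acknowledge();
    }
}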
diff --git a/cloud-modules/cloud-data/src/main/java/com/muyu/data/config/KafkaProviderConfig.java b/cloud-modules/cloud-data/src/main/java/com/muyu/data/config/KafkaProviderConfig.java
new file mode 100644
index 0000000..1ccb0e8
--- /dev/null
+++ b/cloud-modules/cloud-data/src/main/java/com/muyu/data/config/KafkaProviderConfig.java
@@ -0,0 +1,127 @@
+package com.muyu.data.config;
+
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.core.DefaultKafkaProducerFactory;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.core.ProducerFactory;
+import org.springframework.kafka.transaction.KafkaTransactionManager;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Configuration for the topic producer.
+ */
+@Configuration
+public class KafkaProviderConfig {
+
+    /**
+     * Kafka broker addresses
+     */
+    @Value("${spring.kafka.producer.bootstrap-servers}")
+    private String bootstrapServers;
+    /**
+     * Transaction id prefix; setting it enables Kafka transactions
+     */
+    @Value("${spring.kafka.producer.transaction-id-prefix}")
+    private String transactionIdPrefix;
+    /**
+     * Send acknowledgment mode
+     */
+    @Value("${spring.kafka.producer.acks}")
+    private String acks;
+    /**
+     * Send retries
+     */
+    @Value("${spring.kafka.producer.retries}")
+    private String retries;
+    /**
+     * Batch size for sends
+     */
+    @Value("${spring.kafka.producer.batch-size}")
+    private String batchSize;
+    /**
+     * Producer buffer memory size
+     */
+    @Value("${spring.kafka.producer.buffer-memory}")
+    private String bufferMemory;
+
+    /**
+     * Key serializer class
+     */
+    @Value("${spring.kafka.producer.key-serializer}")
+    private String keySerializer;
+
+    /**
+     * Value serializer class
+     */
+    @Value("${spring.kafka.producer.value-serializer}")
+    private String valueSerializer;
+
+    /**
+     * Build the configuration map for the message producer.
+     *
+     * @return producer configuration properties
+     */
+    @Bean
+    public Map<String, Object> producerConfigs() {
+        Map<String, Object> props = new HashMap<>(16);
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        // acks=0  : the producer does not wait for any response from the server before treating the write as successful.
+        // acks=1  : the producer gets a success response as soon as the partition leader has received the message.
+        // acks=all: the producer gets a success response only after every in-sync replica has received the message.
+        // Transactions require acks=all.
+        props.put(ProducerConfig.ACKS_CONFIG, acks);
+        // Number of resends after an error; must be greater than 0 when transactions are enabled
+        props.put(ProducerConfig.RETRIES_CONFIG, retries);
+        // When several messages target the same partition, the producer packs them into one batch
+        // instead of sending them one by one, reducing request round-trips.
+        // The batch size is set via batch.size and defaults to 16KB.
+        // A small batch size can lower throughput (0 disables batching entirely), while a large one
+        // adds latency: if it takes 5 seconds to fill a 16KB batch, those messages are delayed by 5 seconds.
+        // (In our tests this batchSize value appeared to have no observable effect.)
+        props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
+        // When traffic is light a batch may not fill for a long time (say 5 minutes), adding latency;
+        // linger.ms sets a deadline after which the batch is sent even if it is not full
+        props.put(ProducerConfig.LINGER_MS_CONFIG, "5000");
+        // Producer buffer memory size
+        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
+        // Serialization; must match the deserializers on the consumer side
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer);
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);
+        return props;
+    }
+
+    /**
+     * Build the producer factory.
+     *
+     * @return the transactional producer factory
+     */
+    @Bean
+    public ProducerFactory<String, Object> producerFactory() {
+        DefaultKafkaProducerFactory<String, Object> factory = new DefaultKafkaProducerFactory<>(producerConfigs());
+        // Enables transactions; note that this makes the LINGER_MS_CONFIG setting ineffective
+        factory.setTransactionIdPrefix(transactionIdPrefix);
+        return factory;
+    }
+
+    /**
+     * Configure the Kafka transaction manager.
+     *
+     * @param producerFactory the transactional producer factory
+     * @return the transaction manager
+     */
+    @Bean
+    public KafkaTransactionManager<String, Object> kafkaTransactionManager(ProducerFactory<String, Object> producerFactory) {
+        return new KafkaTransactionManager<>(producerFactory);
+    }
+
+    /**
+     * Build the KafkaTemplate.
+     *
+     * @return the template used to send messages
+     */
+    @Bean
+    public KafkaTemplate<String, Object> kafkaTemplate() {
+        return new KafkaTemplate<>(producerFactory());
+    }
+
+}
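Note: because transaction-id-prefix is set on the producer factory, every send through this KafkaTemplate must run inside a transaction; a plain send() outside one throws IllegalStateException. A minimal sketch of a transactional send follows; the service class and topic name are illustrative, not part of the patch.

package com.muyu.data.service;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class DemoSendService {

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    public void send(Object payload) {
        // executeInTransaction() opens and commits a local Kafka transaction;
        // annotating a method with @Transactional against the
        // kafkaTransactionManager bean would work as well.
        kafkaTemplate.executeInTransaction(template -> {
            template.send("cloud-data-demo", payload);
            return null;
        });
    }
}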
diff --git a/cloud-modules/cloud-data/src/main/java/com/muyu/data/config/KafkaSendResultHandler.java b/cloud-modules/cloud-data/src/main/java/com/muyu/data/config/KafkaSendResultHandler.java
new file mode 100644
index 0000000..650f07a
--- /dev/null
+++ b/cloud-modules/cloud-data/src/main/java/com/muyu/data/config/KafkaSendResultHandler.java
@@ -0,0 +1,65 @@
+package com.muyu.data.config;
+
+import jakarta.annotation.Nullable;
+import jakarta.annotation.PostConstruct;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.support.ProducerListener;
+import org.springframework.stereotype.Component;
+
+/**
+ * Registers itself as the template's ProducerListener and logs the result of every send.
+ */
+@Component
+public class KafkaSendResultHandler implements ProducerListener<String, Object> {
+
+    @Autowired
+    private KafkaTemplate<String, Object> kafkaTemplate;
+
+    /**
+     * Bean initialization: attach this listener to the template
+     */
+    @PostConstruct
+    public void init() {
+        this.kafkaTemplate.setProducerListener(this);
+    }
+
+    /**
+     * Callback invoked when a message has been written to Kafka successfully
+     *
+     * @param producerRecord the record that was sent
+     * @param recordMetadata metadata of the written record
+     */
+    @Override
+    public void onSuccess(ProducerRecord<String, Object> producerRecord, RecordMetadata recordMetadata) {
+        System.out.println("Message sent successfully: " + producerRecord.toString());
+    }
+
+    /**
+     * Callback invoked when a send to Kafka fails
+     *
+     * @param producerRecord the failed record
+     * @param recordMetadata The metadata for the record that was sent (i.e. the partition
+     * and offset). If an error occurred, metadata will contain only valid topic and maybe
+     * the partition. If the partition is not provided in the ProducerRecord and an error
+     * occurs before partition is assigned, then the partition will be set to
+     * RecordMetadata.UNKNOWN_PARTITION.
+     * @param exception the exception thrown
+     */
+    @Override
+    public void onError(ProducerRecord<String, Object> producerRecord, @Nullable RecordMetadata recordMetadata,
+                        Exception exception) {
+        System.out.println("Message send failed: " + producerRecord.toString() + ", cause: " + exception.getMessage());
+    }
+
+}
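Note: the ProducerListener above reports results asynchronously. Where a caller needs to block until a send is confirmed, the future returned by send() can be awaited; a sketch assuming Spring Kafka 3.x, where send() returns a CompletableFuture (class, topic, and timeout are illustrative, not part of the patch).

package com.muyu.data.service;

import java.util.concurrent.TimeUnit;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;

@Service
public class DemoSyncSendService {

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    public void sendAndWait(Object payload) throws Exception {
        // Blocking on the CompletableFuture gives synchronous confirmation in
        // addition to the onSuccess/onError log lines of the listener above.
        SendResult<String, Object> result = kafkaTemplate
                .executeInTransaction(t -> t.send("cloud-data-demo", payload))
                .get(10, TimeUnit.SECONDS);
        System.out.println("Acknowledged at offset " + result.getRecordMetadata().offset());
    }
}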
diff --git a/cloud-modules/cloud-modules-fence/src/main/java/com/muyu/fence/CloudElectronicFenceApplication.java b/cloud-modules/cloud-modules-fence/src/main/java/com/muyu/fence/CloudElectronicFenceApplication.java
index dab43ff..e6c6970 100644
--- a/cloud-modules/cloud-modules-fence/src/main/java/com/muyu/fence/CloudElectronicFenceApplication.java
+++ b/cloud-modules/cloud-modules-fence/src/main/java/com/muyu/fence/CloudElectronicFenceApplication.java
@@ -22,6 +22,7 @@ import org.springframework.cloud.openfeign.EnableFeignClients;
         DruidDataSourceAutoConfigure.class,
         DynamicDataSourceAutoConfiguration.class
 })
+
 public class CloudElectronicFenceApplication {
     public static void main (String[] args) {
         SpringApplication.run(CloudElectronicFenceApplication.class, args);
diff --git a/cloud-modules/pom.xml b/cloud-modules/pom.xml
index 607edae..c49bd52 100644
--- a/cloud-modules/pom.xml
+++ b/cloud-modules/pom.xml
@@ -19,6 +19,7 @@
         <module>cloud-modules-breakdown</module>
         <module>cloud-modules-warn</module>
         <module>cloud-saas</module>
+        <module>cloud-data</module>
     </modules>

     <artifactId>cloud-modules</artifactId>