From c99d1e0e89d41a48e5a8e5224805e66856f228b4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=89=E5=AE=89=E5=90=9B?= <2746727141@qq.com> Date: Mon, 8 Apr 2024 10:35:25 +0800 Subject: [PATCH] =?UTF-8?q?commit=20=E5=90=88=E5=B9=B6=E6=8F=90=E4=BA=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pom.xml | 8 + zhilian-auth/src/main/resources/bootstrap.yml | 2 - .../src/main/resources/bootstrap.yml | 3 - .../src/main/resources/bootstrap.yml | 2 - .../src/main/resources/bootstrap.yml | 2 - .../src/main/resources/bootstrap.yml | 2 - .../src/main/resources/bootstrap.yml | 2 - .../src/main/resources/bootstrap.yml | 2 - zhilian-modules/zhilian-online/pom.xml | 6 + .../online/ZhiLianOnlineApplication.java | 9 +- .../src/main/java/com/zhilian/online/aa.java | 4 - .../online/config/KafkaConsumerConfig.java | 164 ++++++++++++++++++ .../online/config/KafkaProviderConfig.java | 144 +++++++++++++++ .../zhilian/online/config/MqttxConfig.java | 89 +++++++++- .../zhilian/online/config/RabbitConfig.java | 1 - .../online/constans/OnlineConstants.java | 2 + .../online/consumer/KafkaConsumer.java | 28 +++ .../OnlineLoadCenterController.java | 2 - .../impl/OnlineLoadCenterServiceImpl.java | 2 +- .../zhilian/online/uitls/MessageResolver.java | 59 +++++++ .../src/main/resources/bootstrap.yml | 2 - .../java/com/zhilian/online/TestKafka.java | 30 ++++ .../resolver/ZhiLianResolverApplication.java | 18 -- .../resolver/ZhilianResolverApplication.java | 18 -- .../src/main/resources/bootstrap.yml | 2 - .../src/main/resources/bootstrap.yml | 2 - .../src/main/resources/bootstrap.yml | 2 - 27 files changed, 538 insertions(+), 69 deletions(-) delete mode 100644 zhilian-modules/zhilian-online/src/main/java/com/zhilian/online/aa.java create mode 100644 zhilian-modules/zhilian-online/src/main/java/com/zhilian/online/config/KafkaConsumerConfig.java create mode 100644 zhilian-modules/zhilian-online/src/main/java/com/zhilian/online/config/KafkaProviderConfig.java create mode 100644 zhilian-modules/zhilian-online/src/main/java/com/zhilian/online/consumer/KafkaConsumer.java create mode 100644 zhilian-modules/zhilian-online/src/main/java/com/zhilian/online/uitls/MessageResolver.java create mode 100644 zhilian-modules/zhilian-online/src/test/java/com/zhilian/online/TestKafka.java delete mode 100644 zhilian-modules/zhilian-resolver/src/main/java/com/zhilian/resolver/ZhiLianResolverApplication.java delete mode 100644 zhilian-modules/zhilian-resolver/src/main/java/com/zhilian/resolver/ZhilianResolverApplication.java diff --git a/pom.xml b/pom.xml index 0eef05b..b63407f 100644 --- a/pom.xml +++ b/pom.xml @@ -37,6 +37,7 @@ 2.14.3 3.8.1 1.2.5 + 2.8.0 @@ -250,6 +251,13 @@ ${mqttv3.version} + + + org.springframework.kafka + spring-kafka + ${spring-kafka.version} + + com.zhilian zhilian-common-business diff --git a/zhilian-auth/src/main/resources/bootstrap.yml b/zhilian-auth/src/main/resources/bootstrap.yml index f0725a8..94f1a14 100644 --- a/zhilian-auth/src/main/resources/bootstrap.yml +++ b/zhilian-auth/src/main/resources/bootstrap.yml @@ -15,11 +15,9 @@ spring: discovery: # 服务注册地址 server-addr: 10.10.25.2:8848 - namespace: 9d9e22dc-ff70-42c5-adac-fa69e6d62dbe config: # 配置中心地址 server-addr: 10.10.25.2:8848 - namespace: 9d9e22dc-ff70-42c5-adac-fa69e6d62dbe # 配置文件格式 file-extension: yml # 共享配置 diff --git a/zhilian-gateway/src/main/resources/bootstrap.yml b/zhilian-gateway/src/main/resources/bootstrap.yml index 8248701..b426d9a 100644 --- a/zhilian-gateway/src/main/resources/bootstrap.yml +++ b/zhilian-gateway/src/main/resources/bootstrap.yml @@ -15,11 +15,9 @@ spring: discovery: # 服务注册地址 server-addr: 10.10.25.2:8848 - namespace: 9d9e22dc-ff70-42c5-adac-fa69e6d62dbe config: # 配置中心地址 server-addr: 10.10.25.2:8848 - namespace: 9d9e22dc-ff70-42c5-adac-fa69e6d62dbe # 配置文件格式 file-extension: yml # 共享配置 @@ -36,7 +34,6 @@ spring: ds1: nacos: server-addr: 10.10.25.2:8848 - namespace: 9d9e22dc-ff70-42c5-adac-fa69e6d62dbe dataId: sentinel-zhilian-gateway groupId: DEFAULT_GROUP data-type: json diff --git a/zhilian-modules/zhilian-business/src/main/resources/bootstrap.yml b/zhilian-modules/zhilian-business/src/main/resources/bootstrap.yml index 1105c2a..2339518 100644 --- a/zhilian-modules/zhilian-business/src/main/resources/bootstrap.yml +++ b/zhilian-modules/zhilian-business/src/main/resources/bootstrap.yml @@ -15,11 +15,9 @@ spring: discovery: # 服务注册地址 server-addr: 10.10.25.2:8848 - namespace: 9d9e22dc-ff70-42c5-adac-fa69e6d62dbe config: # 配置中心地址 server-addr: 10.10.25.2:8848 - namespace: 9d9e22dc-ff70-42c5-adac-fa69e6d62dbe # 配置文件格式 file-extension: yml # 共享配置 diff --git a/zhilian-modules/zhilian-file/src/main/resources/bootstrap.yml b/zhilian-modules/zhilian-file/src/main/resources/bootstrap.yml index 9522161..415c871 100644 --- a/zhilian-modules/zhilian-file/src/main/resources/bootstrap.yml +++ b/zhilian-modules/zhilian-file/src/main/resources/bootstrap.yml @@ -15,11 +15,9 @@ spring: discovery: # 服务注册地址 server-addr: 10.10.25.2:8848 - namespace: 9d9e22dc-ff70-42c5-adac-fa69e6d62dbe config: # 配置中心地址 server-addr: 10.10.25.2:8848 - namespace: 9d9e22dc-ff70-42c5-adac-fa69e6d62dbe # 配置文件格式 file-extension: yml # 共享配置 diff --git a/zhilian-modules/zhilian-gen/src/main/resources/bootstrap.yml b/zhilian-modules/zhilian-gen/src/main/resources/bootstrap.yml index b9c8fb0..e6ae61a 100644 --- a/zhilian-modules/zhilian-gen/src/main/resources/bootstrap.yml +++ b/zhilian-modules/zhilian-gen/src/main/resources/bootstrap.yml @@ -15,11 +15,9 @@ spring: discovery: # 服务注册地址 server-addr: 10.10.25.2:8848 - namespace: 9d9e22dc-ff70-42c5-adac-fa69e6d62dbe config: # 配置中心地址 server-addr: 10.10.25.2:8848 - namespace: 9d9e22dc-ff70-42c5-adac-fa69e6d62dbe # 配置文件格式 file-extension: yml # 共享配置 diff --git a/zhilian-modules/zhilian-job/src/main/resources/bootstrap.yml b/zhilian-modules/zhilian-job/src/main/resources/bootstrap.yml index 510d6bb..1833882 100644 --- a/zhilian-modules/zhilian-job/src/main/resources/bootstrap.yml +++ b/zhilian-modules/zhilian-job/src/main/resources/bootstrap.yml @@ -15,11 +15,9 @@ spring: discovery: # 服务注册地址 server-addr: 10.10.25.2:8848 - namespace: 9d9e22dc-ff70-42c5-adac-fa69e6d62dbe config: # 配置中心地址 server-addr: 10.10.25.2:8848 - namespace: 9d9e22dc-ff70-42c5-adac-fa69e6d62dbe # 配置文件格式 file-extension: yml # 共享配置 diff --git a/zhilian-modules/zhilian-manager/src/main/resources/bootstrap.yml b/zhilian-modules/zhilian-manager/src/main/resources/bootstrap.yml index 9fc2b73..28c6815 100644 --- a/zhilian-modules/zhilian-manager/src/main/resources/bootstrap.yml +++ b/zhilian-modules/zhilian-manager/src/main/resources/bootstrap.yml @@ -15,11 +15,9 @@ spring: discovery: # 服务注册地址 server-addr: 10.10.25.2:8848 - namespace: 9d9e22dc-ff70-42c5-adac-fa69e6d62dbe config: # 配置中心地址 server-addr: 10.10.25.2:8848 - namespace: 9d9e22dc-ff70-42c5-adac-fa69e6d62dbe # 配置文件格式 file-extension: yml # 共享配置 diff --git a/zhilian-modules/zhilian-online/pom.xml b/zhilian-modules/zhilian-online/pom.xml index 9f3e6f5..55aab47 100644 --- a/zhilian-modules/zhilian-online/pom.xml +++ b/zhilian-modules/zhilian-online/pom.xml @@ -92,6 +92,12 @@ org.eclipse.paho.client.mqttv3 + + + org.springframework.kafka + spring-kafka + + diff --git a/zhilian-modules/zhilian-online/src/main/java/com/zhilian/online/ZhiLianOnlineApplication.java b/zhilian-modules/zhilian-online/src/main/java/com/zhilian/online/ZhiLianOnlineApplication.java index db07c36..67c0c70 100644 --- a/zhilian-modules/zhilian-online/src/main/java/com/zhilian/online/ZhiLianOnlineApplication.java +++ b/zhilian-modules/zhilian-online/src/main/java/com/zhilian/online/ZhiLianOnlineApplication.java @@ -3,8 +3,12 @@ package com.zhilian.online; import com.zhilian.common.security.annotation.EnableCustomConfig; import com.zhilian.common.security.annotation.EnableMyFeignClients; import com.zhilian.common.swagger.annotation.EnableCustomSwagger2; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.ApplicationArguments; +import org.springframework.boot.ApplicationRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.kafka.core.KafkaTemplate; /** * @version: @@ -16,8 +20,11 @@ import org.springframework.boot.autoconfigure.SpringBootApplication; @EnableCustomSwagger2 @EnableMyFeignClients @SpringBootApplication -public class ZhiLianOnlineApplication { +public class ZhiLianOnlineApplication{ public static void main(String[] args) { SpringApplication.run(ZhiLianOnlineApplication.class,args); } + + + } diff --git a/zhilian-modules/zhilian-online/src/main/java/com/zhilian/online/aa.java b/zhilian-modules/zhilian-online/src/main/java/com/zhilian/online/aa.java deleted file mode 100644 index b7e6e8d..0000000 --- a/zhilian-modules/zhilian-online/src/main/java/com/zhilian/online/aa.java +++ /dev/null @@ -1,4 +0,0 @@ -package com.zhilian.online; - -public class aa { -} diff --git a/zhilian-modules/zhilian-online/src/main/java/com/zhilian/online/config/KafkaConsumerConfig.java b/zhilian-modules/zhilian-online/src/main/java/com/zhilian/online/config/KafkaConsumerConfig.java new file mode 100644 index 0000000..b9152df --- /dev/null +++ b/zhilian-modules/zhilian-online/src/main/java/com/zhilian/online/config/KafkaConsumerConfig.java @@ -0,0 +1,164 @@ +//package com.zhilian.online.config; +// +//import com.fasterxml.jackson.databind.JsonDeserializer; +//import org.apache.kafka.clients.consumer.ConsumerConfig; +//import org.springframework.beans.factory.annotation.Value; +//import org.springframework.context.annotation.Bean; +//import org.springframework.context.annotation.Configuration; +//import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +//import org.springframework.kafka.config.KafkaListenerContainerFactory; +//import org.springframework.kafka.core.ConsumerFactory; +//import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +//import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; +//import org.springframework.kafka.listener.ContainerProperties; +// +//import java.util.HashMap; +//import java.util.Map; +// +///** +// * @BelongsProject: smart-cloud-server +// * @BelongsPackage: com.zhilian.online.config +// * @Author: LiYuan +// * @CreateTime: 2024-04-02 20:01 +// * @Description: kafka消费者配置类 +// * @Version: 1.0 +// */ +//@Configuration +//public class KafkaConsumerConfig { +// +// /** +// * kafka服务器地址 +// */ +// @Value("${spring.kafka.consumer.bootstrap-servers}") +// private String bootstrapServers; +// +// /** +// * 消费者组 +// */ +// @Value("${spring.kafka.consumer.group-id}") +// private String groupId; +// +// /** +// * 是否自动提交 +// */ +// @Value("${spring.kafka.consumer.enable-auto-commit}") +// private boolean enableAutoCommit; +// +// /** +// * session超时时间 +// */ +// @Value("${spring.kafka.consumer.session.timeout.ms}") +// private String sessionTimeout; +// +// /** +// * 最大拉取时间 +// */ +// @Value("${spring.kafka.consumer.max.poll.interval.ms}") +// private String maxPollIntervalTime; +// +// /** +// * 每次拉取最大条数 +// */ +// @Value("${spring.kafka.consumer.max-poll-records}") +// private String maxPollRecords; +// +// /** +// * 自动提交偏移量 +// */ +// @Value("${spring.kafka.consumer.auto-offset-reset}") +// private String autoOffsetReset; +// +// /** +// * 监听器并发度 +// */ +// @Value("${spring.kafka.listener.concurrency}") +// private Integer concurrency; +// +// /** +// * 监听器是否忽略不存在的topic +// */ +// @Value("${spring.kafka.listener.missing-topics-fatal}") +// private boolean missingTopicsFatal; +// +// /** +// * 监听器拉取超时时间 +// */ +// @Value("${spring.kafka.listener.poll-timeout}") +// private long pollTimeout; +// +// /** +// * 配置消费者 +// * @return props +// */ +// @Bean +// public Map consumerConfigs() { +// Map props = new HashMap<>(); +// +// // 服务器地址 +// props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); +// +// // 消费者组 +// props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); +// +// // 是否自动提交 +// props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit); +// +// // 自动提交时间偏移量 +// props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "2000"); +// +// //该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理: +// //earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费分区的记录 +// //latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据(在消费者启动之后生成的记录) +// //none:当各分区都存在已提交的offset时,从提交的offset开始消费;只要有一个分区不存在已提交的offset,则抛出异常 +// props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset); +// +// //两次poll之间的最大间隔,默认值为5分钟。如果超过这个间隔会触发reBalance +// props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollIntervalTime); +// +// //这个参数定义了poll方法最多可以拉取多少条消息,默认值为500。如果在拉取消息的时候新消息不足500条,那有多少返回多少;如果超过500条,每次只返回500。 +// //这个默认值在有些场景下太大,有些场景很难保证能够在5min内处理完500条消息, +// //如果消费者无法在5分钟内处理完500条消息的话就会触发reBalance, +// //然后这批消息会被分配到另一个消费者中,还是会处理不完,这样这批消息就永远也处理不完。 +// //要避免出现上述问题,提前评估好处理一条消息最长需要多少时间,然后覆盖默认的max.poll.records参数 +// //注:需要开启BatchListener批量监听才会生效,如果不开启BatchListener则不会出现reBalance情况 +// props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,maxPollRecords); +// +// //当broker多久没有收到consumer的心跳请求后就触发reBalance,默认值是10s +// props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout); +// +// //序列化(建议使用Json,这种序列化方式可以无需额外配置传输实体类) +// props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); +// props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); +// +// return props; +// } +// +// @Bean +// public ConsumerFactory consumerFactory(){ +// //配置消费者的反序列化 +// return new DefaultKafkaConsumerFactory<>(consumerConfigs(), +// new org.springframework.kafka.support.serializer.JsonDeserializer<>(), +// new org.springframework.kafka.support.serializer.JsonDeserializer<>().trustedPackages("*")); +// } +// +// +// +// @Bean +// public KafkaListenerContainerFactory> kafkaListenerContainerFactory(){ +// ConcurrentKafkaListenerContainerFactory factory = +// new ConcurrentKafkaListenerContainerFactory<>(); +// factory.setConsumerFactory(consumerFactory()); +// //在侦听器容器中运行的线程数,一般设置为 机器数*分区数 +// factory.setConcurrency(concurrency); +// //消费监听接口监听的主题不存在时,默认会报错,所以设置为false忽略错误 +// factory.setMissingTopicsFatal(missingTopicsFatal); +// //自动提交关闭,需要设置手动消息确认 +// factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE); +// factory.getContainerProperties().setPollTimeout(pollTimeout); +// //设置为批量监听,需要用List接收 +// //factory.setBatchListener(true); +// return factory; +// } +// +// +//} diff --git a/zhilian-modules/zhilian-online/src/main/java/com/zhilian/online/config/KafkaProviderConfig.java b/zhilian-modules/zhilian-online/src/main/java/com/zhilian/online/config/KafkaProviderConfig.java new file mode 100644 index 0000000..bbbbead --- /dev/null +++ b/zhilian-modules/zhilian-online/src/main/java/com/zhilian/online/config/KafkaProviderConfig.java @@ -0,0 +1,144 @@ +//package com.zhilian.online.config; +// +//import com.fasterxml.jackson.databind.annotation.JsonSerialize; +//import com.zhilian.common.core.domain.Result; +//import com.zhilian.common.core.utils.SpringUtils; +//import com.zhilian.common.core.utils.StringUtils; +//import com.zhilian.online.service.OnlineGatherService; +//import com.zhilian.online.service.OnlineLoadCenterService; +//import org.apache.kafka.clients.producer.ProducerConfig; +//import org.junit.jupiter.api.Test; +//import org.springframework.beans.factory.annotation.Value; +//import org.springframework.context.annotation.Bean; +//import org.springframework.context.annotation.Configuration; +//import org.springframework.kafka.core.DefaultKafkaProducerFactory; +//import org.springframework.kafka.core.KafkaTemplate; +//import org.springframework.kafka.core.ProducerFactory; +//import org.springframework.kafka.transaction.KafkaTransactionManager; +// +//import java.util.HashMap; +//import java.util.Map; +//import java.util.Objects; +// +///** +// * @BelongsProject: smart-cloud-server +// * @BelongsPackage: com.zhilian.online.config +// * @Author: LiYuan +// * @CreateTime: 2024-04-02 19:36 +// * @Description: kafka生产者配置 +// * @Version: 1.0 +// */ +//@Configuration +//public class KafkaProviderConfig { +// +// /** +// * kafka服务器地址 +// */ +// @Value("${spring.kafka.producer.bootstrap-servers}") +// private String bootstrapServers; +// +// /** +// * kafka开启事务 +// */ +// @Value("${spring.kafka.producer.transaction-id-prefix}") +// private String transactionIdPrefix; +// +// /** +// * 手动提交确认信息 +// */ +// @Value("${spring.kafka.producer.acks}") +// private String acks; +// +// /** +// * 发生错误后消息重发次数 +// */ +// @Value("${spring.kafka.producer.retries}") +// private String retries; +// +// /** +// * 当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。 +// * 该参数指定了一个批次可以使用的内存大小,按照字节数计算 +// */ +// @Value("${spring.kafka.producer.batch-size}") +// private String batchSize; +// +// /** +// * 生产者内存缓冲区大小 +// */ +// @Value("${spring.kafka.producer.buffer-memory}") +// private String bufferMemory; +// +// /** +// * 生产者最短发送消息时间 +// */ +// @Value("${spring.kafka.producer.linger-ms}") +// private String lingerMs; +// +// +// /** +// * 配置生产者配置对象 +// * @return props +// */ +// @Bean +// public Map producerConfigs() { +// Map props = new HashMap<>(); +// //设置生产者服务器 +// props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapServers); +// //acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。 +// //acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。 +// //acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。 +// //开启事务必须设为all +// props.put(ProducerConfig.ACKS_CONFIG,acks); +// //设置发生错误后消息重发次数 +// props.put(ProducerConfig.RETRIES_CONFIG,retries); +// //当多个消息发送到相同分区时,生产者会将消息打包到一起发送,减少请求交互 +// //批次的大小可以通过batch.size 参数设置.默认是16KB +// //较小的批次大小有可能降低吞吐量(批次大小为0则完全禁用批处理)。 +// //比如说,kafka里的消息5秒钟Batch才凑满了16KB,才能发送出去。那这些消息的延迟就是5秒钟 +// //实测batchSize这个参数没有用 +// props.put(ProducerConfig.BATCH_SIZE_CONFIG,batchSize); +// //有的时刻消息比较少,过了很久,比如5min也没有凑够16KB,这样延时就很大,所以需要一个参数. 再设置一个时间,到了这个时间, +// //即使数据没达到16KB,也将这个批次发送出去 +// props.put(ProducerConfig.LINGER_MS_CONFIG,lingerMs); +// //设置生产者内存缓冲区大小 +// props.put(ProducerConfig.BUFFER_MEMORY_CONFIG,bufferMemory); +// //反序列化 +// props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, JsonSerialize.class); +// props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerialize.class); +// return props; +// } +// +// /** +// * 生产者开启事务配置 +// * @return factory +// */ +// @Bean +// public ProducerFactory producerFactory(){ +// DefaultKafkaProducerFactory factory = new DefaultKafkaProducerFactory<>(producerConfigs()); +// //开启事务,会导致 LINGER_MS_CONFIG配置失效 +// factory.setTransactionIdPrefix(transactionIdPrefix); +// return factory; +// } +// +// /** +// * kafka事务管理器 +// * @param producerFactory +// * @return KafkaTransactionManager +// */ +// @Bean +// public KafkaTransactionManager kafkaTransactionManager(ProducerFactory producerFactory){ +// return new KafkaTransactionManager<>(producerFactory); +// } +// +// /** +// * 创建Kafka生产者实例 +// * @return KafkaTemplate +// */ +// @Bean +// public KafkaTemplate kafkaTemplate(){ +// return new KafkaTemplate<>(producerFactory()); +// } +// +// +// +//} diff --git a/zhilian-modules/zhilian-online/src/main/java/com/zhilian/online/config/MqttxConfig.java b/zhilian-modules/zhilian-online/src/main/java/com/zhilian/online/config/MqttxConfig.java index d0d2028..4fb748f 100644 --- a/zhilian-modules/zhilian-online/src/main/java/com/zhilian/online/config/MqttxConfig.java +++ b/zhilian-modules/zhilian-online/src/main/java/com/zhilian/online/config/MqttxConfig.java @@ -1,12 +1,22 @@ package com.zhilian.online.config; +import com.zhilian.common.redis.service.RedisService; +import com.zhilian.online.uitls.MessageResolver; import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.StringSerializer; import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; +import java.util.Properties; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; /** * @BelongsProject: smart-cloud-server @@ -56,6 +66,54 @@ public class MqttxConfig { @Value("${mqtt.server.qos}") private Integer qos; + /** + * redis服务 + */ + @Autowired + private RedisService redisService; + /** + * kafka订阅topic + */ + private String topicName = "test-topic"; + + /** + * kafka服务 + */ + @Value("${spring.kafka.producer.bootstrap-servers}") + private String bootstrap_servers; + + /** + * 线程池 + */ + private ExecutorService executorService = Executors.newFixedThreadPool(3); + + /** + * kafka服务 + */ + private KafkaProducer kafkaProducer; + + /** + * @return + * @Description: 初始化kafka服务 + */ + @PostConstruct + public KafkaProducer initKafkaProducer() { + Properties properties = new Properties(); + properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap_servers); + properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + + return new KafkaProducer<>(properties); + } + + /** + * @param msg + * @Description: 发送消息 + */ + private void sendMsg(String msg) { + kafkaProducer.send(new ProducerRecord<>(topicName, msg)); + log.info("Kafka在topic:{}中发送消息{}", topicName, msg); + } @PostConstruct public void initMqtt() { @@ -81,11 +139,40 @@ public class MqttxConfig { @Override public void connectionLost(Throwable throwable) { log.error("连接断开{}", throwable.getMessage()); + //链接断开后会尝试重新连接 + while (mqttClient.isConnected()) { + try { + //每次重连间隔60秒 + Thread.sleep(1000 * 60); + mqttClient.connect(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } } @Override public void messageArrived(String s, MqttMessage mqttMessage) throws Exception { log.info("消息到达,接受消息主题{},接受消息Qos{},接受消息内容{}", topic, mqttMessage.getQos(), new String(mqttMessage.getPayload())); + + //将接受到的车辆报文存储到kafka中 + executorService.execute(() -> { + //解析得出原始报文String + String sourceMsg = new String(mqttMessage.getPayload()); + //对原始报文进行解析 + String parseMsg = MessageResolver.parseMsg(sourceMsg); + + //截取原始报文获得车辆VIN码 + String vin = parseMsg.substring(0, 17); + + log.info("当前车辆VIN码:{}",vin); + + //调取接口,查询是否是我们的车 + + + + }); + } @Override @@ -94,7 +181,7 @@ public class MqttxConfig { } }); - mqttClient.subscribe(topic,qos); + mqttClient.subscribe(topic, qos); } catch (MqttException e) { throw new RuntimeException(e); diff --git a/zhilian-modules/zhilian-online/src/main/java/com/zhilian/online/config/RabbitConfig.java b/zhilian-modules/zhilian-online/src/main/java/com/zhilian/online/config/RabbitConfig.java index 4b5ca67..1d26a43 100644 --- a/zhilian-modules/zhilian-online/src/main/java/com/zhilian/online/config/RabbitConfig.java +++ b/zhilian-modules/zhilian-online/src/main/java/com/zhilian/online/config/RabbitConfig.java @@ -140,7 +140,6 @@ public class RabbitConfig { .to(delayExchange()) .with(DELAY_ROUTING_KEY); } - /** * 配置RabbitMq序列化器 * @param connectionFactory diff --git a/zhilian-modules/zhilian-online/src/main/java/com/zhilian/online/constans/OnlineConstants.java b/zhilian-modules/zhilian-online/src/main/java/com/zhilian/online/constans/OnlineConstants.java index 4efc658..4cbde11 100644 --- a/zhilian-modules/zhilian-online/src/main/java/com/zhilian/online/constans/OnlineConstants.java +++ b/zhilian-modules/zhilian-online/src/main/java/com/zhilian/online/constans/OnlineConstants.java @@ -29,4 +29,6 @@ public class OnlineConstants { */ public static final String NODE_LOAD_PREFIX = "node_load"; + + } diff --git a/zhilian-modules/zhilian-online/src/main/java/com/zhilian/online/consumer/KafkaConsumer.java b/zhilian-modules/zhilian-online/src/main/java/com/zhilian/online/consumer/KafkaConsumer.java new file mode 100644 index 0000000..d244ad7 --- /dev/null +++ b/zhilian-modules/zhilian-online/src/main/java/com/zhilian/online/consumer/KafkaConsumer.java @@ -0,0 +1,28 @@ +package com.zhilian.online.consumer; + +import com.alibaba.fastjson.JSON; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.stereotype.Component; + +import java.util.function.Consumer; + +/** + * @BelongsProject: smart-cloud-server + * @BelongsPackage: com.zhilian.online.consumer + * @Author: LiYuan + * @CreateTime: 2024-04-06 19:58 + * @Description: 卡夫卡消费者类 + * @Version: 1.0 + */ +@Component +@Slf4j +public class KafkaConsumer { + + @KafkaListener(topics = "test-topic") + public void handlerMsg(ConsumerRecord record){ + log.info("消费者消费消息信息为:{}", JSON.toJSONString(record)); + } + +} diff --git a/zhilian-modules/zhilian-online/src/main/java/com/zhilian/online/controller/OnlineLoadCenterController.java b/zhilian-modules/zhilian-online/src/main/java/com/zhilian/online/controller/OnlineLoadCenterController.java index 24879eb..f0757fe 100644 --- a/zhilian-modules/zhilian-online/src/main/java/com/zhilian/online/controller/OnlineLoadCenterController.java +++ b/zhilian-modules/zhilian-online/src/main/java/com/zhilian/online/controller/OnlineLoadCenterController.java @@ -1,12 +1,10 @@ package com.zhilian.online.controller; -import com.alibaba.fastjson2.JSON; import com.zhilian.common.core.domain.Result; import com.zhilian.common.core.utils.ip.IpUtils; import com.zhilian.common.core.web.controller.BaseController; import com.zhilian.online.domain.Gather; -import com.zhilian.online.domain.req.GatherRegReq; import com.zhilian.online.service.OnlineLoadCenterService; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; diff --git a/zhilian-modules/zhilian-online/src/main/java/com/zhilian/online/service/impl/OnlineLoadCenterServiceImpl.java b/zhilian-modules/zhilian-online/src/main/java/com/zhilian/online/service/impl/OnlineLoadCenterServiceImpl.java index 4872f89..c2ef6a2 100644 --- a/zhilian-modules/zhilian-online/src/main/java/com/zhilian/online/service/impl/OnlineLoadCenterServiceImpl.java +++ b/zhilian-modules/zhilian-online/src/main/java/com/zhilian/online/service/impl/OnlineLoadCenterServiceImpl.java @@ -60,7 +60,7 @@ public class OnlineLoadCenterServiceImpl extends ServiceImpl kafkaTemplate; + + @Test + public void testSend(){ + kafkaTemplate.send("topic1","hello world!"); + } + + +} diff --git a/zhilian-modules/zhilian-resolver/src/main/java/com/zhilian/resolver/ZhiLianResolverApplication.java b/zhilian-modules/zhilian-resolver/src/main/java/com/zhilian/resolver/ZhiLianResolverApplication.java deleted file mode 100644 index de0dc3d..0000000 --- a/zhilian-modules/zhilian-resolver/src/main/java/com/zhilian/resolver/ZhiLianResolverApplication.java +++ /dev/null @@ -1,18 +0,0 @@ -package com.zhilian.resolver; -import com.zhilian.common.security.annotation.EnableMyFeignClients; -import com.zhilian.common.swagger.annotation.EnableCustomSwagger2; -import org.mybatis.spring.annotation.MapperScan; -import org.springframework.boot.SpringApplication; -import org.springframework.boot.autoconfigure.SpringBootApplication; -import org.springframework.scheduling.annotation.EnableScheduling; - -@EnableCustomSwagger2 -@EnableScheduling -@EnableMyFeignClients -@MapperScan({"com.zhilian.resolver.mapper", "com.zhilian.resolver.resolverReport"}) -@SpringBootApplication -public class ZhiLianResolverApplication { - public static void main(String[] args) { - SpringApplication.run(ZhiLianResolverApplication.class, args); - } -} diff --git a/zhilian-modules/zhilian-resolver/src/main/java/com/zhilian/resolver/ZhilianResolverApplication.java b/zhilian-modules/zhilian-resolver/src/main/java/com/zhilian/resolver/ZhilianResolverApplication.java deleted file mode 100644 index 602ba1b..0000000 --- a/zhilian-modules/zhilian-resolver/src/main/java/com/zhilian/resolver/ZhilianResolverApplication.java +++ /dev/null @@ -1,18 +0,0 @@ -package com.zhilian.resolver; -import com.zhilian.common.security.annotation.EnableMyFeignClients; -import com.zhilian.common.swagger.annotation.EnableCustomSwagger2; -import org.mybatis.spring.annotation.MapperScan; -import org.springframework.boot.SpringApplication; -import org.springframework.boot.autoconfigure.SpringBootApplication; -import org.springframework.scheduling.annotation.EnableScheduling; - -@EnableCustomSwagger2 -@EnableScheduling -@EnableMyFeignClients -@MapperScan({"com.zhilian.resolver.mapper", "com.zhilian.resolver.resolverReport"}) -@SpringBootApplication -public class ZhilianResolverApplication { - public static void main(String[] args) { - SpringApplication.run(ZhilianResolverApplication.class, args); - } -} diff --git a/zhilian-modules/zhilian-resolver/src/main/resources/bootstrap.yml b/zhilian-modules/zhilian-resolver/src/main/resources/bootstrap.yml index 930724a..a4a540d 100644 --- a/zhilian-modules/zhilian-resolver/src/main/resources/bootstrap.yml +++ b/zhilian-modules/zhilian-resolver/src/main/resources/bootstrap.yml @@ -19,11 +19,9 @@ spring: discovery: # 服务注册地址 server-addr: 10.10.25.2:8848 - namespace: 9d9e22dc-ff70-42c5-adac-fa69e6d62dbe config: # 配置中心地址 server-addr: 10.10.25.2:8848 - namespace: 9d9e22dc-ff70-42c5-adac-fa69e6d62dbe # 配置文件格式 file-extension: yml # 共享配置 diff --git a/zhilian-modules/zhilian-system/src/main/resources/bootstrap.yml b/zhilian-modules/zhilian-system/src/main/resources/bootstrap.yml index cbc7840..2eeab13 100644 --- a/zhilian-modules/zhilian-system/src/main/resources/bootstrap.yml +++ b/zhilian-modules/zhilian-system/src/main/resources/bootstrap.yml @@ -15,11 +15,9 @@ spring: discovery: # 服务注册地址 server-addr: 10.10.25.2:8848 - namespace: 9d9e22dc-ff70-42c5-adac-fa69e6d62dbe config: # 配置中心地址 server-addr: 10.10.25.2:8848 - namespace: 9d9e22dc-ff70-42c5-adac-fa69e6d62dbe # 配置文件格式 file-extension: yml # 共享配置 diff --git a/zhilian-visual/zhilian-monitor/src/main/resources/bootstrap.yml b/zhilian-visual/zhilian-monitor/src/main/resources/bootstrap.yml index d8696f6..14ce9aa 100644 --- a/zhilian-visual/zhilian-monitor/src/main/resources/bootstrap.yml +++ b/zhilian-visual/zhilian-monitor/src/main/resources/bootstrap.yml @@ -15,11 +15,9 @@ spring: discovery: # 服务注册地址 server-addr: 10.10.25.2:8848 - namespace: 9d9e22dc-ff70-42c5-adac-fa69e6d62dbe config: # 配置中心地址 server-addr: 10.10.25.2:8848 - namespace: 9d9e22dc-ff70-42c5-adac-fa69e6d62dbe # 配置文件格式 file-extension: yml # 共享配置