parent dc3bf42273
commit c99d1e0e89

pom.xml
@@ -37,6 +37,7 @@
        <transmittable-thread-local.version>2.14.3</transmittable-thread-local.version>
        <maven-compiler-plugin.version>3.8.1</maven-compiler-plugin.version>
        <mqttv3.version>1.2.5</mqttv3.version>
        <spring-kafka.version>2.8.0</spring-kafka.version>
    </properties>

    <!-- Dependency declarations -->
@@ -250,6 +251,13 @@
                <version>${mqttv3.version}</version>
            </dependency>

            <!-- kafka -->
            <dependency>
                <groupId>org.springframework.kafka</groupId>
                <artifactId>spring-kafka</artifactId>
                <version>${spring-kafka.version}</version>
            </dependency>

            <dependency>
                <groupId>com.zhilian</groupId>
                <artifactId>zhilian-common-business</artifactId>
@@ -15,11 +15,9 @@ spring:
      discovery:
        # Service registry address
        server-addr: 10.10.25.2:8848
        namespace: 9d9e22dc-ff70-42c5-adac-fa69e6d62dbe
      config:
        # Config center address
        server-addr: 10.10.25.2:8848
        namespace: 9d9e22dc-ff70-42c5-adac-fa69e6d62dbe
        # Config file format
        file-extension: yml
        # Shared configuration
@@ -15,11 +15,9 @@ spring:
      discovery:
        # Service registry address
        server-addr: 10.10.25.2:8848
        namespace: 9d9e22dc-ff70-42c5-adac-fa69e6d62dbe
      config:
        # Config center address
        server-addr: 10.10.25.2:8848
        namespace: 9d9e22dc-ff70-42c5-adac-fa69e6d62dbe
        # Config file format
        file-extension: yml
        # Shared configuration

@@ -36,7 +34,6 @@ spring:
        ds1:
          nacos:
            server-addr: 10.10.25.2:8848
            namespace: 9d9e22dc-ff70-42c5-adac-fa69e6d62dbe
            dataId: sentinel-zhilian-gateway
            groupId: DEFAULT_GROUP
            data-type: json
@@ -15,11 +15,9 @@ spring:
      discovery:
        # Service registry address
        server-addr: 10.10.25.2:8848
        namespace: 9d9e22dc-ff70-42c5-adac-fa69e6d62dbe
      config:
        # Config center address
        server-addr: 10.10.25.2:8848
        namespace: 9d9e22dc-ff70-42c5-adac-fa69e6d62dbe
        # Config file format
        file-extension: yml
        # Shared configuration
@@ -15,11 +15,9 @@ spring:
      discovery:
        # Service registry address
        server-addr: 10.10.25.2:8848
        namespace: 9d9e22dc-ff70-42c5-adac-fa69e6d62dbe
      config:
        # Config center address
        server-addr: 10.10.25.2:8848
        namespace: 9d9e22dc-ff70-42c5-adac-fa69e6d62dbe
        # Config file format
        file-extension: yml
        # Shared configuration
@@ -15,11 +15,9 @@ spring:
      discovery:
        # Service registry address
        server-addr: 10.10.25.2:8848
        namespace: 9d9e22dc-ff70-42c5-adac-fa69e6d62dbe
      config:
        # Config center address
        server-addr: 10.10.25.2:8848
        namespace: 9d9e22dc-ff70-42c5-adac-fa69e6d62dbe
        # Config file format
        file-extension: yml
        # Shared configuration
@@ -15,11 +15,9 @@ spring:
      discovery:
        # Service registry address
        server-addr: 10.10.25.2:8848
        namespace: 9d9e22dc-ff70-42c5-adac-fa69e6d62dbe
      config:
        # Config center address
        server-addr: 10.10.25.2:8848
        namespace: 9d9e22dc-ff70-42c5-adac-fa69e6d62dbe
        # Config file format
        file-extension: yml
        # Shared configuration
@@ -15,11 +15,9 @@ spring:
      discovery:
        # Service registry address
        server-addr: 10.10.25.2:8848
        namespace: 9d9e22dc-ff70-42c5-adac-fa69e6d62dbe
      config:
        # Config center address
        server-addr: 10.10.25.2:8848
        namespace: 9d9e22dc-ff70-42c5-adac-fa69e6d62dbe
        # Config file format
        file-extension: yml
        # Shared configuration
@@ -92,6 +92,12 @@
            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
        </dependency>

        <!-- Kafka configuration -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

    </dependencies>

    <build>
@@ -3,8 +3,12 @@ package com.zhilian.online;
import com.zhilian.common.security.annotation.EnableCustomConfig;
import com.zhilian.common.security.annotation.EnableMyFeignClients;
import com.zhilian.common.swagger.annotation.EnableCustomSwagger2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.core.KafkaTemplate;

/**
 * @version:
@@ -16,8 +20,11 @@ import org.springframework.boot.autoconfigure.SpringBootApplication;
@EnableCustomSwagger2
@EnableMyFeignClients
@SpringBootApplication
public class ZhiLianOnlineApplication {
public class ZhiLianOnlineApplication{
    public static void main(String[] args) {
        SpringApplication.run(ZhiLianOnlineApplication.class,args);
    }


}
@@ -1,4 +0,0 @@
package com.zhilian.online;

public class aa {
}
@@ -0,0 +1,164 @@
//package com.zhilian.online.config;
//
//import org.apache.kafka.clients.consumer.ConsumerConfig;
//import org.springframework.beans.factory.annotation.Value;
//import org.springframework.context.annotation.Bean;
//import org.springframework.context.annotation.Configuration;
//import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
//import org.springframework.kafka.config.KafkaListenerContainerFactory;
//import org.springframework.kafka.core.ConsumerFactory;
//import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
//import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
//import org.springframework.kafka.listener.ContainerProperties;
//import org.springframework.kafka.support.serializer.JsonDeserializer;
//
//import java.util.HashMap;
//import java.util.Map;
//
///**
// * @BelongsProject: smart-cloud-server
// * @BelongsPackage: com.zhilian.online.config
// * @Author: LiYuan
// * @CreateTime: 2024-04-02 20:01
// * @Description: Kafka consumer configuration class
// * @Version: 1.0
// */
//@Configuration
//public class KafkaConsumerConfig {
//
//    /**
//     * Kafka server addresses
//     */
//    @Value("${spring.kafka.consumer.bootstrap-servers}")
//    private String bootstrapServers;
//
//    /**
//     * Consumer group
//     */
//    @Value("${spring.kafka.consumer.group-id}")
//    private String groupId;
//
//    /**
//     * Whether offsets are committed automatically
//     */
//    @Value("${spring.kafka.consumer.enable-auto-commit}")
//    private boolean enableAutoCommit;
//
//    /**
//     * Session timeout
//     */
//    @Value("${spring.kafka.consumer.session.timeout.ms}")
//    private String sessionTimeout;
//
//    /**
//     * Maximum poll interval
//     */
//    @Value("${spring.kafka.consumer.max.poll.interval.ms}")
//    private String maxPollIntervalTime;
//
//    /**
//     * Maximum number of records per poll
//     */
//    @Value("${spring.kafka.consumer.max-poll-records}")
//    private String maxPollRecords;
//
//    /**
//     * Offset reset policy
//     */
//    @Value("${spring.kafka.consumer.auto-offset-reset}")
//    private String autoOffsetReset;
//
//    /**
//     * Listener concurrency
//     */
//    @Value("${spring.kafka.listener.concurrency}")
//    private Integer concurrency;
//
//    /**
//     * Whether a missing topic is fatal for the listener
//     */
//    @Value("${spring.kafka.listener.missing-topics-fatal}")
//    private boolean missingTopicsFatal;
//
//    /**
//     * Listener poll timeout
//     */
//    @Value("${spring.kafka.listener.poll-timeout}")
//    private long pollTimeout;
//
//    /**
//     * Build the consumer properties
//     * @return props
//     */
//    @Bean
//    public Map<String, Object> consumerConfigs() {
//        Map<String, Object> props = new HashMap<>();
//
//        // Server addresses
//        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
//
//        // Consumer group
//        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
//
//        // Whether to auto-commit offsets
//        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
//
//        // Auto-commit interval
//        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "2000");
//
//        // Controls what the consumer does when reading a partition with no committed offset or an invalid offset:
//        // earliest: if a committed offset exists, resume from it; otherwise consume the partition from the beginning
//        // latest:   if a committed offset exists, resume from it; otherwise consume only records produced after the consumer started
//        // none:     if committed offsets exist for all partitions, resume from them; if any partition lacks one, throw an exception
//        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
//
//        // Maximum interval between two polls, 5 minutes by default; exceeding it triggers a rebalance
//        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollIntervalTime);
//
//        // Maximum number of records a single poll may return, 500 by default; if fewer are available, whatever exists is returned.
//        // The default can be too large: if 500 records cannot be processed within 5 minutes, a rebalance is triggered,
//        // the batch is handed to another consumer, which will not finish it either, and the batch is never processed.
//        // To avoid this, estimate the maximum processing time per record and override max.poll.records accordingly.
//        // Note: this matters with a batch listener enabled; without BatchListener the rebalance issue does not arise.
//        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,maxPollRecords);
//
//        // How long the broker waits without a consumer heartbeat before triggering a rebalance, 10s by default
//        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
//
//        // Deserialization (JSON is recommended; it can carry entity classes without extra configuration)
//        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
//        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
//
//        return props;
//    }
//
//    @Bean
//    public ConsumerFactory<Object,Object> consumerFactory(){
//        // Configure the consumer deserializers
//        return new DefaultKafkaConsumerFactory<>(consumerConfigs(),
//                new org.springframework.kafka.support.serializer.JsonDeserializer<>(),
//                new org.springframework.kafka.support.serializer.JsonDeserializer<>().trustedPackages("*"));
//    }
//
//    @Bean
//    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Object,Object>> kafkaListenerContainerFactory(){
//        ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
//                new ConcurrentKafkaListenerContainerFactory<>();
//        factory.setConsumerFactory(consumerFactory());
//        // Number of threads running in the listener container, usually machines * partitions
//        factory.setConcurrency(concurrency);
//        // By default the listener fails when a subscribed topic does not exist; set to false to ignore that
//        factory.setMissingTopicsFatal(missingTopicsFatal);
//        // Auto-commit is disabled, so manual acknowledgment is required
//        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
//        factory.getContainerProperties().setPollTimeout(pollTimeout);
//        // Enable batch listening; messages must then be received as a List
//        //factory.setBatchListener(true);
//        return factory;
//    }
//
//}
@@ -0,0 +1,144 @@
//package com.zhilian.online.config;
//
//import com.zhilian.common.core.domain.Result;
//import com.zhilian.common.core.utils.SpringUtils;
//import com.zhilian.common.core.utils.StringUtils;
//import com.zhilian.online.service.OnlineGatherService;
//import com.zhilian.online.service.OnlineLoadCenterService;
//import org.apache.kafka.clients.producer.ProducerConfig;
//import org.junit.jupiter.api.Test;
//import org.springframework.beans.factory.annotation.Value;
//import org.springframework.context.annotation.Bean;
//import org.springframework.context.annotation.Configuration;
//import org.springframework.kafka.core.DefaultKafkaProducerFactory;
//import org.springframework.kafka.core.KafkaTemplate;
//import org.springframework.kafka.core.ProducerFactory;
//import org.springframework.kafka.support.serializer.JsonSerializer;
//import org.springframework.kafka.transaction.KafkaTransactionManager;
//
//import java.util.HashMap;
//import java.util.Map;
//import java.util.Objects;
//
///**
// * @BelongsProject: smart-cloud-server
// * @BelongsPackage: com.zhilian.online.config
// * @Author: LiYuan
// * @CreateTime: 2024-04-02 19:36
// * @Description: Kafka producer configuration
// * @Version: 1.0
// */
//@Configuration
//public class KafkaProviderConfig {
//
//    /**
//     * Kafka server addresses
//     */
//    @Value("${spring.kafka.producer.bootstrap-servers}")
//    private String bootstrapServers;
//
//    /**
//     * Transaction id prefix (enables Kafka transactions)
//     */
//    @Value("${spring.kafka.producer.transaction-id-prefix}")
//    private String transactionIdPrefix;
//
//    /**
//     * Producer acknowledgment setting
//     */
//    @Value("${spring.kafka.producer.acks}")
//    private String acks;
//
//    /**
//     * Number of retries after a send error
//     */
//    @Value("${spring.kafka.producer.retries}")
//    private String retries;
//
//    /**
//     * When several messages are destined for the same partition, the producer batches them together.
//     * This parameter sets the memory, in bytes, that one batch may use.
//     */
//    @Value("${spring.kafka.producer.batch-size}")
//    private String batchSize;
//
//    /**
//     * Producer memory buffer size
//     */
//    @Value("${spring.kafka.producer.buffer-memory}")
//    private String bufferMemory;
//
//    /**
//     * How long the producer waits for more records before sending a batch (linger)
//     */
//    @Value("${spring.kafka.producer.linger-ms}")
//    private String lingerMs;
//
//
//    /**
//     * Build the producer properties
//     * @return props
//     */
//    @Bean
//    public Map<String, Object> producerConfigs() {
//        Map<String, Object> props = new HashMap<>();
//        // Producer servers
//        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapServers);
//        // acks=0  : the producer does not wait for any response from the server before considering the write successful.
//        // acks=1  : the producer gets a success response as soon as the partition leader has received the message.
//        // acks=all: the producer gets a success response only after all replicating nodes have received the message.
//        // Transactions require acks=all.
//        props.put(ProducerConfig.ACKS_CONFIG,acks);
//        // Number of retries after a send error
//        props.put(ProducerConfig.RETRIES_CONFIG,retries);
//        // Messages going to the same partition are batched together to reduce round trips.
//        // The batch size is set with batch.size, 16KB by default.
//        // A smaller batch size can reduce throughput (0 disables batching entirely),
//        // but if it takes, say, 5 seconds to fill a 16KB batch, those messages are delayed by 5 seconds.
//        // In practice this batchSize parameter showed no observable effect.
//        props.put(ProducerConfig.BATCH_SIZE_CONFIG,batchSize);
//        // When traffic is low a batch may never reach 16KB, which would add a lot of latency,
//        // so linger.ms sets a deadline after which the batch is sent even if it is not full.
//        props.put(ProducerConfig.LINGER_MS_CONFIG,lingerMs);
//        // Producer memory buffer size
//        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG,bufferMemory);
//        // Serialization
//        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
//        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
//        return props;
//    }
//
//    /**
//     * Producer factory with transactions enabled
//     * @return factory
//     */
//    @Bean
//    public ProducerFactory<Object,Object> producerFactory(){
//        DefaultKafkaProducerFactory<Object,Object> factory = new DefaultKafkaProducerFactory<>(producerConfigs());
//        // Enabling transactions makes the LINGER_MS_CONFIG setting ineffective
//        factory.setTransactionIdPrefix(transactionIdPrefix);
//        return factory;
//    }
//
//    /**
//     * Kafka transaction manager
//     * @param producerFactory
//     * @return KafkaTransactionManager
//     */
//    @Bean
//    public KafkaTransactionManager<Object,Object> kafkaTransactionManager(ProducerFactory<Object,Object> producerFactory){
//        return new KafkaTransactionManager<>(producerFactory);
//    }
//
//    /**
//     * Create the Kafka producer template
//     * @return KafkaTemplate
//     */
//    @Bean
//    public KafkaTemplate<Object,Object> kafkaTemplate(){
//        return new KafkaTemplate<>(producerFactory());
//    }
//
//}
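If the commented-out producer configuration above were re-enabled, its transaction-id-prefix would make the KafkaTemplate transactional, and a plain send() outside a transaction would fail. A minimal sketch of what a send would then look like; the class name and topic are illustrative only and not part of this commit:

package com.zhilian.online.config;

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

// Illustrative sketch only: assumes KafkaProviderConfig above is uncommented.
@Component
public class TransactionalSendSketch {

    private final KafkaTemplate<Object, Object> kafkaTemplate;

    public TransactionalSendSketch(KafkaTemplate<Object, Object> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void send(String payload) {
        // With a transaction-id-prefix configured, every send must run inside a transaction.
        kafkaTemplate.executeInTransaction(operations -> {
            operations.send("test-topic", payload);
            return null;
        });
    }
}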
@@ -1,12 +1,22 @@
package com.zhilian.online.config;

import com.zhilian.common.redis.service.RedisService;
import com.zhilian.online.uitls.MessageResolver;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @BelongsProject: smart-cloud-server
@@ -56,6 +66,54 @@ public class MqttxConfig {
    @Value("${mqtt.server.qos}")
    private Integer qos;

    /**
     * Redis service
     */
    @Autowired
    private RedisService redisService;
    /**
     * Kafka topic the vehicle reports are published to
     */
    private String topicName = "test-topic";

    /**
     * Kafka server addresses
     */
    @Value("${spring.kafka.producer.bootstrap-servers}")
    private String bootstrap_servers;

    /**
     * Thread pool
     */
    private ExecutorService executorService = Executors.newFixedThreadPool(3);

    /**
     * Kafka producer
     */
    private KafkaProducer<String, String> kafkaProducer;

    /**
     * @Description: initialize the Kafka producer
     */
    @PostConstruct
    public void initKafkaProducer() {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap_servers);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        kafkaProducer = new KafkaProducer<>(properties);
    }

    /**
     * @param msg
     * @Description: send a message to Kafka
     */
    private void sendMsg(String msg) {
        kafkaProducer.send(new ProducerRecord<>(topicName, msg));
        log.info("Kafka topic {}: sent message {}", topicName, msg);
    }

    @PostConstruct
    public void initMqtt() {
@@ -81,11 +139,40 @@ public class MqttxConfig {
            @Override
            public void connectionLost(Throwable throwable) {
                log.error("Connection lost: {}", throwable.getMessage());
                // After the connection drops, keep trying to reconnect
                while (!mqttClient.isConnected()) {
                    try {
                        // Wait 60 seconds between reconnect attempts
                        Thread.sleep(1000 * 60);
                        mqttClient.connect();
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                }
            }

            @Override
            public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
                log.info("Message arrived: topic {}, QoS {}, payload {}", topic, mqttMessage.getQos(), new String(mqttMessage.getPayload()));

                // Store the received vehicle report in Kafka
                executorService.execute(() -> {
                    // Raw frame as a String
                    String sourceMsg = new String(mqttMessage.getPayload());
                    // Parse the raw frame
                    String parseMsg = MessageResolver.parseMsg(sourceMsg);

                    // The first 17 characters of the parsed frame are the vehicle VIN
                    String vin = parseMsg.substring(0, 17);

                    log.info("Current vehicle VIN: {}", vin);

                    // Call the remote interface to check whether this is one of our vehicles

                });

            }

            @Override
@@ -94,7 +181,7 @@ public class MqttxConfig {
                }
            });

            mqttClient.subscribe(topic,qos);
            mqttClient.subscribe(topic, qos);

        } catch (MqttException e) {
            throw new RuntimeException(e);
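The messageArrived callback above decodes the frame and extracts the VIN, but the ownership check and the Kafka hand-off are left open in the commit. A hedged sketch of how that step could look; the VehicleClient interface and its exists() method are hypothetical placeholders, not part of this commit:

package com.zhilian.online.config;

import com.zhilian.online.uitls.MessageResolver;

import java.util.function.Consumer;

// Illustrative sketch only; VehicleClient is a hypothetical remote client.
public class MessageForwardSketch {

    interface VehicleClient {
        boolean exists(String vin);
    }

    private final VehicleClient vehicleClient;

    MessageForwardSketch(VehicleClient vehicleClient) {
        this.vehicleClient = vehicleClient;
    }

    void handle(byte[] payload, Consumer<String> sendMsg) {
        // Decode the raw hex frame into ASCII text.
        String parsed = MessageResolver.parseMsg(new String(payload));
        // The first 17 characters of the parsed frame are the vehicle VIN.
        String vin = parsed.substring(0, 17);

        // Forward only frames that belong to a known vehicle.
        if (vehicleClient.exists(vin)) {
            sendMsg.accept(parsed);   // e.g. MqttxConfig#sendMsg pushes it to Kafka
        }
    }
}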
@@ -140,7 +140,6 @@ public class RabbitConfig {
                .to(delayExchange())
                .with(DELAY_ROUTING_KEY);
    }

    /**
     * Configure the RabbitMQ serializer
     * @param connectionFactory
@@ -29,4 +29,6 @@ public class OnlineConstants {
     */
    public static final String NODE_LOAD_PREFIX = "node_load";


}
@@ -0,0 +1,28 @@
package com.zhilian.online.consumer;

import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.util.function.Consumer;

/**
 * @BelongsProject: smart-cloud-server
 * @BelongsPackage: com.zhilian.online.consumer
 * @Author: LiYuan
 * @CreateTime: 2024-04-06 19:58
 * @Description: Kafka consumer class
 * @Version: 1.0
 */
@Component
@Slf4j
public class KafkaConsumer {

    @KafkaListener(topics = "test-topic")
    public void handlerMsg(ConsumerRecord<Object,Object> record){
        log.info("Consumed message: {}", JSON.toJSONString(record));
    }

}
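This listener relies on Spring Boot's auto-configured container factory, since the KafkaConsumerConfig earlier in the commit is commented out. If that factory were re-enabled, its MANUAL_IMMEDIATE ack mode would require the listener to acknowledge records explicitly; a minimal sketch under that assumption, with an illustrative class name:

package com.zhilian.online.consumer;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

// Illustrative sketch only: how the listener would look if the commented-out
// KafkaConsumerConfig (AckMode.MANUAL_IMMEDIATE) were activated.
@Component
@Slf4j
public class ManualAckKafkaConsumerSketch {

    @KafkaListener(topics = "test-topic")
    public void handlerMsg(ConsumerRecord<Object, Object> record, Acknowledgment ack) {
        log.info("Consumed message: {}", record.value());
        // Commit the offset explicitly; without this the record is redelivered.
        ack.acknowledge();
    }
}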
@@ -1,12 +1,10 @@
package com.zhilian.online.controller;

import com.alibaba.fastjson2.JSON;
import com.zhilian.common.core.domain.Result;
import com.zhilian.common.core.utils.ip.IpUtils;
import com.zhilian.common.core.web.controller.BaseController;

import com.zhilian.online.domain.Gather;
import com.zhilian.online.domain.req.GatherRegReq;
import com.zhilian.online.service.OnlineLoadCenterService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
@@ -60,7 +60,7 @@ public class OnlineLoadCenterServiceImpl extends ServiceImpl<OnlineLoadCenterMap
        String token = IdUtils.fastSimpleUUID();

        // Cache the token in Redis
        redisService.setCacheObject(OnlineConstants.ONLINE_TOKEN_PREFIX , token, OnlineConstants.ONLINE_TOKEN_EXPIRE, TimeUnit.MINUTES);
        redisService.setCacheObject(OnlineConstants.ONLINE_TOKEN_PREFIX , token, OnlineConstants.ONLINE_TOKEN_EXPIRE, TimeUnit.SECONDS);

        // Return the account info to the client
        return Result.success(token);
@@ -0,0 +1,59 @@
package com.zhilian.online.uitls;

import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

/**
 * @BelongsProject: smart-cloud-server
 * @BelongsPackage: com.zhilian.online.uitls
 * @Author: LiYuan
 * @CreateTime: 2024-04-07 22:04
 * @Description: Message resolver; performs the first-pass parsing of received frames
 * @Version: 1.0
 */
@Component
@Slf4j
public class MessageResolver {

    /*
     * @Author: LiYuan
     * @Date: 2024/4/2 14:58
     * @Description: Convert a hex string to an ASCII string
     * @Param: [hexString]
     * @Return: java.lang.String
     **/
    public static String hexToString(String hexString) {
        StringBuffer asciiString = new StringBuffer();

        for (int i = 0; i < hexString.length(); i += 2) {
            String hex = hexString.substring(i, i + 2);
            int decimal = Integer.parseInt(hex, 16);
            asciiString.append((char) decimal);
        }
        return asciiString.toString();
    }

    /**
     * @description: Decode the frame to get the vehicle VIN
     * @author: LiYuan
     * @param: [msg]
     * @return: String
     **/
    public static String parseMsg(String msg) {
        // Strip the frame head and tail
        String substring = msg.substring(2, msg.length() - 2);
        //log.info("Frame without head/tail: " + substring);

        // Remove whitespace
        String hexStringWithoutSpaces = substring.replaceAll("\\s+", "");

        // Convert to an ASCII string
        String asciiString = hexToString(hexStringWithoutSpaces);

        //log.info("Decoded frame: " + asciiString);

        return asciiString;
    }

}
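A small usage example for the resolver. The frame below is synthetic test data, not a real vehicle report: a two-character head and tail wrap space-separated hex that encodes the ASCII payload "ABC", which is enough to show the strip-and-decode steps performed by parseMsg():

package com.zhilian.online.uitls;

// Minimal usage sketch; the frame is synthetic and only demonstrates the
// strip-head/tail, strip-spaces and hex-to-ASCII steps of parseMsg().
public class MessageResolverDemo {

    public static void main(String[] args) {
        // "7E" head and tail bytes wrap the hex-encoded ASCII payload "ABC".
        String rawFrame = "7E 41 42 43 7E";

        String decoded = MessageResolver.parseMsg(rawFrame);
        System.out.println(decoded);   // prints: ABC
    }
}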
@@ -15,11 +15,9 @@ spring:
      discovery:
        # Service registry address
        server-addr: 10.10.25.2:8848
        namespace: 9d9e22dc-ff70-42c5-adac-fa69e6d62dbe
      config:
        # Config center address
        server-addr: 10.10.25.2:8848
        namespace: 9d9e22dc-ff70-42c5-adac-fa69e6d62dbe
        # Config file format
        file-extension: yml
        # Shared configuration
@@ -0,0 +1,30 @@
package com.zhilian.online;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

/**
 * @BelongsProject: smart-cloud-server
 * @BelongsPackage: com.zhilian.online
 * @Author: LiYuan
 * @CreateTime: 2024-04-06 20:02
 * @Description: TODO
 * @Version: 1.0
 */
@Component
public class TestKafka {

    @Autowired
    private KafkaTemplate<Object,Object> kafkaTemplate;

    @Test
    public void testSend(){
        kafkaTemplate.send("topic1","hello world!");
    }

}
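With spring-kafka 2.8, send() returns a ListenableFuture, so the test above fires and forgets without knowing whether the broker accepted the record. A hedged sketch of attaching a callback to observe the result; the class name is illustrative and not part of the commit:

package com.zhilian.online;

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFutureCallback;

// Illustrative sketch only: observing a send result with the spring-kafka 2.8 API.
public class KafkaSendCallbackSketch {

    private final KafkaTemplate<Object, Object> kafkaTemplate;

    public KafkaSendCallbackSketch(KafkaTemplate<Object, Object> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendWithCallback() {
        kafkaTemplate.send("topic1", "hello world!")
                .addCallback(new ListenableFutureCallback<SendResult<Object, Object>>() {
                    @Override
                    public void onSuccess(SendResult<Object, Object> result) {
                        // Broker acknowledged: partition and offset are available here.
                        System.out.println("sent to " + result.getRecordMetadata());
                    }

                    @Override
                    public void onFailure(Throwable ex) {
                        System.err.println("send failed: " + ex.getMessage());
                    }
                });
    }
}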
@@ -1,18 +0,0 @@
package com.zhilian.resolver;

import com.zhilian.common.security.annotation.EnableMyFeignClients;
import com.zhilian.common.swagger.annotation.EnableCustomSwagger2;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;

@EnableCustomSwagger2
@EnableScheduling
@EnableMyFeignClients
@MapperScan({"com.zhilian.resolver.mapper", "com.zhilian.resolver.resolverReport"})
@SpringBootApplication
public class ZhiLianResolverApplication {
    public static void main(String[] args) {
        SpringApplication.run(ZhiLianResolverApplication.class, args);
    }
}
@@ -1,18 +0,0 @@
package com.zhilian.resolver;

import com.zhilian.common.security.annotation.EnableMyFeignClients;
import com.zhilian.common.swagger.annotation.EnableCustomSwagger2;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;

@EnableCustomSwagger2
@EnableScheduling
@EnableMyFeignClients
@MapperScan({"com.zhilian.resolver.mapper", "com.zhilian.resolver.resolverReport"})
@SpringBootApplication
public class ZhilianResolverApplication {
    public static void main(String[] args) {
        SpringApplication.run(ZhilianResolverApplication.class, args);
    }
}
@@ -19,11 +19,9 @@ spring:
      discovery:
        # Service registry address
        server-addr: 10.10.25.2:8848
        namespace: 9d9e22dc-ff70-42c5-adac-fa69e6d62dbe
      config:
        # Config center address
        server-addr: 10.10.25.2:8848
        namespace: 9d9e22dc-ff70-42c5-adac-fa69e6d62dbe
        # Config file format
        file-extension: yml
        # Shared configuration
@@ -15,11 +15,9 @@ spring:
      discovery:
        # Service registry address
        server-addr: 10.10.25.2:8848
        namespace: 9d9e22dc-ff70-42c5-adac-fa69e6d62dbe
      config:
        # Config center address
        server-addr: 10.10.25.2:8848
        namespace: 9d9e22dc-ff70-42c5-adac-fa69e6d62dbe
        # Config file format
        file-extension: yml
        # Shared configuration
@@ -15,11 +15,9 @@ spring:
      discovery:
        # Service registry address
        server-addr: 10.10.25.2:8848
        namespace: 9d9e22dc-ff70-42c5-adac-fa69e6d62dbe
      config:
        # Config center address
        server-addr: 10.10.25.2:8848
        namespace: 9d9e22dc-ff70-42c5-adac-fa69e6d62dbe
        # Config file format
        file-extension: yml
        # Shared configuration