commit 3894f18c54fdd61972dd050c417b588dce7aa8bf Author: fst1996 <2411194573@qq.com> Date: Tue Nov 28 09:44:21 2023 +0800 初始化 diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..5ff6309 --- /dev/null +++ b/.gitignore @@ -0,0 +1,38 @@ +target/ +!.mvn/wrapper/maven-wrapper.jar +!**/src/main/**/target/ +!**/src/test/**/target/ + +### IntelliJ IDEA ### +.idea/modules.xml +.idea/jarRepositories.xml +.idea/compiler.xml +.idea/libraries/ +*.iws +*.iml +*.ipr + +### Eclipse ### +.apt_generated +.classpath +.factorypath +.project +.settings +.springBeans +.sts4-cache + +### NetBeans ### +/nbproject/private/ +/nbbuild/ +/dist/ +/nbdist/ +/.nb-gradle/ +build/ +!**/src/main/**/build/ +!**/src/test/**/build/ + +### VS Code ### +.vscode/ + +### Mac OS ### +.DS_Store \ No newline at end of file diff --git a/.idea/.gitignore b/.idea/.gitignore new file mode 100644 index 0000000..13566b8 --- /dev/null +++ b/.idea/.gitignore @@ -0,0 +1,8 @@ +# Default ignored files +/shelf/ +/workspace.xml +# Editor-based HTTP Client requests +/httpRequests/ +# Datasource local storage ignored files +/dataSources/ +/dataSources.local.xml diff --git a/.idea/encodings.xml b/.idea/encodings.xml new file mode 100644 index 0000000..aa00ffa --- /dev/null +++ b/.idea/encodings.xml @@ -0,0 +1,7 @@ + + + + + + + \ No newline at end of file diff --git a/.idea/misc.xml b/.idea/misc.xml new file mode 100644 index 0000000..82dbec8 --- /dev/null +++ b/.idea/misc.xml @@ -0,0 +1,14 @@ + + + + + + + + + + \ No newline at end of file diff --git a/.idea/uiDesigner.xml b/.idea/uiDesigner.xml new file mode 100644 index 0000000..2b63946 --- /dev/null +++ b/.idea/uiDesigner.xml @@ -0,0 +1,124 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..4cd35e1 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,17 @@ +# 基础镜像 +FROM anolis-registry.cn-zhangjiakou.cr.aliyuncs.com/openanolis/openjdk:17-8.6 + +#暴露端口位置 +EXPOSE 9999 + +# 挂载目录 +VOLUME /home/logs/god-data-server + +# 复制jar文件到docker内部 +COPY /target/mqtt-redis-demo-1.0-SNAPSHOT.jar /home/app.jar + +#工作目录 exec -it 进来默认就是这个目 +WORKDIR /home + +# 启动java程序 +ENTRYPOINT ["java","-Dfile.encoding=UTF-8","-jar","/home/app.jar"] diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..ef51b55 --- /dev/null +++ b/pom.xml @@ -0,0 +1,76 @@ + + + 4.0.0 + + com.god + mqtt-kafkademo + 1.0-SNAPSHOT + + + 17 + 17 + UTF-8 + + + jar + + + org.springframework.boot + spring-boot-starter-parent + 2.7.7 + + + + + menghang-public + 梦航-public + http://10.100.1.6:8081/repository/maven-public/ + + + + + + menghang-releases + 梦航-releases + http://10.100.1.6:8081/repository/maven-releases/ + + + + + + + + org.apache.kafka + kafka-clients + 3.3.1 + + + org.springframework.boot + spring-boot-starter-web + + + org.springframework.kafka + spring-kafka + 2.8.11 + + + com.alibaba + fastjson + 1.2.83 + + + org.projectlombok + lombok + 1.18.28 + + + + org.eclipse.paho + org.eclipse.paho.client.mqttv3 + 1.2.5 + + + + diff --git a/src/main/java/com/god/MqttRedisApplication.java b/src/main/java/com/god/MqttRedisApplication.java new file mode 100644 index 0000000..cde090e --- /dev/null +++ b/src/main/java/com/god/MqttRedisApplication.java @@ -0,0 +1,11 @@ +package com.god; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@SpringBootApplication +public class MqttRedisApplication { + public static void main(String[] args) { + SpringApplication.run(MqttRedisApplication.class); + } +} diff --git a/src/main/java/com/god/kafka/config/KafkaProviderConfig.java b/src/main/java/com/god/kafka/config/KafkaProviderConfig.java new file mode 100644 index 0000000..1b10ee9 --- /dev/null +++ b/src/main/java/com/god/kafka/config/KafkaProviderConfig.java @@ -0,0 +1,81 @@ +package com.god.kafka.config; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.SpringBootConfiguration; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.support.serializer.JsonSerializer; + +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + +/** + * kafka 消息的提供者 配置类 + */ +@Data +@Builder +@Configuration +@NoArgsConstructor +@AllArgsConstructor +@ConfigurationProperties(prefix = "spring.kafka.producer") +public class KafkaProviderConfig { + + private String bootstrapServers; + + private String transactionIdPrefix; + + private String acks; + + private String retries; + + private String batchSize; + + private String bufferMemory; + + private String keySerializer; + + private String valueSerializer; + + /** + * 构建了 Map 存放了 Kafka 生产者的 配置信息 + * @return + */ + @Bean + public KafkaProducer producerConfigs() { + Properties props = new Properties(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + //acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。 + //acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。 + //acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。 + //开启事务必须设为all + props.put(ProducerConfig.ACKS_CONFIG, acks); + //发生错误后,消息重发的次数,开启事务必须大于0 + props.put(ProducerConfig.RETRIES_CONFIG, retries); + //当多个消息发送到相同分区时,生产者会将消息打包到一起,以减少请求交互. 而不是一条条发送 + //批次的大小可以通过batch.size 参数设置.默认是16KB + //较小的批次大小有可能降低吞吐量(批次大小为0则完全禁用批处理)。 + //比如说,kafka里的消息5秒钟Batch才凑满了16KB,才能发送出去。那这些消息的延迟就是5秒钟 + //实测batchSize这个参数没有用 + props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize); + //有的时刻消息比较少,过了很久,比如5min也没有凑够16KB,这样延时就很大,所以需要一个参数. 再设置一个时间,到了这个时间, + //即使数据没达到16KB,也将这个批次发送出去 + props.put(ProducerConfig.LINGER_MS_CONFIG, "5000"); + //生产者内存缓冲区的大小 + props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory); + //反序列化,和生产者的序列化方式对应 + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer); + props.put("partitioner.class", "com.god.kafka.partitioner.MyPartitioner"); + return new KafkaProducer<>(props); + } + + +} diff --git a/src/main/java/com/god/kafka/contents/KafkaContent.java b/src/main/java/com/god/kafka/contents/KafkaContent.java new file mode 100644 index 0000000..44a0097 --- /dev/null +++ b/src/main/java/com/god/kafka/contents/KafkaContent.java @@ -0,0 +1,13 @@ +package com.god.kafka.contents; + +/** + * @author DongZl + * @description: kafka常量类 + * @Date 2023/8/25 18:47 + */ +public class KafkaContent { + + public static final String TOPIC = "top"; + + public static final String KAFKA_CON = "117.72.43.22:9092"; +} diff --git a/src/main/java/com/god/kafka/partitioner/MyPartitioner.java b/src/main/java/com/god/kafka/partitioner/MyPartitioner.java new file mode 100644 index 0000000..6e98936 --- /dev/null +++ b/src/main/java/com/god/kafka/partitioner/MyPartitioner.java @@ -0,0 +1,43 @@ +package com.god.kafka.partitioner; + +import org.apache.kafka.clients.producer.Partitioner; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.PartitionInfo; +import org.springframework.stereotype.Component; + +import java.util.List; +import java.util.Map; + +/** + * @author:fst + * @date:2023/11/25 + * @aim:自定义分区器 + */ +@Component +public class MyPartitioner implements Partitioner { + /** + * 自定义kafka分区主要解决用户分区数据倾斜问题 提高并发效率(假设 3 分区) + * @param topic 消息队列名 + * @param key 用户传入key + * @param keyBytes key字节数组 + * @param value 用户传入value + * @param valueBytes value字节数组 + * @param cluster 当前kafka节点数 + * @return 如果3个分区,返回 0 1 2 + */ + public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { + //获取topic的partitions信息 + List partitionInfos = cluster.partitionsForTopic(topic); + int partitionsNum = partitionInfos.size(); + // 这里以 key 的哈希值作为分区选择依据 + System.out.println("================================"); + System.out.println(Math.abs(key.hashCode()) % partitionsNum); + return Math.abs(key.hashCode()) % partitionsNum; + } + + public void close() { + } + + public void configure(Map map) { + } +} diff --git a/src/main/java/com/god/kafka/producer/ProducerTest.java b/src/main/java/com/god/kafka/producer/ProducerTest.java new file mode 100644 index 0000000..bcefb81 --- /dev/null +++ b/src/main/java/com/god/kafka/producer/ProducerTest.java @@ -0,0 +1,69 @@ +package com.god.kafka.producer; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; + +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.util.Properties; +import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +import static com.god.kafka.contents.KafkaContent.KAFKA_CON; +import static com.god.kafka.contents.KafkaContent.TOPIC; + + +/** + * @author DongZl + * @description: 生产者测试 + * @Date 2023/8/25 18:50 + */ +public class ProducerTest { + private static Producer producer; + + public static void KfkProducer() { + Properties props = new Properties(); + props.put("bootstrap.servers", KAFKA_CON); +// props.put("linger.ms", 1); + props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + producer = new KafkaProducer<>(props); + } + + private static void close() { + producer.close(); + } + + private static RecordMetadata send(String topic, String key, String value) { + Future result = producer.send(new ProducerRecord<>(topic, key, value)); + RecordMetadata meta = null; + try { + meta = result.get(); + } catch (InterruptedException | ExecutionException e) { + e.printStackTrace(); + } + return meta; + } + + public static void main (String[] args) { + KfkProducer(); + + new Thread(() -> { + int i = 0; + do { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + String format = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); + send(TOPIC, UUID.randomUUID().toString(), format); + }while (i++ < 1000); + close(); + }).start(); + + } +} diff --git a/src/main/java/com/god/kafka/service/KafkaProducerService.java b/src/main/java/com/god/kafka/service/KafkaProducerService.java new file mode 100644 index 0000000..00c4221 --- /dev/null +++ b/src/main/java/com/god/kafka/service/KafkaProducerService.java @@ -0,0 +1,33 @@ +package com.god.kafka.service; + +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +/** + * @description: 卡芙卡生产者服务 + * @Author fst + * @date 2023/11/26 15:06 + */ +@Service +public class KafkaProducerService { + + @Autowired + private Producer producer; + + public RecordMetadata send(String topic, String key, String value) { + Future result = producer.send(new ProducerRecord<>(topic, key, value)); + RecordMetadata meta = null; + try { + meta = result.get(); + } catch (InterruptedException | ExecutionException e) { + e.printStackTrace(); + } + return meta; + } +} diff --git a/src/main/java/com/god/mqtt/config/MqttConfig.java b/src/main/java/com/god/mqtt/config/MqttConfig.java new file mode 100644 index 0000000..d959727 --- /dev/null +++ b/src/main/java/com/god/mqtt/config/MqttConfig.java @@ -0,0 +1,44 @@ +package com.god.mqtt.config; + +import com.god.mqtt.service.MqttService; +import lombok.extern.log4j.Log4j2; +import org.eclipse.paho.client.mqttv3.MqttClient; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import java.util.UUID; + +/** + * @author DongZl + * @description: Mqtt配置 + * @Date 2023-11-24 下午 02:06 + */ +@Log4j2 +@Configuration +public class MqttConfig { + + @Bean + public MqttClient initClient(MqttProper mqttProper, MqttService mqttService){ + try { + log.info("mqtt服务器初始化开始"); + long startTime = System.currentTimeMillis(); + MqttClient client = new MqttClient(mqttProper.getBroker(), + UUID.randomUUID().toString(), + new MemoryPersistence()); + MemoryPersistence memoryPersistence = new MemoryPersistence(); + // 连接参数 + MqttConnectOptions options = new MqttConnectOptions(); + options.setConnectionTimeout(60); + options.setKeepAliveInterval(60); + log.info("mqtt服务器初始化结束, 耗时:[{}MS]", System.currentTimeMillis() - startTime); + client.connect(options); + client.setCallback(mqttService); + client.subscribe(mqttProper.getTopic(), 0); + return client; + }catch (Exception e) { + throw new RuntimeException(e); + } + } +} diff --git a/src/main/java/com/god/mqtt/config/MqttProper.java b/src/main/java/com/god/mqtt/config/MqttProper.java new file mode 100644 index 0000000..3b9bab3 --- /dev/null +++ b/src/main/java/com/god/mqtt/config/MqttProper.java @@ -0,0 +1,32 @@ +package com.god.mqtt.config; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.annotation.Configuration; + +/** + * @author DongZl + * @description: mqtt配置 + * @Date 2023-11-24 下午 02:04 + */ +@Data +@Builder +@Configuration +@NoArgsConstructor +@AllArgsConstructor +@ConfigurationProperties(prefix = "mqtt.config") +public class MqttProper { + + /** + * 节点 + */ + private String broker; + + /** + * 主题 + */ + private String topic; +} diff --git a/src/main/java/com/god/mqtt/service/MqttService.java b/src/main/java/com/god/mqtt/service/MqttService.java new file mode 100644 index 0000000..8c0be5c --- /dev/null +++ b/src/main/java/com/god/mqtt/service/MqttService.java @@ -0,0 +1,60 @@ +package com.god.mqtt.service; + + +import com.god.kafka.contents.KafkaContent; +import com.god.kafka.service.KafkaProducerService; +import lombok.extern.log4j.Log4j2; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; +import org.eclipse.paho.client.mqttv3.MqttCallback; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +/** + * @author DongZl + * @description: Mqtt消费者 + * @Date 2023-11-24 下午 02:10 + */ +@Log4j2 +@Service +public class MqttService implements MqttCallback { + + @Autowired + private KafkaProducerService kafkaProducerService; + + + /** + * This method is called when the connection to the server is lost. + * + * @param cause the reason behind the loss of connection. + */ + @Override + public void connectionLost (Throwable cause) { + + } + + /** + * mqtt监听消息发送给kafka + * @param topic + * @param message + * @throws Exception + */ + @Override + public void messageArrived(String topic, MqttMessage message) throws Exception { + String msg = new String(message.getPayload()); + log.info("topic: [{}], Qos: [{}], message content: [{}]", topic, message.getQos(), msg); + kafkaProducerService.send(KafkaContent.TOPIC, String.valueOf(msg.hashCode()), msg); + } + + @Override + public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { + + } + +} diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml new file mode 100644 index 0000000..40b7576 --- /dev/null +++ b/src/main/resources/application.yml @@ -0,0 +1,57 @@ +mqtt: + config: + broker: tcp://47.115.225.229:1883 + topic: test + +# Tomcat +server: + port: 9999 + +# Spring +spring: + application: + # 应用名称 + name: mqtt-kafka + profiles: + # 环境配置 + active: dev + # kafka + kafka: + producer: + # Kafka生产者服务器 + bootstrap-servers: 10.100.1.8:9092 + transaction-id-prefix: kafkaTx- + retries: 3 + + # acks=0 : 生产者成功写入消息不会等待任何来自服务器的相应 + # acks=1 : 只要集群的master收到消息,生产者就会收到一个来自服务器的响应。 + # acks=all : 只有当所有参与者的复制节点全部收到消息时,生产者才会收到一个来自服务器的响应 + # 开启事务时 必须设置为all + acks: all + # 当有多个消息需要被发送到同一分区时,生产者会把他们放在同一批次里。 + batch-size: 16384 + # 生产者内存缓冲区的大小 + buffer-memory: 1024000 + # 键的序列化方式 + key-serializer: org.apache.kafka.common.serialization.StringSerializer + # 值的序列化方式 + value-serializer: org.apache.kafka.common.serialization.StringSerializer + properties: + # 两次poll之间的最大间隔,默认值为5分钟。如果超过这个间隔会触发reBalance + max: + poll: + interval: + ms: 600000 + # 当broker多久没有收到consumer的心跳请求后就出发reBalance,默认值是10s + session: + timeout: + ms: 10000 + listener: + # 在侦听器容器中运行的线程数,一般设置为 机器数*分区数 + concurrency: 4 + # 自动提交关闭,需要设置手动消息确认 + ack-mode: manual_immediate + # 消费监听接口监听的主题不存在时,默认会报错,所以设置为false忽略报错 + missing-topics-fatal: false + +