From c2cca1c8958e37afb17fe103edb0df6e6df37747 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=86=AF=E5=87=AF?= <371894675@qq.com> Date: Tue, 5 Dec 2023 09:41:30 +0800 Subject: [PATCH] =?UTF-8?q?=E6=95=B0=E6=8D=AE=E4=B8=AD=E9=97=B4=E5=B1=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 46 +++++++ Dockerfile | 18 +++ pom.xml | 123 ++++++++++++++++++ src/main/java/com/durant/MqttApplication.java | 18 +++ .../kafka/config/KafkaProducerConfig.java | 43 ++++++ .../durant/kafka/config/MyPartitioner.java | 43 ++++++ .../durant/kafka/constant/KafkaConstants.java | 32 +++++ .../durant/kafka/service/KafkaService.java | 49 +++++++ .../durant/mqtt/config/MqttClientConnect.java | 54 ++++++++ .../com/durant/mqtt/config/MqttConfig.java | 21 +++ .../durant/mqtt/rabbit/SubscriberRabbit.java | 85 ++++++++++++ .../com/durant/mqtt/service/MqttService.java | 52 ++++++++ .../java/com/durant/mqtt/utils/Md5Util.java | 43 ++++++ src/main/resources/application.yml | 29 +++++ 14 files changed, 656 insertions(+) create mode 100644 .gitignore create mode 100644 Dockerfile create mode 100644 pom.xml create mode 100644 src/main/java/com/durant/MqttApplication.java create mode 100644 src/main/java/com/durant/kafka/config/KafkaProducerConfig.java create mode 100644 src/main/java/com/durant/kafka/config/MyPartitioner.java create mode 100644 src/main/java/com/durant/kafka/constant/KafkaConstants.java create mode 100644 src/main/java/com/durant/kafka/service/KafkaService.java create mode 100644 src/main/java/com/durant/mqtt/config/MqttClientConnect.java create mode 100644 src/main/java/com/durant/mqtt/config/MqttConfig.java create mode 100644 src/main/java/com/durant/mqtt/rabbit/SubscriberRabbit.java create mode 100644 src/main/java/com/durant/mqtt/service/MqttService.java create mode 100644 src/main/java/com/durant/mqtt/utils/Md5Util.java create mode 100644 src/main/resources/application.yml diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..09bdfea --- /dev/null +++ b/.gitignore @@ -0,0 +1,46 @@ +###################################################################### +# Build Tools + +.gradle +/build/ +!gradle/wrapper/gradle-wrapper.jar + +target/ +!.mvn/wrapper/maven-wrapper.jar + +###################################################################### +# IDE + +### STS ### +.apt_generated +.classpath +.factorypath +.project +.settings +.springBeans + +### IntelliJ IDEA ### +.idea +*.iws +*.iml +*.ipr + +### JRebel ### +rebel.xml +### NetBeans ### +nbproject/private/ +build/* +nbbuild/ +dist/ +nbdist/ +.nb-gradle/ + +###################################################################### +# Others +*.log +*.xml.versionsBackup +*.swp + +!*/build/*.java +!*/build/*.html +!*/build/*.xml \ No newline at end of file diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..2efe5ad --- /dev/null +++ b/Dockerfile @@ -0,0 +1,18 @@ +#起始镜像 +FROM anolis-registry.cn-zhangjiakou.cr.aliyuncs.com/openanolis/openjdk:17-8.6 +#暴露端口号 +EXPOSE 8084 +#挂载目录的位置 +VOLUME /home/logs/mqttDemo +#构建复制外部文件到docker +COPY /target/mqttdemo.jar /home/app.jar +#工作目录 exec -it 进入容器内部后的默认的起始目录 +WORKDIR /home +ENV TIME_ZONE Asia/Shanghai +#指定东八区 +RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone + +#启动java 程序 +ENTRYPOINT ["java","-Dfile.encoding=UTF-8","-jar","/home/app.jar"] + + diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..ac9c34a --- /dev/null +++ b/pom.xml @@ -0,0 +1,123 @@ + + + 4.0.0 + + com.kafka + mqttdemo + 1.0-SNAPSHOT + + + 17 + 17 + UTF-8 + + + + org.springframework.boot + spring-boot-starter + 2.7.15 + + + org.springframework.boot + spring-boot-starter-web + 2.7.15 + + + org.springframework.integration + spring-integration-mqtt + 5.5.18 + + + org.projectlombok + lombok + 1.18.28 + + + + org.apache.kafka + kafka-clients + 3.3.1 + + + org.jetbrains + annotations + 13.0 + compile + + + jakarta.validation + jakarta.validation-api + 2.0.2 + + + org.springframework.boot + spring-boot-starter-amqp + 2.7.15 + + + + com.baomidou + mybatis-plus-boot-starter + 3.5.3.1 + + + com.alibaba + fastjson + 2.0.32 + + + mysql + mysql-connector-java + 8.0.29 + + + + + + dragon-release + dragon-releases + http://10.100.1.7:8081/repository/maven-releases/ + + + + + dragon-public + dragon-maven + http://10.100.1.7:8081/repository/maven-public/ + + + public + aliyun nexus + http://10.100.1.7:8081/repository/maven-releases/ + + true + + + + + ${project.artifactId} + + + org.springframework.boot + spring-boot-maven-plugin + + + + repackage + + + + + + org.apache.maven.plugins + maven-deploy-plugin + + true + + + org.apache.maven.pluginsmaven-compiler-plugin1616 + + + diff --git a/src/main/java/com/durant/MqttApplication.java b/src/main/java/com/durant/MqttApplication.java new file mode 100644 index 0000000..847fef9 --- /dev/null +++ b/src/main/java/com/durant/MqttApplication.java @@ -0,0 +1,18 @@ +package com.durant; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +/** + * @author 冯凯 + * @version 1.0 + * @description: 启动类 + * @date 2023/11/24 14:23 + */ +@SpringBootApplication +public class MqttApplication { + + public static void main(String[] args) { + SpringApplication.run(MqttApplication.class,args); + } +} diff --git a/src/main/java/com/durant/kafka/config/KafkaProducerConfig.java b/src/main/java/com/durant/kafka/config/KafkaProducerConfig.java new file mode 100644 index 0000000..bbf730d --- /dev/null +++ b/src/main/java/com/durant/kafka/config/KafkaProducerConfig.java @@ -0,0 +1,43 @@ +package com.durant.kafka.config; + + +import com.durant.kafka.constant.KafkaConstants; +import lombok.extern.log4j.Log4j2; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import java.util.Properties; + +import static com.durant.kafka.constant.KafkaConstants.BOOSTRAP_SERVERS; + +/** + * @author 冯凯 + * @version 1.0 + * @description: kafka生产者配置 + * @date 2023/11/25 13:23 + */ +@Configuration +@Log4j2 +public class KafkaProducerConfig { + + @Bean + public Producer producerInit() { + + // 创建一个KafkaProducer的配置对象 + Properties properties = new Properties(); + // 设置Kafka服务器的地址 + properties.put("bootstrap.servers", "117.72.43.22:9092"); + // 设置消息的key和value的序列化方式为StringSerializer + properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + properties.put("partitioner.class", "com.durant.kafka.config.MyPartitioner"); + // 输出日志信息 + log.info("你好kafka"); + // 返回一个使用给定配置对象创建的KafkaProducer实例 + return new KafkaProducer<>(properties); + } + + +} diff --git a/src/main/java/com/durant/kafka/config/MyPartitioner.java b/src/main/java/com/durant/kafka/config/MyPartitioner.java new file mode 100644 index 0000000..e203585 --- /dev/null +++ b/src/main/java/com/durant/kafka/config/MyPartitioner.java @@ -0,0 +1,43 @@ +package com.durant.kafka.config; + +import org.apache.kafka.clients.producer.Partitioner; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.PartitionInfo; +import org.springframework.stereotype.Component; + +import java.util.List; +import java.util.Map; + +/** + * @author:fst + * @date:2023/11/25 + * @aim:自定义分区器 + */ +@Component +public class MyPartitioner implements Partitioner { + /** + * 自定义kafka分区主要解决用户分区数据倾斜问题 提高并发效率(假设 3 分区) + * @param topic 消息队列名 + * @param key 用户传入key + * @param keyBytes key字节数组 + * @param value 用户传入value + * @param valueBytes value字节数组 + * @param cluster 当前kafka节点数 + * @return 如果3个分区,返回 0 1 2 + */ + public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { + //获取topic的partitions信息 + List partitionInfos = cluster.partitionsForTopic(topic); + int partitionsNum = partitionInfos.size(); + // 这里以 key 的哈希值作为分区选择依据 + System.out.println("================================"); + System.out.println(Math.abs(key.hashCode()) % partitionsNum); + return Math.abs(key.hashCode()) % partitionsNum; + } + + public void close() { + } + + public void configure(Map map) { + } +} diff --git a/src/main/java/com/durant/kafka/constant/KafkaConstants.java b/src/main/java/com/durant/kafka/constant/KafkaConstants.java new file mode 100644 index 0000000..2d5cfbb --- /dev/null +++ b/src/main/java/com/durant/kafka/constant/KafkaConstants.java @@ -0,0 +1,32 @@ +package com.durant.kafka.constant; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * @author 冯凯 + * @version 1.0 + * @description: + * @date 2023/11/25 13:20 + */ +@Data +@AllArgsConstructor +@NoArgsConstructor +@Builder +public class KafkaConstants { + + public String topic="top"; + + public static final String BOOSTRAP_SERVERS="117.72.43.22:9092"; + + public String partition; + + public static KafkaConstants getBuilder(String topic,String partition){ + return KafkaConstants.builder() + .topic(topic) + .partition(partition) + .build(); + } +} diff --git a/src/main/java/com/durant/kafka/service/KafkaService.java b/src/main/java/com/durant/kafka/service/KafkaService.java new file mode 100644 index 0000000..37a7f23 --- /dev/null +++ b/src/main/java/com/durant/kafka/service/KafkaService.java @@ -0,0 +1,49 @@ +package com.durant.kafka.service; + +import lombok.extern.log4j.Log4j2; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +/** + * @author 冯凯 + * @version 1.0 + * @description: kafka的service + * @date 2023/11/25 13:57 + */ +@Service +@Log4j2 +public class KafkaService { + + @Autowired + private Producer producer; + + + /** + * 发送消息到Kafka + * + * @param topic 消息的主题 + * @param key 消息的键 + * @param value 消息的值 + * @return 返回消息的元数据 + */ + public RecordMetadata send(String topic, String key, String value){ + log.info("kafka发送消息为:"+value); + Future result = producer.send(new ProducerRecord<>(topic,0, key, value)); + RecordMetadata recordMetadata=null; + try { + recordMetadata=result.get(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + return recordMetadata; + } + + + +} diff --git a/src/main/java/com/durant/mqtt/config/MqttClientConnect.java b/src/main/java/com/durant/mqtt/config/MqttClientConnect.java new file mode 100644 index 0000000..0ae9c86 --- /dev/null +++ b/src/main/java/com/durant/mqtt/config/MqttClientConnect.java @@ -0,0 +1,54 @@ +package com.durant.mqtt.config; + +import com.durant.mqtt.service.MqttService; +import lombok.extern.log4j.Log4j2; +import org.eclipse.paho.client.mqttv3.MqttClient; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.eclipse.paho.client.mqttv3.MqttException; +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import java.util.UUID; + +/** + * @author 冯凯 + * @version 1.0 + * @description: mqtt客户端 + * @date 2023/11/24 14:26 + */ +@Log4j2 +@Configuration +public class MqttClientConnect { + + @Autowired + private MqttService mqttService; +// @Bean +// public MqttClient connect(MqttConfig mqttConfig){ +// // 创建一个MqttClient对象并返回 +// MqttClient mqttClient; +// try { +// log.info("开始连接:"+System.currentTimeMillis()); +// // 连接MQTT代理服务器 +// mqttClient= new MqttClient(mqttConfig.getBroker(), UUID.randomUUID().toString(),new MemoryPersistence()); +// // 创建MqttConnectOptions对象 +// MqttConnectOptions mqttConnectOptions = new MqttConnectOptions(); +// mqttConnectOptions.setUserName("worker"); +// mqttConnectOptions.setPassword("worker".toCharArray()); +// mqttConnectOptions.setConnectionTimeout(60); +// mqttConnectOptions.setKeepAliveInterval(60); +// // 连接MQTT代理服务器 +// mqttClient.connect(); +// // 设置回调函数 +// mqttClient.setCallback(mqttService); +// // 订阅指定的MQTT主题 +// mqttClient.subscribe(mqttConfig.getTopic(),0); +// } catch (MqttException e) { +// throw new RuntimeException(e); +// } +// log.info("连接成功:"+System.currentTimeMillis()); +// return mqttClient; +// } + +} diff --git a/src/main/java/com/durant/mqtt/config/MqttConfig.java b/src/main/java/com/durant/mqtt/config/MqttConfig.java new file mode 100644 index 0000000..0b697eb --- /dev/null +++ b/src/main/java/com/durant/mqtt/config/MqttConfig.java @@ -0,0 +1,21 @@ +package com.durant.mqtt.config; + +import lombok.Data; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.annotation.Configuration; + +/** + * @author 冯凯 + * @version 1.0 + * @description: mqtt配置类 + * @date 2023/11/24 14:24 + */ +@Data +@Configuration +@ConfigurationProperties(prefix = "mqtt.config") +public class MqttConfig { + + private String broker; + + private String topic; +} diff --git a/src/main/java/com/durant/mqtt/rabbit/SubscriberRabbit.java b/src/main/java/com/durant/mqtt/rabbit/SubscriberRabbit.java new file mode 100644 index 0000000..70ed426 --- /dev/null +++ b/src/main/java/com/durant/mqtt/rabbit/SubscriberRabbit.java @@ -0,0 +1,85 @@ +package com.durant.mqtt.rabbit; + +import com.alibaba.fastjson.JSONObject; +import com.durant.kafka.constant.KafkaConstants; +import com.durant.mqtt.config.MqttClientConnect; +import com.durant.mqtt.config.MqttConfig; +import com.durant.mqtt.service.MqttService; +import com.rabbitmq.client.Channel; +import lombok.extern.log4j.Log4j2; +import org.eclipse.paho.client.mqttv3.MqttClient; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.eclipse.paho.client.mqttv3.MqttException; +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; +import org.springframework.amqp.core.Message; +import org.springframework.amqp.rabbit.annotation.Queue; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.io.IOException; +import java.util.UUID; + +/** + * @author 冯凯 + * @version 1.0 + * @description: + * @date 2023/11/29 15:48 + */ +@Component +@Log4j2 +public class SubscriberRabbit { + @Autowired + private MqttClientConnect mqttClientConnect; + @Autowired + private MqttConfig mqttConfig; + /** + *注入rabbit + */ + @Autowired + private RabbitTemplate rabbitTemplate; + @Autowired + private MqttService mqttService; + @RabbitListener(queuesToDeclare={@Queue(value = "sub_top")}) + public void consumerSubscribe(String mesg, Message message, Channel channel){ + log.info("收到需要订阅的主题是:"+mesg); + // 创建一个MqttClient对象并返回 + MqttClient mqttClient; + try { + log.info("开始连接:"+System.currentTimeMillis()); + // 连接MQTT代理服务器 + mqttClient= new MqttClient(mqttConfig.getBroker(), UUID.randomUUID().toString(),new MemoryPersistence()); + // 创建MqttConnectOptions对象 + MqttConnectOptions mqttConnectOptions = new MqttConnectOptions(); + mqttConnectOptions.setUserName("worker"); + mqttConnectOptions.setPassword("worker".toCharArray()); + mqttConnectOptions.setConnectionTimeout(60); + mqttConnectOptions.setKeepAliveInterval(60); + // 连接MQTT代理服务器 + mqttClient.connect(); + //通过消息队列告知kafka消费者准备好要拉取哪个主题下的哪个分区的消息 + KafkaConstants kafkaConstants = KafkaConstants.getBuilder("topA", "0"); + rabbitTemplate.convertAndSend("kafka_top", JSONObject.toJSONString(kafkaConstants),msg->{ + msg.getMessageProperties().setMessageId(UUID.randomUUID().toString().replaceAll("-","")); + return msg; + }); + // 设置回调函数 + mqttClient.setCallback(mqttService); + // 订阅指定的MQTT主题 + mqttClient.subscribe(mesg,0); + + // 订阅指定的MQTT主题 + log.info("准备订阅主题是:"+mesg); + try { + channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); + } catch (IOException e) { + throw new RuntimeException(e); + } + } catch (MqttException e) { + throw new RuntimeException(e); + } + log.info("连接成功:"+System.currentTimeMillis()); + } + +} diff --git a/src/main/java/com/durant/mqtt/service/MqttService.java b/src/main/java/com/durant/mqtt/service/MqttService.java new file mode 100644 index 0000000..66d0cf7 --- /dev/null +++ b/src/main/java/com/durant/mqtt/service/MqttService.java @@ -0,0 +1,52 @@ +package com.durant.mqtt.service; + +import com.durant.kafka.constant.KafkaConstants; +import com.durant.kafka.service.KafkaService; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; +import org.eclipse.paho.client.mqttv3.MqttCallback; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.util.Properties; + +/** + * @author 冯凯 + * @version 1.0 + * @description: + * @date 2023/11/24 19:07 + */ +@Service +public class MqttService implements MqttCallback { + + @Autowired + private KafkaService kafkaService; + private static final Logger log= LoggerFactory.getLogger(MqttService.class); + @Override + public void connectionLost(Throwable throwable) { + + } + + /** + * 消息到达时触发的方法 + */ + @Override + public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception { + log.info("主题是:" + topic); + String msg = new String(mqttMessage.getPayload()); + log.info("收到消息是:" + msg); + kafkaService.send("topA", String.valueOf(msg.hashCode()), msg); + } + + + @Override + public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { + + } + + +} diff --git a/src/main/java/com/durant/mqtt/utils/Md5Util.java b/src/main/java/com/durant/mqtt/utils/Md5Util.java new file mode 100644 index 0000000..80caa3b --- /dev/null +++ b/src/main/java/com/durant/mqtt/utils/Md5Util.java @@ -0,0 +1,43 @@ +package com.durant.mqtt.utils; + +import java.math.BigInteger; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; + +/** + * @author 冯凯 + * @version 1.0 + * @description: + * @date 2023/11/27 14:46 + */ +public class Md5Util { + + public static String md5Encrypt(String message) { + try { + // 创建MD5消息摘要对象 + MessageDigest md = MessageDigest.getInstance("MD5"); + + // 更新消息摘要 + md.update(message.getBytes()); + + // 获取摘要结果 + byte[] digest = md.digest(); + + // 将字节数组转换为BigInteger类型 + BigInteger bigInt = new BigInteger(1, digest); + + // 将BigInteger类型转换为16进制字符串 + String encryptedMessage = bigInt.toString(16); + + // 补齐前导0直到长度为32 + while (encryptedMessage.length() < 32) { + encryptedMessage = "0" + encryptedMessage; + } + + return encryptedMessage; + } catch (NoSuchAlgorithmException e) { + e.printStackTrace(); + return null; + } + } +} diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml new file mode 100644 index 0000000..7cbdf47 --- /dev/null +++ b/src/main/resources/application.yml @@ -0,0 +1,29 @@ + +mqtt: + config: + broker: tcp://47.115.228.23:1883 + topic: test +server: + port: 8084 + +spring: + application: + name: mqttDemo + datasource: + driver-class-name: com.mysql.cj.jdbc.Driver + url: jdbc:mysql://124.221.216.186:3306/mqtt?useUnicode=true&characterEncoding=utf8&serverTimezone=GMT%2B8 + username: root + password: 27a9601cb3545824 + rabbitmq: + host: 182.254.222.21 + port: 5672 + template: + mandatory: true + listener: + simple: + prefetch: 1 # 每次取一条消息消费 消费完成取下一条 + acknowledge-mode: manual # 设置消费端手动ack确认 + retry: + enabled: true # 支持重试 + publisher-confirms: true #确认消息已发送到交换机(Exchange) + publisher-returns: true #确认消息已发送到队列(Queue)