From 9fd3eceaec721a74bb7e2bf74f3da065b4bfa942 Mon Sep 17 00:00:00 2001
From: Yunfei Du <278774021@qq.com>
Date: Fri, 31 May 2024 22:08:45 +0800
Subject: [PATCH] =?UTF-8?q?feat:=E8=BF=90=E8=90=A5=E5=B9=B3=E5=8F=B0?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
.gitignore | 38 ++++++
.idea/.gitignore | 8 ++
.idea/encodings.xml | 8 ++
.idea/inspectionProfiles/Project_Default.xml | 5 +
.idea/misc.xml | 14 +++
.idea/vcs.xml | 6 +
pom.xml | 111 ++++++++++++++++++
src/main/java/com/load/MqttApplication.java | 17 +++
src/main/java/com/load/SubscribeSample.java | 86 ++++++++++++++
.../java/com/load/config/RabbitmqConfig.java | 54 +++++++++
.../com/load/config/RestTemplateConfig.java | 33 ++++++
.../com/load/mqtt/MessageCallbackService.java | 42 +++++++
src/main/java/com/load/mqtt/MqttFactory.java | 58 +++++++++
.../java/com/load/mqtt/MqttProperties.java | 51 ++++++++
.../java/com/load/rebbitmq/MsgHandle.java | 33 ++++++
.../java/com/load/rebbitmq/RabbitConfig.java | 50 ++++++++
src/main/resources/application.yml | 10 ++
17 files changed, 624 insertions(+)
create mode 100644 .gitignore
create mode 100644 .idea/.gitignore
create mode 100644 .idea/encodings.xml
create mode 100644 .idea/inspectionProfiles/Project_Default.xml
create mode 100644 .idea/misc.xml
create mode 100644 .idea/vcs.xml
create mode 100644 pom.xml
create mode 100644 src/main/java/com/load/MqttApplication.java
create mode 100644 src/main/java/com/load/SubscribeSample.java
create mode 100644 src/main/java/com/load/config/RabbitmqConfig.java
create mode 100644 src/main/java/com/load/config/RestTemplateConfig.java
create mode 100644 src/main/java/com/load/mqtt/MessageCallbackService.java
create mode 100644 src/main/java/com/load/mqtt/MqttFactory.java
create mode 100644 src/main/java/com/load/mqtt/MqttProperties.java
create mode 100644 src/main/java/com/load/rebbitmq/MsgHandle.java
create mode 100644 src/main/java/com/load/rebbitmq/RabbitConfig.java
create mode 100644 src/main/resources/application.yml
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..5ff6309
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,38 @@
+target/
+!.mvn/wrapper/maven-wrapper.jar
+!**/src/main/**/target/
+!**/src/test/**/target/
+
+### IntelliJ IDEA ###
+.idea/modules.xml
+.idea/jarRepositories.xml
+.idea/compiler.xml
+.idea/libraries/
+*.iws
+*.iml
+*.ipr
+
+### Eclipse ###
+.apt_generated
+.classpath
+.factorypath
+.project
+.settings
+.springBeans
+.sts4-cache
+
+### NetBeans ###
+/nbproject/private/
+/nbbuild/
+/dist/
+/nbdist/
+/.nb-gradle/
+build/
+!**/src/main/**/build/
+!**/src/test/**/build/
+
+### VS Code ###
+.vscode/
+
+### Mac OS ###
+.DS_Store
\ No newline at end of file
diff --git a/.idea/.gitignore b/.idea/.gitignore
new file mode 100644
index 0000000..35410ca
--- /dev/null
+++ b/.idea/.gitignore
@@ -0,0 +1,8 @@
+# 默认忽略的文件
+/shelf/
+/workspace.xml
+# 基于编辑器的 HTTP 客户端请求
+/httpRequests/
+# Datasource local storage ignored files
+/dataSources/
+/dataSources.local.xml
diff --git a/.idea/encodings.xml b/.idea/encodings.xml
new file mode 100644
index 0000000..63574ec
--- /dev/null
+++ b/.idea/encodings.xml
@@ -0,0 +1,8 @@
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/inspectionProfiles/Project_Default.xml b/.idea/inspectionProfiles/Project_Default.xml
new file mode 100644
index 0000000..8d66637
--- /dev/null
+++ b/.idea/inspectionProfiles/Project_Default.xml
@@ -0,0 +1,5 @@
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/misc.xml b/.idea/misc.xml
new file mode 100644
index 0000000..82dbec8
--- /dev/null
+++ b/.idea/misc.xml
@@ -0,0 +1,14 @@
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/vcs.xml b/.idea/vcs.xml
new file mode 100644
index 0000000..35eb1dd
--- /dev/null
+++ b/.idea/vcs.xml
@@ -0,0 +1,6 @@
+
+
+
+
+
+
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..514e9d2
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,111 @@
+
+
+ 4.0.0
+
+ com.load
+ operation-platform
+ 1.0-SNAPSHOT
+
+
+ 17
+ 17
+ UTF-8
+ 2.6.13
+
+
+
+
+ org.springframework.boot
+ spring-boot-starter-web
+
+
+
+ org.projectlombok
+ lombok
+ 1.18.32
+ provided
+
+
+
+ org.springframework.boot
+ spring-boot-starter-test
+ test
+
+
+ org.eclipse.paho
+ org.eclipse.paho.client.mqttv3
+ 1.2.5
+
+
+ org.apache.commons
+ commons-lang3
+ 3.12.0
+
+
+
+
+ org.springframework.boot
+ spring-boot-starter-data-redis
+
+
+ io.lettuce
+ lettuce-core
+
+
+
+
+
+ org.springframework.boot
+ spring-boot-starter-amqp
+
+
+ org.projectlombok
+ lombok
+
+
+
+
+
+ org.springframework.boot
+ spring-boot-dependencies
+ ${spring-boot.version}
+ pom
+ import
+
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+ 3.8.1
+
+ 17
+ 17
+ UTF-8
+
+
+
+ org.springframework.boot
+ spring-boot-maven-plugin
+ ${spring-boot.version}
+
+ com.load.MqttApplication
+ true
+
+
+
+ repackage
+
+ repackage
+
+
+
+
+
+
+
diff --git a/src/main/java/com/load/MqttApplication.java b/src/main/java/com/load/MqttApplication.java
new file mode 100644
index 0000000..eed7026
--- /dev/null
+++ b/src/main/java/com/load/MqttApplication.java
@@ -0,0 +1,17 @@
+package com.load;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+/**
+ * @ClassName MqttApplication
+ * @Description 描述
+ * @Author YunFei.Du
+ * @Date 2024/5/30 9:19
+ */
+@SpringBootApplication
+public class MqttApplication {
+ public static void main(String[] args) {
+ SpringApplication.run ( MqttApplication.class ,args);
+ }
+}
diff --git a/src/main/java/com/load/SubscribeSample.java b/src/main/java/com/load/SubscribeSample.java
new file mode 100644
index 0000000..c0477b9
--- /dev/null
+++ b/src/main/java/com/load/SubscribeSample.java
@@ -0,0 +1,86 @@
+package com.load;
+
+import org.eclipse.paho.client.mqttv3.*;
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
+
+/**
+ * 订阅 MQTT 主题
+ */
+public class SubscribeSample {
+
+ /**
+ * 主程序入口。
+ * 创建一个MQTT客户端,并连接到指定的代理服务器,订阅特定的主题,并处理接收到的消息。
+ *
+ */
+ public static void main(String[] args) {
+ /**
+ * 代理地址
+ */
+ String broker = "tcp://47.92.127.83:1883";
+ /**
+ * 主题
+ */
+ String topic = "mqtt/test";
+ String username = "emqx";
+ String password = "public";
+ /**
+ * 客户端ID(随机)
+ */
+ String clientid = "protocol-parsing";
+ int qos = 0;
+
+ try {
+ // 创建MqttClient实例
+ MqttClient client = new MqttClient(broker, clientid, new MemoryPersistence());
+
+ // 设置连接选项
+ MqttConnectOptions options = new MqttConnectOptions();
+ options.setUserName(username);
+ options.setPassword(password.toCharArray());
+ options.setConnectionTimeout(60);
+// 堵塞60S
+ options.setKeepAliveInterval(60);
+ // 设置MQTT消息回调处理
+ client.setCallback(new MqttCallback() {
+
+ /**
+ * 连接丢失时的处理逻辑。
+ *
+ * @param cause 引发连接丢失的异常对象
+ */
+ public void connectionLost(Throwable cause) {
+ System.out.println("connectionLost: " + cause.getMessage());
+ }
+
+ /**
+ * 当接收到消息时的处理逻辑。
+ *
+ * @param topic 接收到消息的主题
+ * @param message 接收到的消息对象
+ */
+ public void messageArrived(String topic, MqttMessage message) {
+ System.out.println("topic: " + topic);
+ System.out.println("Qos: " + message.getQos());
+ System.out.println("message content: " + new String(message.getPayload()));
+
+ }
+ /**
+ * 消息成功发送完成的处理逻辑。
+ *
+ * @param token 表示消息发送过程的令牌
+ */
+ public void deliveryComplete(IMqttDeliveryToken token) {
+ System.out.println("deliveryComplete---------" + token.isComplete());
+ }
+
+ });
+ // 连接到MQTT代理
+ client.connect(options);
+ // 订阅主题
+ client.subscribe(topic, qos);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+}
diff --git a/src/main/java/com/load/config/RabbitmqConfig.java b/src/main/java/com/load/config/RabbitmqConfig.java
new file mode 100644
index 0000000..97e27f2
--- /dev/null
+++ b/src/main/java/com/load/config/RabbitmqConfig.java
@@ -0,0 +1,54 @@
+package com.load.config;
+
+import org.springframework.amqp.core.*;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+/**
+ * rabbitMq配置类
+ *
+ * @author YunFei·Du
+ * @ClassName: RabbitmqConfig
+ * @Description: rabbitMq配置类
+ * @CreateTime: 2024/5/27 16:56
+ */
+@Configuration
+public class RabbitmqConfig {
+ public static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
+ public static final String QUEUE_INFORM_SMS = "disconnect_connect";
+ public static final String EXCHANGE_TOPICS_INFORM="exchange_topics_inform";
+ public static final String ROUTINGKEY_EMAIL="inform.#.email.#";
+ public static final String ROUTINGKEY_SMS="inform.#.sms.#";
+
+
+ @Bean(EXCHANGE_TOPICS_INFORM)
+ public Exchange EXCHANGE_TOPICS_INFORM(){
+ //durable(true) 持久化,mq重启之后交换机还在
+ return ExchangeBuilder.topicExchange(EXCHANGE_TOPICS_INFORM).durable(true).build();
+ }
+
+ //声明QUEUE_INFORM_EMAIL队列
+ @Bean(QUEUE_INFORM_EMAIL)
+ public Queue QUEUE_INFORM_EMAIL(){
+ return new Queue(QUEUE_INFORM_EMAIL);
+ }
+ //声明QUEUE_INFORM_SMS队列
+ @Bean(QUEUE_INFORM_SMS)
+ public Queue QUEUE_INFORM_SMS(){
+ return new Queue(QUEUE_INFORM_SMS);
+ }
+
+ //ROUTINGKEY_EMAIL队列绑定交换机,指定routingKey
+ @Bean
+ public Binding BINDING_QUEUE_INFORM_EMAIL(@Qualifier(QUEUE_INFORM_EMAIL) Queue queue,
+ @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange){
+ return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_EMAIL).noargs();
+ }
+ //ROUTINGKEY_SMS队列绑定交换机,指定routingKey
+ @Bean
+ public Binding BINDING_ROUTINGKEY_SMS(@Qualifier(QUEUE_INFORM_SMS) Queue queue,
+ @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange){
+ return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_SMS).noargs();
+ }
+}
diff --git a/src/main/java/com/load/config/RestTemplateConfig.java b/src/main/java/com/load/config/RestTemplateConfig.java
new file mode 100644
index 0000000..b06b8c0
--- /dev/null
+++ b/src/main/java/com/load/config/RestTemplateConfig.java
@@ -0,0 +1,33 @@
+package com.load.config;
+
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.http.client.ClientHttpRequestFactory;
+import org.springframework.http.client.SimpleClientHttpRequestFactory;
+import org.springframework.web.client.RestTemplate;
+
+/**
+ * rest配置类
+ *
+ * @author YunFei·Du
+ * @ClassName: RestTemplateConfig
+ * @Description: rest配置类
+ * @CreateTime: 2024/5/27 10:01
+ */
+@Configuration
+public class RestTemplateConfig {
+ @Bean
+ public RestTemplate restTemplate(ClientHttpRequestFactory factory) {
+ return new RestTemplate(factory);
+ }
+
+ @Bean
+ public ClientHttpRequestFactory simpleClientHttpRequestFactory() {
+ SimpleClientHttpRequestFactory factory = new SimpleClientHttpRequestFactory();
+ //超时设置
+ factory.setReadTimeout(5000);//ms
+ factory.setConnectTimeout(15000);//ms
+ return factory;
+ }
+}
+
diff --git a/src/main/java/com/load/mqtt/MessageCallbackService.java b/src/main/java/com/load/mqtt/MessageCallbackService.java
new file mode 100644
index 0000000..89ba4b1
--- /dev/null
+++ b/src/main/java/com/load/mqtt/MessageCallbackService.java
@@ -0,0 +1,42 @@
+package com.load.mqtt;
+
+import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
+import org.eclipse.paho.client.mqttv3.MqttCallback;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.springframework.stereotype.Service;
+
+/**
+ * 消息回调服务 (回执消息类)
+ * @author YunFei.Du
+ * @date 22:37 2024/5/30
+ */
+@Service
+public class MessageCallbackService implements MqttCallback {
+
+ @Override
+ public void connectionLost(Throwable cause) {
+ System.out.println("connectionLost: " + cause.getMessage());
+ }
+
+ /**
+ * 接收消息
+ * @param topic
+ * @param message
+ */
+ @Override
+ public void messageArrived(String topic, MqttMessage message) {
+ System.out.println("topic: " + topic);
+ System.out.println("Qos: " + message.getQos());
+ System.out.println("message content: " + new String(message.getPayload()));
+ }
+
+//
+ /**
+ * 发送消息
+ * @param token
+ */
+ @Override
+ public void deliveryComplete(IMqttDeliveryToken token) {
+ System.out.println("deliveryComplete---------" + token.isComplete());
+ }
+}
diff --git a/src/main/java/com/load/mqtt/MqttFactory.java b/src/main/java/com/load/mqtt/MqttFactory.java
new file mode 100644
index 0000000..d5851a4
--- /dev/null
+++ b/src/main/java/com/load/mqtt/MqttFactory.java
@@ -0,0 +1,58 @@
+package com.load.mqtt;
+
+import lombok.AllArgsConstructor;
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
+import org.springframework.stereotype.Service;
+
+/**
+ * Mqtt工厂
+ * @author YunFei.Du
+ * @date 22:38 2024/5/30
+ */
+@Service
+@AllArgsConstructor
+public class MqttFactory {
+
+ private final MessageCallbackService messageCallbackService;
+ // 连接参数
+
+ /**
+ * 创建Mqtt 客户端
+ * @param mqttProperties
+ * @return
+ */
+ public MqttClient createClient(MqttProperties mqttProperties){
+ MqttClient mqttClient =null;
+ try {
+ mqttClient=new MqttClient ( mqttProperties.getBroker() , mqttProperties.getClientId() , new MemoryPersistence() );
+
+ MqttConnectOptions options = new MqttConnectOptions ( );
+
+ //都不为空
+ if (mqttProperties.isLogin()){
+ options.setUserName ( mqttProperties.getUsername() );
+ options.setPassword ( mqttProperties.getPassword().toCharArray() );
+ }
+ options.setConnectionTimeout(60);
+ options.setKeepAliveInterval(60);
+
+ mqttClient.connect ( options );
+ //监听
+ mqttClient.setCallback ( messageCallbackService );
+ mqttClient.subscribe ( mqttProperties.getTopic(),0 );
+ } catch (MqttException e) {
+ throw new RuntimeException ( e );
+ }
+
+
+// try {
+//
+// } catch (MqttException e) {
+// throw new RuntimeException ( e );
+// }
+ return mqttClient;
+ }
+}
diff --git a/src/main/java/com/load/mqtt/MqttProperties.java b/src/main/java/com/load/mqtt/MqttProperties.java
new file mode 100644
index 0000000..ee5376c
--- /dev/null
+++ b/src/main/java/com/load/mqtt/MqttProperties.java
@@ -0,0 +1,51 @@
+package com.load.mqtt;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.commons.lang3.StringUtils;
+
+/**
+ * 配置中心
+ * @author YunFei.Du
+ * @date 8:53 2024/5/30
+ */
+@Data
+@AllArgsConstructor
+@NoArgsConstructor
+@Builder
+public class MqttProperties {
+
+ private String broker;
+ private String topic;
+ private String username;
+ private String password;
+ private String clientId;
+
+ /**
+ * 构建mqtt配置
+ * @param ip
+ * @param topic
+ * @return
+ */
+ public static MqttProperties configBuild(String ip, String topic){
+ return MqttProperties.builder()
+ .broker("tcp://"+ip+":1883")
+ .topic(topic)
+ .username("admin")
+ .password("public")
+ .clientId("protocol-parsing") //协议解析 定值 --> 配置
+ .build();
+ }
+
+ /**
+ * 判断是否可以登录
+ * @return
+ */
+ public boolean isLogin(){
+// commons-lang3
+ return StringUtils.isBlank ( username ) && !StringUtils.isBlank ( password );
+ }
+
+}
diff --git a/src/main/java/com/load/rebbitmq/MsgHandle.java b/src/main/java/com/load/rebbitmq/MsgHandle.java
new file mode 100644
index 0000000..05304bc
--- /dev/null
+++ b/src/main/java/com/load/rebbitmq/MsgHandle.java
@@ -0,0 +1,33 @@
+package com.load.rebbitmq;
+
+import com.load.mqtt.MqttFactory;
+import com.load.mqtt.MqttProperties;
+import lombok.extern.log4j.Log4j2;
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.springframework.amqp.rabbit.annotation.RabbitListener;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+/**
+ * 信息处理器
+ * @author YunFei.Du
+ * @date 9:21 2024/5/30
+ */
+@Component
+@Log4j2
+public class MsgHandle {
+
+ @Autowired
+ private MqttFactory mqttFactory;
+ @RabbitListener(queues = {"create.topic"})
+ private void msg(String msg){
+ log.info ( "接收到消息:{}" , msg );
+ MqttProperties mqttProperties = MqttProperties.configBuild ( "47.92.127.83", "mqtt/test" );
+
+ log.info ( "接收信息初始化程序:{}" , mqttProperties);
+ MqttClient client = mqttFactory.createClient ( mqttProperties );
+ log.info ( "client创建成功:{}", client.getClientId () );
+ }
+
+
+}
diff --git a/src/main/java/com/load/rebbitmq/RabbitConfig.java b/src/main/java/com/load/rebbitmq/RabbitConfig.java
new file mode 100644
index 0000000..7816bb6
--- /dev/null
+++ b/src/main/java/com/load/rebbitmq/RabbitConfig.java
@@ -0,0 +1,50 @@
+package com.load.rebbitmq;
+
+import org.springframework.amqp.core.Binding;
+import org.springframework.amqp.core.BindingBuilder;
+import org.springframework.amqp.core.DirectExchange;
+import org.springframework.amqp.core.Queue;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+
+
+/**
+ * rabbit 配置类
+ * @author YunFei.Du
+ * @date 8:39 2024/5/31
+ */
+@Configuration
+public class RabbitConfig {
+ /**
+ * 创建队列
+ * @return
+ */
+ @Bean
+ public Queue initQueue() {
+ return new Queue ("create.topic", true); // true-->队列持久
+ }
+
+ /**
+ * 创建direct交换机
+ * @return
+ */
+ @Bean
+ public DirectExchange direct(){
+ return new DirectExchange ("topic.direct");
+ }
+
+ /**
+ * 绑定队列和交换机
+ * @param direct
+ * @param initQueue
+ * @return
+ */
+ @Bean
+ public Binding binding1a(DirectExchange direct, Queue initQueue){
+ return BindingBuilder.bind(initQueue)
+ .to(direct)
+ .with("protocol-parsing");
+ }
+
+}
diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml
new file mode 100644
index 0000000..8c44e69
--- /dev/null
+++ b/src/main/resources/application.yml
@@ -0,0 +1,10 @@
+server:
+ port: 83
+
+spring:
+ application:
+ # 协议解析
+ name: protocol-parsing
+ rabbitmq:
+ host: 111.229.102.61
+ port: 5672