commit 9fd3eceaec721a74bb7e2bf74f3da065b4bfa942 Author: Yunfei Du <278774021@qq.com> Date: Fri May 31 22:08:45 2024 +0800 feat:运营平台 diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..5ff6309 --- /dev/null +++ b/.gitignore @@ -0,0 +1,38 @@ +target/ +!.mvn/wrapper/maven-wrapper.jar +!**/src/main/**/target/ +!**/src/test/**/target/ + +### IntelliJ IDEA ### +.idea/modules.xml +.idea/jarRepositories.xml +.idea/compiler.xml +.idea/libraries/ +*.iws +*.iml +*.ipr + +### Eclipse ### +.apt_generated +.classpath +.factorypath +.project +.settings +.springBeans +.sts4-cache + +### NetBeans ### +/nbproject/private/ +/nbbuild/ +/dist/ +/nbdist/ +/.nb-gradle/ +build/ +!**/src/main/**/build/ +!**/src/test/**/build/ + +### VS Code ### +.vscode/ + +### Mac OS ### +.DS_Store \ No newline at end of file diff --git a/.idea/.gitignore b/.idea/.gitignore new file mode 100644 index 0000000..35410ca --- /dev/null +++ b/.idea/.gitignore @@ -0,0 +1,8 @@ +# 默认忽略的文件 +/shelf/ +/workspace.xml +# 基于编辑器的 HTTP 客户端请求 +/httpRequests/ +# Datasource local storage ignored files +/dataSources/ +/dataSources.local.xml diff --git a/.idea/encodings.xml b/.idea/encodings.xml new file mode 100644 index 0000000..63574ec --- /dev/null +++ b/.idea/encodings.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/.idea/inspectionProfiles/Project_Default.xml b/.idea/inspectionProfiles/Project_Default.xml new file mode 100644 index 0000000..8d66637 --- /dev/null +++ b/.idea/inspectionProfiles/Project_Default.xml @@ -0,0 +1,5 @@ + + + + \ No newline at end of file diff --git a/.idea/misc.xml b/.idea/misc.xml new file mode 100644 index 0000000..82dbec8 --- /dev/null +++ b/.idea/misc.xml @@ -0,0 +1,14 @@ + + + + + + + + + + \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml new file mode 100644 index 0000000..35eb1dd --- /dev/null +++ b/.idea/vcs.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..514e9d2 --- /dev/null +++ b/pom.xml @@ -0,0 +1,111 @@ + + + 4.0.0 + + com.load + operation-platform + 1.0-SNAPSHOT + + + 17 + 17 + UTF-8 + 2.6.13 + + + + + org.springframework.boot + spring-boot-starter-web + + + + org.projectlombok + lombok + 1.18.32 + provided + + + + org.springframework.boot + spring-boot-starter-test + test + + + org.eclipse.paho + org.eclipse.paho.client.mqttv3 + 1.2.5 + + + org.apache.commons + commons-lang3 + 3.12.0 + + + + + org.springframework.boot + spring-boot-starter-data-redis + + + io.lettuce + lettuce-core + + + + + + org.springframework.boot + spring-boot-starter-amqp + + + org.projectlombok + lombok + + + + + + org.springframework.boot + spring-boot-dependencies + ${spring-boot.version} + pom + import + + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.8.1 + + 17 + 17 + UTF-8 + + + + org.springframework.boot + spring-boot-maven-plugin + ${spring-boot.version} + + com.load.MqttApplication + true + + + + repackage + + repackage + + + + + + + diff --git a/src/main/java/com/load/MqttApplication.java b/src/main/java/com/load/MqttApplication.java new file mode 100644 index 0000000..eed7026 --- /dev/null +++ b/src/main/java/com/load/MqttApplication.java @@ -0,0 +1,17 @@ +package com.load; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +/** + * @ClassName MqttApplication + * @Description 描述 + * @Author YunFei.Du + * @Date 2024/5/30 9:19 + */ +@SpringBootApplication +public class MqttApplication { + public static void main(String[] args) { + SpringApplication.run ( MqttApplication.class ,args); + } +} diff --git a/src/main/java/com/load/SubscribeSample.java b/src/main/java/com/load/SubscribeSample.java new file mode 100644 index 0000000..c0477b9 --- /dev/null +++ b/src/main/java/com/load/SubscribeSample.java @@ -0,0 +1,86 @@ +package com.load; + +import org.eclipse.paho.client.mqttv3.*; +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; + +/** + * 订阅 MQTT 主题 + */ +public class SubscribeSample { + + /** + * 主程序入口。 + * 创建一个MQTT客户端,并连接到指定的代理服务器,订阅特定的主题,并处理接收到的消息。 + * + */ + public static void main(String[] args) { + /** + * 代理地址 + */ + String broker = "tcp://47.92.127.83:1883"; + /** + * 主题 + */ + String topic = "mqtt/test"; + String username = "emqx"; + String password = "public"; + /** + * 客户端ID(随机) + */ + String clientid = "protocol-parsing"; + int qos = 0; + + try { + // 创建MqttClient实例 + MqttClient client = new MqttClient(broker, clientid, new MemoryPersistence()); + + // 设置连接选项 + MqttConnectOptions options = new MqttConnectOptions(); + options.setUserName(username); + options.setPassword(password.toCharArray()); + options.setConnectionTimeout(60); +// 堵塞60S + options.setKeepAliveInterval(60); + // 设置MQTT消息回调处理 + client.setCallback(new MqttCallback() { + + /** + * 连接丢失时的处理逻辑。 + * + * @param cause 引发连接丢失的异常对象 + */ + public void connectionLost(Throwable cause) { + System.out.println("connectionLost: " + cause.getMessage()); + } + + /** + * 当接收到消息时的处理逻辑。 + * + * @param topic 接收到消息的主题 + * @param message 接收到的消息对象 + */ + public void messageArrived(String topic, MqttMessage message) { + System.out.println("topic: " + topic); + System.out.println("Qos: " + message.getQos()); + System.out.println("message content: " + new String(message.getPayload())); + + } + /** + * 消息成功发送完成的处理逻辑。 + * + * @param token 表示消息发送过程的令牌 + */ + public void deliveryComplete(IMqttDeliveryToken token) { + System.out.println("deliveryComplete---------" + token.isComplete()); + } + + }); + // 连接到MQTT代理 + client.connect(options); + // 订阅主题 + client.subscribe(topic, qos); + } catch (Exception e) { + e.printStackTrace(); + } + } +} diff --git a/src/main/java/com/load/config/RabbitmqConfig.java b/src/main/java/com/load/config/RabbitmqConfig.java new file mode 100644 index 0000000..97e27f2 --- /dev/null +++ b/src/main/java/com/load/config/RabbitmqConfig.java @@ -0,0 +1,54 @@ +package com.load.config; + +import org.springframework.amqp.core.*; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * rabbitMq配置类 + * + * @author YunFei·Du + * @ClassName: RabbitmqConfig + * @Description: rabbitMq配置类 + * @CreateTime: 2024/5/27 16:56 + */ +@Configuration +public class RabbitmqConfig { + public static final String QUEUE_INFORM_EMAIL = "queue_inform_email"; + public static final String QUEUE_INFORM_SMS = "disconnect_connect"; + public static final String EXCHANGE_TOPICS_INFORM="exchange_topics_inform"; + public static final String ROUTINGKEY_EMAIL="inform.#.email.#"; + public static final String ROUTINGKEY_SMS="inform.#.sms.#"; + + + @Bean(EXCHANGE_TOPICS_INFORM) + public Exchange EXCHANGE_TOPICS_INFORM(){ + //durable(true) 持久化,mq重启之后交换机还在 + return ExchangeBuilder.topicExchange(EXCHANGE_TOPICS_INFORM).durable(true).build(); + } + + //声明QUEUE_INFORM_EMAIL队列 + @Bean(QUEUE_INFORM_EMAIL) + public Queue QUEUE_INFORM_EMAIL(){ + return new Queue(QUEUE_INFORM_EMAIL); + } + //声明QUEUE_INFORM_SMS队列 + @Bean(QUEUE_INFORM_SMS) + public Queue QUEUE_INFORM_SMS(){ + return new Queue(QUEUE_INFORM_SMS); + } + + //ROUTINGKEY_EMAIL队列绑定交换机,指定routingKey + @Bean + public Binding BINDING_QUEUE_INFORM_EMAIL(@Qualifier(QUEUE_INFORM_EMAIL) Queue queue, + @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange){ + return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_EMAIL).noargs(); + } + //ROUTINGKEY_SMS队列绑定交换机,指定routingKey + @Bean + public Binding BINDING_ROUTINGKEY_SMS(@Qualifier(QUEUE_INFORM_SMS) Queue queue, + @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange){ + return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_SMS).noargs(); + } +} diff --git a/src/main/java/com/load/config/RestTemplateConfig.java b/src/main/java/com/load/config/RestTemplateConfig.java new file mode 100644 index 0000000..b06b8c0 --- /dev/null +++ b/src/main/java/com/load/config/RestTemplateConfig.java @@ -0,0 +1,33 @@ +package com.load.config; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.http.client.ClientHttpRequestFactory; +import org.springframework.http.client.SimpleClientHttpRequestFactory; +import org.springframework.web.client.RestTemplate; + +/** + * rest配置类 + * + * @author YunFei·Du + * @ClassName: RestTemplateConfig + * @Description: rest配置类 + * @CreateTime: 2024/5/27 10:01 + */ +@Configuration +public class RestTemplateConfig { + @Bean + public RestTemplate restTemplate(ClientHttpRequestFactory factory) { + return new RestTemplate(factory); + } + + @Bean + public ClientHttpRequestFactory simpleClientHttpRequestFactory() { + SimpleClientHttpRequestFactory factory = new SimpleClientHttpRequestFactory(); + //超时设置 + factory.setReadTimeout(5000);//ms + factory.setConnectTimeout(15000);//ms + return factory; + } +} + diff --git a/src/main/java/com/load/mqtt/MessageCallbackService.java b/src/main/java/com/load/mqtt/MessageCallbackService.java new file mode 100644 index 0000000..89ba4b1 --- /dev/null +++ b/src/main/java/com/load/mqtt/MessageCallbackService.java @@ -0,0 +1,42 @@ +package com.load.mqtt; + +import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; +import org.eclipse.paho.client.mqttv3.MqttCallback; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.springframework.stereotype.Service; + +/** + * 消息回调服务 (回执消息类) + * @author YunFei.Du + * @date 22:37 2024/5/30 + */ +@Service +public class MessageCallbackService implements MqttCallback { + + @Override + public void connectionLost(Throwable cause) { + System.out.println("connectionLost: " + cause.getMessage()); + } + + /** + * 接收消息 + * @param topic + * @param message + */ + @Override + public void messageArrived(String topic, MqttMessage message) { + System.out.println("topic: " + topic); + System.out.println("Qos: " + message.getQos()); + System.out.println("message content: " + new String(message.getPayload())); + } + +// + /** + * 发送消息 + * @param token + */ + @Override + public void deliveryComplete(IMqttDeliveryToken token) { + System.out.println("deliveryComplete---------" + token.isComplete()); + } +} diff --git a/src/main/java/com/load/mqtt/MqttFactory.java b/src/main/java/com/load/mqtt/MqttFactory.java new file mode 100644 index 0000000..d5851a4 --- /dev/null +++ b/src/main/java/com/load/mqtt/MqttFactory.java @@ -0,0 +1,58 @@ +package com.load.mqtt; + +import lombok.AllArgsConstructor; +import org.eclipse.paho.client.mqttv3.MqttClient; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.eclipse.paho.client.mqttv3.MqttException; +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; +import org.springframework.stereotype.Service; + +/** + * Mqtt工厂 + * @author YunFei.Du + * @date 22:38 2024/5/30 + */ +@Service +@AllArgsConstructor +public class MqttFactory { + + private final MessageCallbackService messageCallbackService; + // 连接参数 + + /** + * 创建Mqtt 客户端 + * @param mqttProperties + * @return + */ + public MqttClient createClient(MqttProperties mqttProperties){ + MqttClient mqttClient =null; + try { + mqttClient=new MqttClient ( mqttProperties.getBroker() , mqttProperties.getClientId() , new MemoryPersistence() ); + + MqttConnectOptions options = new MqttConnectOptions ( ); + + //都不为空 + if (mqttProperties.isLogin()){ + options.setUserName ( mqttProperties.getUsername() ); + options.setPassword ( mqttProperties.getPassword().toCharArray() ); + } + options.setConnectionTimeout(60); + options.setKeepAliveInterval(60); + + mqttClient.connect ( options ); + //监听 + mqttClient.setCallback ( messageCallbackService ); + mqttClient.subscribe ( mqttProperties.getTopic(),0 ); + } catch (MqttException e) { + throw new RuntimeException ( e ); + } + + +// try { +// +// } catch (MqttException e) { +// throw new RuntimeException ( e ); +// } + return mqttClient; + } +} diff --git a/src/main/java/com/load/mqtt/MqttProperties.java b/src/main/java/com/load/mqtt/MqttProperties.java new file mode 100644 index 0000000..ee5376c --- /dev/null +++ b/src/main/java/com/load/mqtt/MqttProperties.java @@ -0,0 +1,51 @@ +package com.load.mqtt; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; +import org.apache.commons.lang3.StringUtils; + +/** + * 配置中心 + * @author YunFei.Du + * @date 8:53 2024/5/30 + */ +@Data +@AllArgsConstructor +@NoArgsConstructor +@Builder +public class MqttProperties { + + private String broker; + private String topic; + private String username; + private String password; + private String clientId; + + /** + * 构建mqtt配置 + * @param ip + * @param topic + * @return + */ + public static MqttProperties configBuild(String ip, String topic){ + return MqttProperties.builder() + .broker("tcp://"+ip+":1883") + .topic(topic) + .username("admin") + .password("public") + .clientId("protocol-parsing") //协议解析 定值 --> 配置 + .build(); + } + + /** + * 判断是否可以登录 + * @return + */ + public boolean isLogin(){ +// commons-lang3 + return StringUtils.isBlank ( username ) && !StringUtils.isBlank ( password ); + } + +} diff --git a/src/main/java/com/load/rebbitmq/MsgHandle.java b/src/main/java/com/load/rebbitmq/MsgHandle.java new file mode 100644 index 0000000..05304bc --- /dev/null +++ b/src/main/java/com/load/rebbitmq/MsgHandle.java @@ -0,0 +1,33 @@ +package com.load.rebbitmq; + +import com.load.mqtt.MqttFactory; +import com.load.mqtt.MqttProperties; +import lombok.extern.log4j.Log4j2; +import org.eclipse.paho.client.mqttv3.MqttClient; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +/** + * 信息处理器 + * @author YunFei.Du + * @date 9:21 2024/5/30 + */ +@Component +@Log4j2 +public class MsgHandle { + + @Autowired + private MqttFactory mqttFactory; + @RabbitListener(queues = {"create.topic"}) + private void msg(String msg){ + log.info ( "接收到消息:{}" , msg ); + MqttProperties mqttProperties = MqttProperties.configBuild ( "47.92.127.83", "mqtt/test" ); + + log.info ( "接收信息初始化程序:{}" , mqttProperties); + MqttClient client = mqttFactory.createClient ( mqttProperties ); + log.info ( "client创建成功:{}", client.getClientId () ); + } + + +} diff --git a/src/main/java/com/load/rebbitmq/RabbitConfig.java b/src/main/java/com/load/rebbitmq/RabbitConfig.java new file mode 100644 index 0000000..7816bb6 --- /dev/null +++ b/src/main/java/com/load/rebbitmq/RabbitConfig.java @@ -0,0 +1,50 @@ +package com.load.rebbitmq; + +import org.springframework.amqp.core.Binding; +import org.springframework.amqp.core.BindingBuilder; +import org.springframework.amqp.core.DirectExchange; +import org.springframework.amqp.core.Queue; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + + + +/** + * rabbit 配置类 + * @author YunFei.Du + * @date 8:39 2024/5/31 + */ +@Configuration +public class RabbitConfig { + /** + * 创建队列 + * @return + */ + @Bean + public Queue initQueue() { + return new Queue ("create.topic", true); // true-->队列持久 + } + + /** + * 创建direct交换机 + * @return + */ + @Bean + public DirectExchange direct(){ + return new DirectExchange ("topic.direct"); + } + + /** + * 绑定队列和交换机 + * @param direct + * @param initQueue + * @return + */ + @Bean + public Binding binding1a(DirectExchange direct, Queue initQueue){ + return BindingBuilder.bind(initQueue) + .to(direct) + .with("protocol-parsing"); + } + +} diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml new file mode 100644 index 0000000..8c44e69 --- /dev/null +++ b/src/main/resources/application.yml @@ -0,0 +1,10 @@ +server: + port: 83 + +spring: + application: + # 协议解析 + name: protocol-parsing + rabbitmq: + host: 111.229.102.61 + port: 5672