From 3202af6ebbc3e9d7012a44c05cc6871a8a8a08eb Mon Sep 17 00:00:00 2001
From: LQS <2506203757@qq.com>
Date: Mon, 7 Oct 2024 20:30:44 +0800
Subject: [PATCH] =?UTF-8?q?feat():=20=E5=AE=8C=E5=96=84mqtt=E7=9A=84?=
=?UTF-8?q?=E6=8A=95=E9=80=92?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
cloud-auth/src/main/resources/bootstrap.yml | 2 +-
cloud-common/cloud-common-kafka/pom.xml | 2 +-
cloud-common/cloud-common-mqtt/pom.xml | 31 +++
.../com/muyu/common/mqtt/MQTTConnect.java | 31 +++
cloud-common/pom.xml | 1 +
.../src/main/resources/bootstrap.yml | 2 +-
.../cloud-modules-enterprise-common/pom.xml | 4 +
.../cloud-modules-enterprise-server/pom.xml | 2 +-
.../com/muyu/enterprise/MQTT/ClientMQTT.java | 74 -------
.../enterprise/MQTT/MQTTReceiveCallback.java | 69 -------
.../muyu/enterprise/MQTT/MyMqttClient.java | 187 ------------------
.../muyu/enterprise/MQTT/PushCallback.java | 52 -----
.../com/muyu/enterprise/MQTT/ServerMQTT.java | 113 -----------
.../src/main/resources/bootstrap.yml | 2 +-
.../src/main/resources/bootstrap.yml | 2 +-
.../src/main/resources/bootstrap.yml | 2 +-
.../cloud-modules-protocol-analysis/pom.xml | 16 ++
.../analysis/parsing/MQTT/ParsingMQTT.java | 20 +-
.../com/muyu/analysis/parsing/MQTT/Test2.java | 86 ++++++++
.../muyu/analysis/parsing/util/CacheUtil.java | 0
.../src/main/resources/bootstrap.yml | 2 +-
.../src/main/resources/bootstrap.yml | 2 +-
.../src/main/resources/bootstrap.yml | 2 +-
pom.xml | 13 ++
24 files changed, 205 insertions(+), 512 deletions(-)
create mode 100644 cloud-common/cloud-common-mqtt/pom.xml
create mode 100644 cloud-common/cloud-common-mqtt/src/main/java/com/muyu/common/mqtt/MQTTConnect.java
delete mode 100644 cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/java/com/muyu/enterprise/MQTT/ClientMQTT.java
delete mode 100644 cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/java/com/muyu/enterprise/MQTT/MQTTReceiveCallback.java
delete mode 100644 cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/java/com/muyu/enterprise/MQTT/MyMqttClient.java
delete mode 100644 cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/java/com/muyu/enterprise/MQTT/PushCallback.java
delete mode 100644 cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/java/com/muyu/enterprise/MQTT/ServerMQTT.java
create mode 100644 cloud-modules/cloud-modules-protocol-analysis/src/main/java/com/muyu/analysis/parsing/MQTT/Test2.java
rename JavaSample-tcp1061513671883/.lck => cloud-modules/cloud-modules-protocol-analysis/src/main/java/com/muyu/analysis/parsing/util/CacheUtil.java (100%)
diff --git a/cloud-auth/src/main/resources/bootstrap.yml b/cloud-auth/src/main/resources/bootstrap.yml
index f9be2d5..2bdda14 100644
--- a/cloud-auth/src/main/resources/bootstrap.yml
+++ b/cloud-auth/src/main/resources/bootstrap.yml
@@ -6,7 +6,7 @@ nacos:
addr: 106.15.136.7:8848
user-name: nacos
password: nacos
- namespace: lqs
+ namespace: dev
# Spring
spring:
application:
diff --git a/cloud-common/cloud-common-kafka/pom.xml b/cloud-common/cloud-common-kafka/pom.xml
index 81d6907..60cf732 100644
--- a/cloud-common/cloud-common-kafka/pom.xml
+++ b/cloud-common/cloud-common-kafka/pom.xml
@@ -12,7 +12,7 @@
cloud-common-kafka
- cloud-common-kafka
+ cloud-common-kafka模块
diff --git a/cloud-common/cloud-common-mqtt/pom.xml b/cloud-common/cloud-common-mqtt/pom.xml
new file mode 100644
index 0000000..32a0a17
--- /dev/null
+++ b/cloud-common/cloud-common-mqtt/pom.xml
@@ -0,0 +1,31 @@
+
+
+ 4.0.0
+
+ com.muyu
+ cloud-common
+ 3.6.3
+
+
+ cloud-common-mqtt
+
+
+ cloud-common-mqtt消息队列遥测传输协议
+
+
+
+ 17
+ 17
+ UTF-8
+
+
+
+
+
+ com.muyu
+ cloud-common-core
+
+
+
diff --git a/cloud-common/cloud-common-mqtt/src/main/java/com/muyu/common/mqtt/MQTTConnect.java b/cloud-common/cloud-common-mqtt/src/main/java/com/muyu/common/mqtt/MQTTConnect.java
new file mode 100644
index 0000000..8534db5
--- /dev/null
+++ b/cloud-common/cloud-common-mqtt/src/main/java/com/muyu/common/mqtt/MQTTConnect.java
@@ -0,0 +1,31 @@
+package com.muyu.common.mqtt;
+
+/**
+ * mqtt连接配置
+ * @Author:李庆帅
+ * @Package:com.muyu.common.mqtt
+ * @Project:cloud-server
+ * @name:MQTTConnect
+ * @Date:2024/10/2 9:40
+ */
+public class MQTTConnect
+{
+ /**
+ * String topic = "vehicle";
+ * String broker = "tcp://106.15.136.7:1883";
+ * String clientId = "JavaSample";
+ */
+
+ /**
+ * 定义主题字符串,用于MQTT消息交换的频道
+ */
+ public static final String TOPIC="vehicle";
+ /**
+ *定义代理服务器的连接字符串,格式通常为协议名称,IP地址和端口号
+ */
+ public static final String BROKER="tcp://106.15.136.7:1883";
+ /**
+ *定义客户端ID,用于在MQTT代理服务器中标识客户端
+ */
+ public static final String CLIENT_ID ="JavaSample";
+}
diff --git a/cloud-common/pom.xml b/cloud-common/pom.xml
index 157bcd1..c273261 100644
--- a/cloud-common/pom.xml
+++ b/cloud-common/pom.xml
@@ -22,6 +22,7 @@
cloud-common-xxl
cloud-common-rabbit
cloud-common-kafka
+ cloud-common-mqtt
cloud-common
diff --git a/cloud-gateway/src/main/resources/bootstrap.yml b/cloud-gateway/src/main/resources/bootstrap.yml
index 929b21f..63b6d7e 100644
--- a/cloud-gateway/src/main/resources/bootstrap.yml
+++ b/cloud-gateway/src/main/resources/bootstrap.yml
@@ -6,7 +6,7 @@ nacos:
addr: 106.15.136.7:8848
user-name: nacos
password: nacos
- namespace: lqs
+ namespace: dev
# Spring
spring:
diff --git a/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-common/pom.xml b/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-common/pom.xml
index 0935b0a..077b1e9 100644
--- a/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-common/pom.xml
+++ b/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-common/pom.xml
@@ -11,6 +11,10 @@
cloud-modules-enterprise-common
+
+ cloud-modules-enterprise-common企业业务平台服务
+
+
17
17
diff --git a/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/pom.xml b/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/pom.xml
index d3be51b..28ba282 100644
--- a/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/pom.xml
+++ b/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/pom.xml
@@ -89,7 +89,7 @@
cloud-modules-enterprise-common
-
+
com.github.yulichang
mybatis-plus-join
diff --git a/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/java/com/muyu/enterprise/MQTT/ClientMQTT.java b/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/java/com/muyu/enterprise/MQTT/ClientMQTT.java
deleted file mode 100644
index 169edaf..0000000
--- a/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/java/com/muyu/enterprise/MQTT/ClientMQTT.java
+++ /dev/null
@@ -1,74 +0,0 @@
-package com.muyu.enterprise.MQTT;
-
-import com.alibaba.nacos.api.remote.PushCallBack;
-import org.eclipse.paho.client.mqttv3.MqttClient;
-import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
-import org.eclipse.paho.client.mqttv3.MqttTopic;
-import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
-
-/**
- * 模拟一个客户端接收消息
- * @Author:李庆帅
- * @Package:com.muyu.enterprise.MQTT
- * @Project:cloud-server
- * @name:ClientMQTT
- * @Date:2024/9/28 12:11
- */
-public class ClientMQTT {
- //String topic = "vehicle";
- // String content = "Message from MqttPublishSample";
- // int qos = 2;
- // String broker = "tcp://106.15.136.7:1883";
- // String clientId = "JavaSample";
- //MQTT代理服务器地址
- public static final String HOST="tcp://106.15.136.7:1883";
- public static final String TOPIC1="pos_message_all";
- private static final String clientId="12345678";
- private MqttClient client;
- private MqttConnectOptions options;
- private String userName="mqtt"; //非必须
- private String passWord="mqtt"; //非必须
-
- private void start(){
- try{
- // host为主机名,clientid即连接MQTT的客户端ID,一般以唯一标识符表示,MemoryPersistence设置clientid的保存形式,默认为以内存保存
- client= new MqttClient(HOST,clientId,new MemoryPersistence());
- // MQTT的连接设置
- options=new MqttConnectOptions();
- // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,设置为true表示每次连接到服务器都以新的身份连接
- options.setCleanSession(false);
- // 设置连接的用户名
- options.setUserName(userName);
- // 设置连接的密码
- options.setPassword(passWord.toCharArray());
- // 设置超时时间 单位为秒
- options.setConnectionTimeout(10);
- // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
- options.setKeepAliveInterval(20);
- //设置断开后重新连接
- options.setAutomaticReconnect(true);
- // 设置回调
- client.setCallback(new PushCallback());
- MqttTopic topic = client.getTopic(TOPIC1);
- //setWill方法,如果项目中需要知道客户端是否掉线可以调用该方法。设置最终端口的通知消息
- //遗嘱
- options.setWill(topic,"close".getBytes(),1,true);
- client.connect(options);
- //订阅消息
- int[] Qos = {1} ; //0:最多一次 、1:最少一次 、2:只有一次
- String[] topics1 = {TOPIC1};
- client.subscribe(topics1,Qos);
-
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- public static void main(String[] args){
- ClientMQTT clientMQTT = new ClientMQTT();
- clientMQTT.start();
- }
-
-
-
-}
diff --git a/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/java/com/muyu/enterprise/MQTT/MQTTReceiveCallback.java b/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/java/com/muyu/enterprise/MQTT/MQTTReceiveCallback.java
deleted file mode 100644
index 5f18256..0000000
--- a/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/java/com/muyu/enterprise/MQTT/MQTTReceiveCallback.java
+++ /dev/null
@@ -1,69 +0,0 @@
-package com.muyu.enterprise.MQTT;
-
-/**
- * @Author:李庆帅
- * @Package:com.muyu.enterprise.MQTT
- * @Project:cloud-server
- * @name:MQTTReceiveCallback
- * @Date:2024/9/27 22:21
- */
-
-import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
-import org.eclipse.paho.client.mqttv3.MqttCallback;
-import org.eclipse.paho.client.mqttv3.MqttMessage;
-
-/**
- * 发布消息的回调类
- *
- * 必须实现MqttCallback的接口并实现对应的相关接口方法CallBack 类将实现 MqttCallBack。
- * 每个客户机标识都需要一个回调实例。在此示例中,构造函数传递客户机标识以另存为实例数据。
- * 在回调中,将它用来标识已经启动了该回调的哪个实例。
- * 必须在回调类中实现三个方法:
- *
- * (1):public void messageArrived(MqttTopic topic, MqttMessage message)接收已经预订的发布。
- *
- * (2):public void connectionLost(Throwable cause)在断开连接时调用。
- *
- * (3):public void deliveryComplete(MqttDeliveryToken token))
- * 接收到已经发布的 QoS 1 或 QoS 2 消息的传递令牌时调用。
- * 由 MqttClient.connect 激活此回调。
- *
- */
-public class MQTTReceiveCallback implements MqttCallback {
-// @Override
-// public void connectionLost(Throwable throwable) {
-// //连接丢失后,一般在这里面进行重连
-// System.out.println("连接断开,可以做重连");
-// }
-//
-// @Override
-// public void messageArrived(String topic, MqttMessage message) throws Exception {
-// //subscribe后得到的消息会执行到这里面
-// System.out.println("接收消息主题:"+topic);
-// System.out.println("接收消息Qos:"+message.getQos());
-// System.out.println("接收消息内容:"+new String(message.getPayload()));
-// }
-//
-// @Override
-// public void deliveryComplete(IMqttDeliveryToken token) {
-// System.out.println("deliveryComplete----------"+token.isComplete());
-// }
- @Override
- public void connectionLost(Throwable throwable){
- //连接丢失后,一般在这里面进行重连
- System.out.println("连接断开,可以做重连");
- }
- @Override
- public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
- //subscribe后得到的消息会执行到这面
- System.out.println("接收消息主题:"+topic);
- System.out.println("接收消息Qos:"+mqttMessage.getQos());
- System.out.println("接收消息内容:"+new String(mqttMessage.getPayload()));
- }
-
- @Override
- public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
- System.out.println("deliveryComplete---------"+iMqttDeliveryToken.isComplete());
- }
-
-}
diff --git a/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/java/com/muyu/enterprise/MQTT/MyMqttClient.java b/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/java/com/muyu/enterprise/MQTT/MyMqttClient.java
deleted file mode 100644
index fb9a4bc..0000000
--- a/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/java/com/muyu/enterprise/MQTT/MyMqttClient.java
+++ /dev/null
@@ -1,187 +0,0 @@
-package com.muyu.enterprise.MQTT;
-
-
-import org.eclipse.paho.client.mqttv3.*;
-import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
-
-/**
- * 客户端类
- * @Author:李庆帅
- * @Package:com.muyu.enterprise.mqtt
- * @Project:cloud-server
- * @name:MyMqttClient
- * @Date:2024/9/27 22:20
- */
-public class MyMqttClient {
-
- public static MqttClient mqttClient =null;
- private static MemoryPersistence memoryPersistence=null;
- private static MqttConnectOptions mqttConectOptions=null;
- private static String ClinentName=""; //待填 将在服务端出现的名字
- private static String IP=""; //待填 服务器IP
-
-
- public static void main(String[] args) {
- start(ClinentName);
- }
-
-
-
- public static void start(String clientId){
- //初始化连接设置对象
- mqttConectOptions=new MqttConnectOptions();
- //设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,
- //这里设置为true表示每次连接到服务器都以新的身份连接
- mqttConectOptions.setCleanSession(true);
- //设置连接超时时间,单位是秒
- mqttConectOptions.setConnectionTimeout(10);
- //设置持久化方式
- memoryPersistence=new MemoryPersistence();
- if(null!=clientId){
- try{
- mqttClient =new MqttClient("tcp://"+IP+":1883", clientId,memoryPersistence);
- } catch (Exception e) {
- // TODO 自动生成的捕获块
- throw new RuntimeException(e);
- }
- }
- System.out.println("连接状态:"+mqttClient.isConnected());
- //设置连接和回调
- if(null!=mqttClient){
- if(!mqttClient.isConnected()){
- //创建回调函数对象
- MQTTReceiveCallback MQTTReceiveCallback = new MQTTReceiveCallback();
- //客户端添加回调函数
- mqttClient.setCallback(MQTTReceiveCallback);
- //创建连接
- try{
- System.out.println("创建连接");
- mqttClient.connect(mqttConectOptions);
- } catch (Exception e) {
- // TODO 自动生成的捕获块
- throw new RuntimeException(e);
- }
- }
- }else {
- System.out.println("mqttClient为空");
- }
- System.out.println("连接状态"+mqttClient.isConnected());
- }
- // 关闭连接
- public void closeConnect(){
- //关闭储存方式
- if(null!=memoryPersistence){
- try{
- memoryPersistence.close();
- } catch (Exception e) {
- // TODO 自动生成的捕获块
- throw new RuntimeException(e);
- }
- }else {
- System.out.println("memoryPersistence为空");
- }
-
- if(null!=mqttClient){
- if(mqttClient.isConnected()){
- try{
- mqttClient.disconnect();
- mqttClient.close();
- } catch (Exception e) {
- // TODO 自动生成的捕获块
- throw new RuntimeException(e);
- }
- }else {
- System.out.println("mqttClient未连接");
- }
- }else {
- System.out.println("mqttClient为空");
- }
- }
-
-
- //发布消息
- public static void publishMessage(String pubTopic,String message,int qos){
- if(null!=mqttClient && mqttClient.isConnected()){
- System.out.println("发布消息"+mqttClient.isConnected());
- System.out.println("id"+mqttClient.isConnected());
- MqttMessage mqttMessage = new MqttMessage();
- mqttMessage.setQos(qos);
-
- MqttTopic topic = mqttClient.getTopic(pubTopic);
-
- if(null!=topic){
- try{
- MqttDeliveryToken publish = topic.publish(mqttMessage);
- if(!publish.isComplete()){
- System.out.println("消息发布成功");
- }
- } catch (Exception e) {
- // TODO 自动生成的捕获块
- throw new RuntimeException(e);
- }
- }
-
- }
- }
-
-
- //重新连接
- public static void reConnect(){
- if(null!=mqttClient&&mqttClient.isConnected()){
- if(!mqttClient.isConnected()){
- if(null!=mqttConectOptions){
- try{
- mqttClient.connect(mqttConectOptions);
- } catch (Exception e) {
- // TODO 自动生成的捕获块
- throw new RuntimeException(e);
- }
- }else {
- System.out.println("mqttConnectOptions为空");
- }
- }else {
- System.out.println("mqttClient为空或已连接");
- }
- }else {
- start(ClinentName);
- }
- }
-
-
- //订阅主题
- public static void suvTopic(String topic){
- if(null!=mqttClient && mqttClient.isConnected()){
- try{
- mqttClient.subscribe(topic,1);
- } catch (MqttException e) {
- // TODO 自动生成的捕获块
- throw new RuntimeException(e);
- }
- }else {
- System.out.println("mqttClient出错");
- }
- }
-
-
- //清空主题
- public void cleanTopic(String topic){
- if(null!=mqttClient && mqttClient.isConnected()){
- try{
- mqttClient.subscribe(topic);
- } catch (Exception e) {
- // TODO 自动生成的捕获块
- throw new RuntimeException(e);
- }
- }else {
- System.out.println("mqttClient出错");
- }
- }
-
-
-
-
-
-
-
-
-}
diff --git a/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/java/com/muyu/enterprise/MQTT/PushCallback.java b/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/java/com/muyu/enterprise/MQTT/PushCallback.java
deleted file mode 100644
index 769a3bc..0000000
--- a/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/java/com/muyu/enterprise/MQTT/PushCallback.java
+++ /dev/null
@@ -1,52 +0,0 @@
-package com.muyu.enterprise.MQTT;
-
-/**
- * 发布消息的回调类
- * @Author:李庆帅
- * @Package:com.muyu.enterprise.MQTT
- * @Project:cloud-server
- * @name:PushCallback
- * @Date:2024/9/28 14:35
- */
-
-import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
-import org.eclipse.paho.client.mqttv3.MqttCallback;
-import org.eclipse.paho.client.mqttv3.MqttMessage;
-
-/**
- * 发布消息的回调类
- *
- * 必须实现MqttCallback的接口并实现对应的相关接口方法CallBack 类将实现 MqttCallBack。
- * 每个客户机标识都需要一个回调实例。在此示例中,构造函数传递客户机标识以另存为实例数据。
- * 在回调中,将它用来标识已经启动了该回调的哪个实例。
- * 必须在回调类中实现三个方法:
- *
- * public void messageArrived(MqttTopic topic, MqttMessage message)接收已经预订的发布。
- *
- * public void connectionLost(Throwable cause)在断开连接时调用。
- *
- * public void deliveryComplete(MqttDeliveryToken token))
- * 接收到已经发布的 QoS 1 或 QoS 2 消息的传递令牌时调用。
- * 由 MqttClient.connect 激活此回调。
- *
- */
-public class PushCallback implements MqttCallback {
- @Override
- public void connectionLost(Throwable throwable) {
- // 连接丢失后,一般在这里面进行重连
- System.out.println("连接断开,可以做重连");
- }
-
- @Override
- public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
-// subscribe后得到的消息会执行到这里面
- System.out.println("接收消息主题 : " + topic);
- System.out.println("接收消息Qos : " + mqttMessage.getQos());
- System.out.println("接收消息内容 : " + new String(mqttMessage.getPayload()));
- }
-
- @Override
- public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
- System.out.println("deliveryComplete---------" + iMqttDeliveryToken.isComplete());
- }
-}
diff --git a/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/java/com/muyu/enterprise/MQTT/ServerMQTT.java b/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/java/com/muyu/enterprise/MQTT/ServerMQTT.java
deleted file mode 100644
index f026d7e..0000000
--- a/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/java/com/muyu/enterprise/MQTT/ServerMQTT.java
+++ /dev/null
@@ -1,113 +0,0 @@
-package com.muyu.enterprise.MQTT;
-
-import lombok.extern.log4j.Log4j2;
-import org.eclipse.paho.client.mqttv3.*;
-import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
-
-/**
- * 这是发送消息的服务端
- * 服务器向多个客户端推送主题,即不同客户端可向服务器订阅相同主题
- * @Author:李庆帅
- * @Package:com.muyu.enterprise.MQTT
- * @Project:cloud-server
- * @name:ServerMQTT
- * @Date:2024/9/28 14:32
- */
-@Log4j2
-public class ServerMQTT {
-
- //tcp://MQTT安装的服务器地址:MQTT定义的端口号
- public static final String HOST = "tcp://127.0.0.1:1883";
- //定义一个主题
- public static final String TOPIC = "pos_message_all";
- //定义MQTT的ID,可以在MQTT服务配置中指定
- private static final String clientId = "server11";
-
- private MqttClient client;
- private static MqttTopic topic11;
-
- // private String userName = "mqtt"; //非必须
- // private String passWord = "mqtt"; //非必须
-
- private static MqttMessage message;
-
- /**
- * 构造方法
- * @throws MqttException
- */
- public ServerMQTT() throws MqttException {
- // MemoryPersistence设置clientid的保存形式,默认为以内存保存
- client=new MqttClient(HOST, clientId,new MemoryPersistence());
- connect();
- }
-
- /**
- * 用来连接服务器
- */
- private void connect()
- {
- MqttConnectOptions options = new MqttConnectOptions();
- options.setCleanSession(false);
- // options.setUserName(userName);
- // options.setPassword(passWord.toCharArray());
- // 设置超时时间
- options.setConnectionTimeout(10);
- // 设置会话心跳时间
- options.setKeepAliveInterval(20);
- try{
- client.setCallback(new PushCallback());
- client.connect(options);
- topic11 = client.getTopic(TOPIC);
-
- } catch (Exception e) {
- e.printStackTrace();
- }
-
- }
-
- /**
- *
- * @param topic
- * @param message
- * @throws MqttException
- */
- public static void publish(MqttTopic topic , MqttMessage message) throws MqttPersistenceException,MqttException{
- MqttDeliveryToken token= topic.publish(message);
-
- token.waitForCompletion();
- System.out.println("消息已完全发布!"+token.isComplete());
- log.info("消息已完全发布!"+token.isComplete());
-
- }
-
- /**
- *
- * @param clieId
- * @param msg
- * @throws Exception
- */
- public static void sendMessage(String clieId,String msg)throws Exception{
- ServerMQTT server = new ServerMQTT();
- server.message = new MqttMessage();
- server.message.setQos(1); //保证消息能到达一次
- server.message.setRetained(true);
- String str="{\"clienId\":\""+clieId+"\",\"msg\":\""+msg+"\"}";
- try{
- publish(server.topic11 , server.message);
- //断开连接
-// server.client.disconnect();
- }catch (Exception e){
- e.printStackTrace();
- }
- }
-
-
- public static void main(String[] args) throws Exception {
- sendMessage("123444","哈哈哈");
- }
-
-
-
-
-
-}
diff --git a/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/resources/bootstrap.yml b/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/resources/bootstrap.yml
index 17007ea..9d05bf9 100644
--- a/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/resources/bootstrap.yml
+++ b/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/resources/bootstrap.yml
@@ -6,7 +6,7 @@ nacos:
addr: 106.15.136.7:8848
user-name: nacos
password: nacos
- namespace: lqs
+ namespace: dev
spring:
application:
diff --git a/cloud-modules/cloud-modules-file/src/main/resources/bootstrap.yml b/cloud-modules/cloud-modules-file/src/main/resources/bootstrap.yml
index 45a2bda..eea6728 100644
--- a/cloud-modules/cloud-modules-file/src/main/resources/bootstrap.yml
+++ b/cloud-modules/cloud-modules-file/src/main/resources/bootstrap.yml
@@ -6,7 +6,7 @@ nacos:
addr: 106.15.136.7:8848
user-name: nacos
password: nacos
- namespace: lqs
+ namespace: dev
# Spring
spring:
diff --git a/cloud-modules/cloud-modules-gen/src/main/resources/bootstrap.yml b/cloud-modules/cloud-modules-gen/src/main/resources/bootstrap.yml
index d820ac6..178e17b 100644
--- a/cloud-modules/cloud-modules-gen/src/main/resources/bootstrap.yml
+++ b/cloud-modules/cloud-modules-gen/src/main/resources/bootstrap.yml
@@ -6,7 +6,7 @@ nacos:
addr: 106.15.136.7:8848
user-name: nacos
password: nacos
- namespace: lqs
+ namespace: dev
# Spring
spring:
diff --git a/cloud-modules/cloud-modules-protocol-analysis/pom.xml b/cloud-modules/cloud-modules-protocol-analysis/pom.xml
index c6ad48f..b095572 100644
--- a/cloud-modules/cloud-modules-protocol-analysis/pom.xml
+++ b/cloud-modules/cloud-modules-protocol-analysis/pom.xml
@@ -105,6 +105,22 @@
com.muyu
cloud-common-kafka
+
+
+ com.muyu
+ cloud-common-mqtt
+
+
+
+ org.springframework.boot
+ spring-boot-starter-cache
+
+
+
+ com.github.ben-manes.caffeine
+ caffeine
+ 3.1.8
+
diff --git a/cloud-modules/cloud-modules-protocol-analysis/src/main/java/com/muyu/analysis/parsing/MQTT/ParsingMQTT.java b/cloud-modules/cloud-modules-protocol-analysis/src/main/java/com/muyu/analysis/parsing/MQTT/ParsingMQTT.java
index dc0347a..6c9f58e 100644
--- a/cloud-modules/cloud-modules-protocol-analysis/src/main/java/com/muyu/analysis/parsing/MQTT/ParsingMQTT.java
+++ b/cloud-modules/cloud-modules-protocol-analysis/src/main/java/com/muyu/analysis/parsing/MQTT/ParsingMQTT.java
@@ -1,9 +1,10 @@
-package com.muyu.analysis.parsing.MQTT;
+package com.muyu.analysis.parsing.mqtt;
import com.muyu.analysis.parsing.remote.RemoteClientService;
import com.muyu.common.core.constant.KafkaConstants;
import com.muyu.common.core.constant.RedisConstants;
import com.muyu.common.core.domain.Result;
+import com.muyu.common.mqtt.MQTTConnect;
import com.muyu.enterprise.domain.resp.car.MessageValueListResp;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.Resource;
@@ -45,18 +46,18 @@ public class ParsingMQTT {
*/
@PostConstruct
public void mqttClient() {
- String topic = "vehicle";
- String broker = "tcp://106.15.136.7:1883";
- String clientId = "JavaSample";
+// String topic = "vehicle";
+//// String MQTTConnect.BROKER = "tcp://106.15.136.7:1883";
+//// String clientId = "JavaSample";
try {
// 第三个参数为空,默认持久化策略
- MqttClient sampleClient = new MqttClient(broker, clientId);
+ MqttClient sampleClient = new MqttClient(MQTTConnect.BROKER, MQTTConnect.CLIENT_ID);
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setCleanSession(true);
- System.out.println("Connecting to broker: " + broker);
+ System.out.println("Connecting to MQTTConnect.BROKER: " + MQTTConnect.BROKER);
sampleClient.connect(connOpts);
- sampleClient.subscribe(topic, 0);
+ sampleClient.subscribe(MQTTConnect.TOPIC, 0);
sampleClient.setCallback(new MqttCallback() {
// 连接丢失
@Override
@@ -154,6 +155,11 @@ public class ParsingMQTT {
log.info("loc " + me.getLocalizedMessage());
log.info("cause " + me.getCause());
log.info("excep " + me);
+ System.out.println("reason " + me.getReasonCode());
+ System.out.println("msg " + me.getMessage());
+ System.out.println("loc " + me.getLocalizedMessage());
+ System.out.println("cause " + me.getCause());
+ System.out.println("excep " + me);
me.printStackTrace();
}
}
diff --git a/cloud-modules/cloud-modules-protocol-analysis/src/main/java/com/muyu/analysis/parsing/MQTT/Test2.java b/cloud-modules/cloud-modules-protocol-analysis/src/main/java/com/muyu/analysis/parsing/MQTT/Test2.java
new file mode 100644
index 0000000..9ed2037
--- /dev/null
+++ b/cloud-modules/cloud-modules-protocol-analysis/src/main/java/com/muyu/analysis/parsing/MQTT/Test2.java
@@ -0,0 +1,86 @@
+package com.muyu.analysis.parsing.mqtt;
+
+import com.alibaba.fastjson2.JSONObject;
+import lombok.extern.log4j.Log4j2;
+import org.w3c.dom.stylesheets.LinkStyle;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @Author:李庆帅
+ * @Package:com.muyu.analysis.parsing.mqtt
+ * @Project:cloud-server
+ * @name:Test2
+ * @Date:2024/10/6 20:36
+ */
+@Log4j2
+public class Test2
+{
+ private static final int DURATION_SECONDS = 5;
+ private static List receivedStrings = new ArrayList<>();
+ private static ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
+ private static int elapsedSeconds = 0;
+ private static String file = "elapsed" ;
+
+ public static void main(String[] args){
+ //定义一个任务,每秒执行一次
+ Runnable task = new Runnable() {
+ @Override
+ public void run() {
+ JSONObject stringFromSource = getStringFromSource();
+ receivedStrings.add(stringFromSource);
+ System.out.println("Received:"+stringFromSource);
+ //清理超过的数据
+ cleanUpOIdStrings();
+ //检查超速条件
+ checkForSpeeding();
+ }
+ };
+ //每个1秒执行一次任务
+ scheduler.scheduleAtFixedRate(task,0,1, TimeUnit.SECONDS);
+ }
+ //模拟从某个源获取字符串的方法
+ private static JSONObject getStringFromSource(){
+ JSONObject jsonObject = new JSONObject();
+ jsonObject.put("message","Hello World");
+ jsonObject.put("time",System.currentTimeMillis());
+ jsonObject.put("elapsed",elapsedSeconds);
+ return jsonObject;
+ }
+
+ //清理超过60秒的数据
+ private static void cleanUpOIdStrings(){
+ long currentTime = System.currentTimeMillis();
+ receivedStrings.removeIf(jsonObject ->currentTime-jsonObject.getLong("time")>TimeUnit.SECONDS.toMicros(DURATION_SECONDS));
+ }
+
+ //检查是否有超速情况
+ private static void checkForSpeeding()
+ {
+ if(receivedStrings.size() < 2)return;//如果数据不足,直接返回
+
+ JSONObject jsonObject = new JSONObject();
+ jsonObject.put("message","你好");
+ jsonObject.put("time",System.currentTimeMillis());
+ jsonObject.put("elapsed",10);
+
+ for (int i = 0; i < receivedStrings.size(); i++) {
+ JSONObject current = receivedStrings.get(i);
+ JSONObject next = receivedStrings.get(i + 1);
+
+ Short currentElapsed = current.getShort(file);
+ Short nextElapsed = next.getShort(file);
+ receivedStrings.add(jsonObject);
+ //检查条件,如果相差大于12,则记录错误
+ if (nextElapsed - currentElapsed > 12) {
+ System.out.println("出错啦!出错啦!车子超速啦!!!");
+ }
+ }
+ }
+
+
+}
diff --git a/JavaSample-tcp1061513671883/.lck b/cloud-modules/cloud-modules-protocol-analysis/src/main/java/com/muyu/analysis/parsing/util/CacheUtil.java
similarity index 100%
rename from JavaSample-tcp1061513671883/.lck
rename to cloud-modules/cloud-modules-protocol-analysis/src/main/java/com/muyu/analysis/parsing/util/CacheUtil.java
diff --git a/cloud-modules/cloud-modules-protocol-analysis/src/main/resources/bootstrap.yml b/cloud-modules/cloud-modules-protocol-analysis/src/main/resources/bootstrap.yml
index e6feb7f..567d611 100644
--- a/cloud-modules/cloud-modules-protocol-analysis/src/main/resources/bootstrap.yml
+++ b/cloud-modules/cloud-modules-protocol-analysis/src/main/resources/bootstrap.yml
@@ -7,7 +7,7 @@ nacos:
addr: 106.15.136.7:8848
user-name: nacos
password: nacos
- namespace: lqs
+ namespace: dev
spring:
application:
diff --git a/cloud-modules/cloud-modules-system/src/main/resources/bootstrap.yml b/cloud-modules/cloud-modules-system/src/main/resources/bootstrap.yml
index 4d1adbc..220bf4e 100644
--- a/cloud-modules/cloud-modules-system/src/main/resources/bootstrap.yml
+++ b/cloud-modules/cloud-modules-system/src/main/resources/bootstrap.yml
@@ -6,7 +6,7 @@ nacos:
addr: 106.15.136.7:8848
user-name: nacos
password: nacos
- namespace: lqs
+ namespace: dev
spring:
application:
diff --git a/cloud-visual/cloud-visual-monitor/src/main/resources/bootstrap.yml b/cloud-visual/cloud-visual-monitor/src/main/resources/bootstrap.yml
index cbbe784..ddca326 100644
--- a/cloud-visual/cloud-visual-monitor/src/main/resources/bootstrap.yml
+++ b/cloud-visual/cloud-visual-monitor/src/main/resources/bootstrap.yml
@@ -6,7 +6,7 @@ nacos:
addr: 106.15.136.7:8848
user-name: nacos
password: nacos
- namespace: lqs
+ namespace: dev
# Spring
spring:
diff --git a/pom.xml b/pom.xml
index a49e303..e3c7e2c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -303,6 +303,19 @@
cloud-modules-enterprise-common
${muyu.version}
+
+
+
+ com.muyu
+ cloud-modules-protocol-analysis
+ ${muyu.version}
+
+
+
+ com.muyu
+ cloud-common-mqtt
+ ${muyu.version}
+