From 3202af6ebbc3e9d7012a44c05cc6871a8a8a08eb Mon Sep 17 00:00:00 2001 From: LQS <2506203757@qq.com> Date: Mon, 7 Oct 2024 20:30:44 +0800 Subject: [PATCH] =?UTF-8?q?feat():=20=E5=AE=8C=E5=96=84mqtt=E7=9A=84?= =?UTF-8?q?=E6=8A=95=E9=80=92?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cloud-auth/src/main/resources/bootstrap.yml | 2 +- cloud-common/cloud-common-kafka/pom.xml | 2 +- cloud-common/cloud-common-mqtt/pom.xml | 31 +++ .../com/muyu/common/mqtt/MQTTConnect.java | 31 +++ cloud-common/pom.xml | 1 + .../src/main/resources/bootstrap.yml | 2 +- .../cloud-modules-enterprise-common/pom.xml | 4 + .../cloud-modules-enterprise-server/pom.xml | 2 +- .../com/muyu/enterprise/MQTT/ClientMQTT.java | 74 ------- .../enterprise/MQTT/MQTTReceiveCallback.java | 69 ------- .../muyu/enterprise/MQTT/MyMqttClient.java | 187 ------------------ .../muyu/enterprise/MQTT/PushCallback.java | 52 ----- .../com/muyu/enterprise/MQTT/ServerMQTT.java | 113 ----------- .../src/main/resources/bootstrap.yml | 2 +- .../src/main/resources/bootstrap.yml | 2 +- .../src/main/resources/bootstrap.yml | 2 +- .../cloud-modules-protocol-analysis/pom.xml | 16 ++ .../analysis/parsing/MQTT/ParsingMQTT.java | 20 +- .../com/muyu/analysis/parsing/MQTT/Test2.java | 86 ++++++++ .../muyu/analysis/parsing/util/CacheUtil.java | 0 .../src/main/resources/bootstrap.yml | 2 +- .../src/main/resources/bootstrap.yml | 2 +- .../src/main/resources/bootstrap.yml | 2 +- pom.xml | 13 ++ 24 files changed, 205 insertions(+), 512 deletions(-) create mode 100644 cloud-common/cloud-common-mqtt/pom.xml create mode 100644 cloud-common/cloud-common-mqtt/src/main/java/com/muyu/common/mqtt/MQTTConnect.java delete mode 100644 cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/java/com/muyu/enterprise/MQTT/ClientMQTT.java delete mode 100644 cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/java/com/muyu/enterprise/MQTT/MQTTReceiveCallback.java delete mode 100644 cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/java/com/muyu/enterprise/MQTT/MyMqttClient.java delete mode 100644 cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/java/com/muyu/enterprise/MQTT/PushCallback.java delete mode 100644 cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/java/com/muyu/enterprise/MQTT/ServerMQTT.java create mode 100644 cloud-modules/cloud-modules-protocol-analysis/src/main/java/com/muyu/analysis/parsing/MQTT/Test2.java rename JavaSample-tcp1061513671883/.lck => cloud-modules/cloud-modules-protocol-analysis/src/main/java/com/muyu/analysis/parsing/util/CacheUtil.java (100%) diff --git a/cloud-auth/src/main/resources/bootstrap.yml b/cloud-auth/src/main/resources/bootstrap.yml index f9be2d5..2bdda14 100644 --- a/cloud-auth/src/main/resources/bootstrap.yml +++ b/cloud-auth/src/main/resources/bootstrap.yml @@ -6,7 +6,7 @@ nacos: addr: 106.15.136.7:8848 user-name: nacos password: nacos - namespace: lqs + namespace: dev # Spring spring: application: diff --git a/cloud-common/cloud-common-kafka/pom.xml b/cloud-common/cloud-common-kafka/pom.xml index 81d6907..60cf732 100644 --- a/cloud-common/cloud-common-kafka/pom.xml +++ b/cloud-common/cloud-common-kafka/pom.xml @@ -12,7 +12,7 @@ cloud-common-kafka - cloud-common-kafka + cloud-common-kafka模块 diff --git a/cloud-common/cloud-common-mqtt/pom.xml b/cloud-common/cloud-common-mqtt/pom.xml new file mode 100644 index 0000000..32a0a17 --- /dev/null +++ b/cloud-common/cloud-common-mqtt/pom.xml @@ -0,0 +1,31 @@ + + + 4.0.0 + + com.muyu + cloud-common + 3.6.3 + + + cloud-common-mqtt + + + cloud-common-mqtt消息队列遥测传输协议 + + + + 17 + 17 + UTF-8 + + + + + + com.muyu + cloud-common-core + + + diff --git a/cloud-common/cloud-common-mqtt/src/main/java/com/muyu/common/mqtt/MQTTConnect.java b/cloud-common/cloud-common-mqtt/src/main/java/com/muyu/common/mqtt/MQTTConnect.java new file mode 100644 index 0000000..8534db5 --- /dev/null +++ b/cloud-common/cloud-common-mqtt/src/main/java/com/muyu/common/mqtt/MQTTConnect.java @@ -0,0 +1,31 @@ +package com.muyu.common.mqtt; + +/** + * mqtt连接配置 + * @Author:李庆帅 + * @Package:com.muyu.common.mqtt + * @Project:cloud-server + * @name:MQTTConnect + * @Date:2024/10/2 9:40 + */ +public class MQTTConnect +{ + /** + * String topic = "vehicle"; + * String broker = "tcp://106.15.136.7:1883"; + * String clientId = "JavaSample"; + */ + + /** + * 定义主题字符串,用于MQTT消息交换的频道 + */ + public static final String TOPIC="vehicle"; + /** + *定义代理服务器的连接字符串,格式通常为协议名称,IP地址和端口号 + */ + public static final String BROKER="tcp://106.15.136.7:1883"; + /** + *定义客户端ID,用于在MQTT代理服务器中标识客户端 + */ + public static final String CLIENT_ID ="JavaSample"; +} diff --git a/cloud-common/pom.xml b/cloud-common/pom.xml index 157bcd1..c273261 100644 --- a/cloud-common/pom.xml +++ b/cloud-common/pom.xml @@ -22,6 +22,7 @@ cloud-common-xxl cloud-common-rabbit cloud-common-kafka + cloud-common-mqtt cloud-common diff --git a/cloud-gateway/src/main/resources/bootstrap.yml b/cloud-gateway/src/main/resources/bootstrap.yml index 929b21f..63b6d7e 100644 --- a/cloud-gateway/src/main/resources/bootstrap.yml +++ b/cloud-gateway/src/main/resources/bootstrap.yml @@ -6,7 +6,7 @@ nacos: addr: 106.15.136.7:8848 user-name: nacos password: nacos - namespace: lqs + namespace: dev # Spring spring: diff --git a/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-common/pom.xml b/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-common/pom.xml index 0935b0a..077b1e9 100644 --- a/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-common/pom.xml +++ b/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-common/pom.xml @@ -11,6 +11,10 @@ cloud-modules-enterprise-common + + cloud-modules-enterprise-common企业业务平台服务 + + 17 17 diff --git a/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/pom.xml b/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/pom.xml index d3be51b..28ba282 100644 --- a/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/pom.xml +++ b/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/pom.xml @@ -89,7 +89,7 @@ cloud-modules-enterprise-common - + com.github.yulichang mybatis-plus-join diff --git a/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/java/com/muyu/enterprise/MQTT/ClientMQTT.java b/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/java/com/muyu/enterprise/MQTT/ClientMQTT.java deleted file mode 100644 index 169edaf..0000000 --- a/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/java/com/muyu/enterprise/MQTT/ClientMQTT.java +++ /dev/null @@ -1,74 +0,0 @@ -package com.muyu.enterprise.MQTT; - -import com.alibaba.nacos.api.remote.PushCallBack; -import org.eclipse.paho.client.mqttv3.MqttClient; -import org.eclipse.paho.client.mqttv3.MqttConnectOptions; -import org.eclipse.paho.client.mqttv3.MqttTopic; -import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; - -/** - * 模拟一个客户端接收消息 - * @Author:李庆帅 - * @Package:com.muyu.enterprise.MQTT - * @Project:cloud-server - * @name:ClientMQTT - * @Date:2024/9/28 12:11 - */ -public class ClientMQTT { - //String topic = "vehicle"; - // String content = "Message from MqttPublishSample"; - // int qos = 2; - // String broker = "tcp://106.15.136.7:1883"; - // String clientId = "JavaSample"; - //MQTT代理服务器地址 - public static final String HOST="tcp://106.15.136.7:1883"; - public static final String TOPIC1="pos_message_all"; - private static final String clientId="12345678"; - private MqttClient client; - private MqttConnectOptions options; - private String userName="mqtt"; //非必须 - private String passWord="mqtt"; //非必须 - - private void start(){ - try{ - // host为主机名,clientid即连接MQTT的客户端ID,一般以唯一标识符表示,MemoryPersistence设置clientid的保存形式,默认为以内存保存 - client= new MqttClient(HOST,clientId,new MemoryPersistence()); - // MQTT的连接设置 - options=new MqttConnectOptions(); - // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,设置为true表示每次连接到服务器都以新的身份连接 - options.setCleanSession(false); - // 设置连接的用户名 - options.setUserName(userName); - // 设置连接的密码 - options.setPassword(passWord.toCharArray()); - // 设置超时时间 单位为秒 - options.setConnectionTimeout(10); - // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制 - options.setKeepAliveInterval(20); - //设置断开后重新连接 - options.setAutomaticReconnect(true); - // 设置回调 - client.setCallback(new PushCallback()); - MqttTopic topic = client.getTopic(TOPIC1); - //setWill方法,如果项目中需要知道客户端是否掉线可以调用该方法。设置最终端口的通知消息 - //遗嘱 - options.setWill(topic,"close".getBytes(),1,true); - client.connect(options); - //订阅消息 - int[] Qos = {1} ; //0:最多一次 、1:最少一次 、2:只有一次 - String[] topics1 = {TOPIC1}; - client.subscribe(topics1,Qos); - - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - public static void main(String[] args){ - ClientMQTT clientMQTT = new ClientMQTT(); - clientMQTT.start(); - } - - - -} diff --git a/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/java/com/muyu/enterprise/MQTT/MQTTReceiveCallback.java b/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/java/com/muyu/enterprise/MQTT/MQTTReceiveCallback.java deleted file mode 100644 index 5f18256..0000000 --- a/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/java/com/muyu/enterprise/MQTT/MQTTReceiveCallback.java +++ /dev/null @@ -1,69 +0,0 @@ -package com.muyu.enterprise.MQTT; - -/** - * @Author:李庆帅 - * @Package:com.muyu.enterprise.MQTT - * @Project:cloud-server - * @name:MQTTReceiveCallback - * @Date:2024/9/27 22:21 - */ - -import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; -import org.eclipse.paho.client.mqttv3.MqttCallback; -import org.eclipse.paho.client.mqttv3.MqttMessage; - -/** - * 发布消息的回调类 - * - * 必须实现MqttCallback的接口并实现对应的相关接口方法CallBack 类将实现 MqttCallBack。 - * 每个客户机标识都需要一个回调实例。在此示例中,构造函数传递客户机标识以另存为实例数据。 - * 在回调中,将它用来标识已经启动了该回调的哪个实例。 - * 必须在回调类中实现三个方法: - * - * (1):public void messageArrived(MqttTopic topic, MqttMessage message)接收已经预订的发布。 - * - * (2):public void connectionLost(Throwable cause)在断开连接时调用。 - * - * (3):public void deliveryComplete(MqttDeliveryToken token)) - * 接收到已经发布的 QoS 1 或 QoS 2 消息的传递令牌时调用。 - * 由 MqttClient.connect 激活此回调。 - * - */ -public class MQTTReceiveCallback implements MqttCallback { -// @Override -// public void connectionLost(Throwable throwable) { -// //连接丢失后,一般在这里面进行重连 -// System.out.println("连接断开,可以做重连"); -// } -// -// @Override -// public void messageArrived(String topic, MqttMessage message) throws Exception { -// //subscribe后得到的消息会执行到这里面 -// System.out.println("接收消息主题:"+topic); -// System.out.println("接收消息Qos:"+message.getQos()); -// System.out.println("接收消息内容:"+new String(message.getPayload())); -// } -// -// @Override -// public void deliveryComplete(IMqttDeliveryToken token) { -// System.out.println("deliveryComplete----------"+token.isComplete()); -// } - @Override - public void connectionLost(Throwable throwable){ - //连接丢失后,一般在这里面进行重连 - System.out.println("连接断开,可以做重连"); - } - @Override - public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception { - //subscribe后得到的消息会执行到这面 - System.out.println("接收消息主题:"+topic); - System.out.println("接收消息Qos:"+mqttMessage.getQos()); - System.out.println("接收消息内容:"+new String(mqttMessage.getPayload())); - } - - @Override - public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { - System.out.println("deliveryComplete---------"+iMqttDeliveryToken.isComplete()); - } - -} diff --git a/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/java/com/muyu/enterprise/MQTT/MyMqttClient.java b/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/java/com/muyu/enterprise/MQTT/MyMqttClient.java deleted file mode 100644 index fb9a4bc..0000000 --- a/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/java/com/muyu/enterprise/MQTT/MyMqttClient.java +++ /dev/null @@ -1,187 +0,0 @@ -package com.muyu.enterprise.MQTT; - - -import org.eclipse.paho.client.mqttv3.*; -import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; - -/** - * 客户端类 - * @Author:李庆帅 - * @Package:com.muyu.enterprise.mqtt - * @Project:cloud-server - * @name:MyMqttClient - * @Date:2024/9/27 22:20 - */ -public class MyMqttClient { - - public static MqttClient mqttClient =null; - private static MemoryPersistence memoryPersistence=null; - private static MqttConnectOptions mqttConectOptions=null; - private static String ClinentName=""; //待填 将在服务端出现的名字 - private static String IP=""; //待填 服务器IP - - - public static void main(String[] args) { - start(ClinentName); - } - - - - public static void start(String clientId){ - //初始化连接设置对象 - mqttConectOptions=new MqttConnectOptions(); - //设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录, - //这里设置为true表示每次连接到服务器都以新的身份连接 - mqttConectOptions.setCleanSession(true); - //设置连接超时时间,单位是秒 - mqttConectOptions.setConnectionTimeout(10); - //设置持久化方式 - memoryPersistence=new MemoryPersistence(); - if(null!=clientId){ - try{ - mqttClient =new MqttClient("tcp://"+IP+":1883", clientId,memoryPersistence); - } catch (Exception e) { - // TODO 自动生成的捕获块 - throw new RuntimeException(e); - } - } - System.out.println("连接状态:"+mqttClient.isConnected()); - //设置连接和回调 - if(null!=mqttClient){ - if(!mqttClient.isConnected()){ - //创建回调函数对象 - MQTTReceiveCallback MQTTReceiveCallback = new MQTTReceiveCallback(); - //客户端添加回调函数 - mqttClient.setCallback(MQTTReceiveCallback); - //创建连接 - try{ - System.out.println("创建连接"); - mqttClient.connect(mqttConectOptions); - } catch (Exception e) { - // TODO 自动生成的捕获块 - throw new RuntimeException(e); - } - } - }else { - System.out.println("mqttClient为空"); - } - System.out.println("连接状态"+mqttClient.isConnected()); - } - // 关闭连接 - public void closeConnect(){ - //关闭储存方式 - if(null!=memoryPersistence){ - try{ - memoryPersistence.close(); - } catch (Exception e) { - // TODO 自动生成的捕获块 - throw new RuntimeException(e); - } - }else { - System.out.println("memoryPersistence为空"); - } - - if(null!=mqttClient){ - if(mqttClient.isConnected()){ - try{ - mqttClient.disconnect(); - mqttClient.close(); - } catch (Exception e) { - // TODO 自动生成的捕获块 - throw new RuntimeException(e); - } - }else { - System.out.println("mqttClient未连接"); - } - }else { - System.out.println("mqttClient为空"); - } - } - - - //发布消息 - public static void publishMessage(String pubTopic,String message,int qos){ - if(null!=mqttClient && mqttClient.isConnected()){ - System.out.println("发布消息"+mqttClient.isConnected()); - System.out.println("id"+mqttClient.isConnected()); - MqttMessage mqttMessage = new MqttMessage(); - mqttMessage.setQos(qos); - - MqttTopic topic = mqttClient.getTopic(pubTopic); - - if(null!=topic){ - try{ - MqttDeliveryToken publish = topic.publish(mqttMessage); - if(!publish.isComplete()){ - System.out.println("消息发布成功"); - } - } catch (Exception e) { - // TODO 自动生成的捕获块 - throw new RuntimeException(e); - } - } - - } - } - - - //重新连接 - public static void reConnect(){ - if(null!=mqttClient&&mqttClient.isConnected()){ - if(!mqttClient.isConnected()){ - if(null!=mqttConectOptions){ - try{ - mqttClient.connect(mqttConectOptions); - } catch (Exception e) { - // TODO 自动生成的捕获块 - throw new RuntimeException(e); - } - }else { - System.out.println("mqttConnectOptions为空"); - } - }else { - System.out.println("mqttClient为空或已连接"); - } - }else { - start(ClinentName); - } - } - - - //订阅主题 - public static void suvTopic(String topic){ - if(null!=mqttClient && mqttClient.isConnected()){ - try{ - mqttClient.subscribe(topic,1); - } catch (MqttException e) { - // TODO 自动生成的捕获块 - throw new RuntimeException(e); - } - }else { - System.out.println("mqttClient出错"); - } - } - - - //清空主题 - public void cleanTopic(String topic){ - if(null!=mqttClient && mqttClient.isConnected()){ - try{ - mqttClient.subscribe(topic); - } catch (Exception e) { - // TODO 自动生成的捕获块 - throw new RuntimeException(e); - } - }else { - System.out.println("mqttClient出错"); - } - } - - - - - - - - -} diff --git a/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/java/com/muyu/enterprise/MQTT/PushCallback.java b/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/java/com/muyu/enterprise/MQTT/PushCallback.java deleted file mode 100644 index 769a3bc..0000000 --- a/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/java/com/muyu/enterprise/MQTT/PushCallback.java +++ /dev/null @@ -1,52 +0,0 @@ -package com.muyu.enterprise.MQTT; - -/** - * 发布消息的回调类 - * @Author:李庆帅 - * @Package:com.muyu.enterprise.MQTT - * @Project:cloud-server - * @name:PushCallback - * @Date:2024/9/28 14:35 - */ - -import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; -import org.eclipse.paho.client.mqttv3.MqttCallback; -import org.eclipse.paho.client.mqttv3.MqttMessage; - -/** - * 发布消息的回调类 - * - * 必须实现MqttCallback的接口并实现对应的相关接口方法CallBack 类将实现 MqttCallBack。 - * 每个客户机标识都需要一个回调实例。在此示例中,构造函数传递客户机标识以另存为实例数据。 - * 在回调中,将它用来标识已经启动了该回调的哪个实例。 - * 必须在回调类中实现三个方法: - * - * public void messageArrived(MqttTopic topic, MqttMessage message)接收已经预订的发布。 - * - * public void connectionLost(Throwable cause)在断开连接时调用。 - * - * public void deliveryComplete(MqttDeliveryToken token)) - * 接收到已经发布的 QoS 1 或 QoS 2 消息的传递令牌时调用。 - * 由 MqttClient.connect 激活此回调。 - * - */ -public class PushCallback implements MqttCallback { - @Override - public void connectionLost(Throwable throwable) { - // 连接丢失后,一般在这里面进行重连 - System.out.println("连接断开,可以做重连"); - } - - @Override - public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception { -// subscribe后得到的消息会执行到这里面 - System.out.println("接收消息主题 : " + topic); - System.out.println("接收消息Qos : " + mqttMessage.getQos()); - System.out.println("接收消息内容 : " + new String(mqttMessage.getPayload())); - } - - @Override - public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { - System.out.println("deliveryComplete---------" + iMqttDeliveryToken.isComplete()); - } -} diff --git a/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/java/com/muyu/enterprise/MQTT/ServerMQTT.java b/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/java/com/muyu/enterprise/MQTT/ServerMQTT.java deleted file mode 100644 index f026d7e..0000000 --- a/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/java/com/muyu/enterprise/MQTT/ServerMQTT.java +++ /dev/null @@ -1,113 +0,0 @@ -package com.muyu.enterprise.MQTT; - -import lombok.extern.log4j.Log4j2; -import org.eclipse.paho.client.mqttv3.*; -import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; - -/** - * 这是发送消息的服务端 - * 服务器向多个客户端推送主题,即不同客户端可向服务器订阅相同主题 - * @Author:李庆帅 - * @Package:com.muyu.enterprise.MQTT - * @Project:cloud-server - * @name:ServerMQTT - * @Date:2024/9/28 14:32 - */ -@Log4j2 -public class ServerMQTT { - - //tcp://MQTT安装的服务器地址:MQTT定义的端口号 - public static final String HOST = "tcp://127.0.0.1:1883"; - //定义一个主题 - public static final String TOPIC = "pos_message_all"; - //定义MQTT的ID,可以在MQTT服务配置中指定 - private static final String clientId = "server11"; - - private MqttClient client; - private static MqttTopic topic11; - - // private String userName = "mqtt"; //非必须 - // private String passWord = "mqtt"; //非必须 - - private static MqttMessage message; - - /** - * 构造方法 - * @throws MqttException - */ - public ServerMQTT() throws MqttException { - // MemoryPersistence设置clientid的保存形式,默认为以内存保存 - client=new MqttClient(HOST, clientId,new MemoryPersistence()); - connect(); - } - - /** - * 用来连接服务器 - */ - private void connect() - { - MqttConnectOptions options = new MqttConnectOptions(); - options.setCleanSession(false); - // options.setUserName(userName); - // options.setPassword(passWord.toCharArray()); - // 设置超时时间 - options.setConnectionTimeout(10); - // 设置会话心跳时间 - options.setKeepAliveInterval(20); - try{ - client.setCallback(new PushCallback()); - client.connect(options); - topic11 = client.getTopic(TOPIC); - - } catch (Exception e) { - e.printStackTrace(); - } - - } - - /** - * - * @param topic - * @param message - * @throws MqttException - */ - public static void publish(MqttTopic topic , MqttMessage message) throws MqttPersistenceException,MqttException{ - MqttDeliveryToken token= topic.publish(message); - - token.waitForCompletion(); - System.out.println("消息已完全发布!"+token.isComplete()); - log.info("消息已完全发布!"+token.isComplete()); - - } - - /** - * - * @param clieId - * @param msg - * @throws Exception - */ - public static void sendMessage(String clieId,String msg)throws Exception{ - ServerMQTT server = new ServerMQTT(); - server.message = new MqttMessage(); - server.message.setQos(1); //保证消息能到达一次 - server.message.setRetained(true); - String str="{\"clienId\":\""+clieId+"\",\"msg\":\""+msg+"\"}"; - try{ - publish(server.topic11 , server.message); - //断开连接 -// server.client.disconnect(); - }catch (Exception e){ - e.printStackTrace(); - } - } - - - public static void main(String[] args) throws Exception { - sendMessage("123444","哈哈哈"); - } - - - - - -} diff --git a/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/resources/bootstrap.yml b/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/resources/bootstrap.yml index 17007ea..9d05bf9 100644 --- a/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/resources/bootstrap.yml +++ b/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/resources/bootstrap.yml @@ -6,7 +6,7 @@ nacos: addr: 106.15.136.7:8848 user-name: nacos password: nacos - namespace: lqs + namespace: dev spring: application: diff --git a/cloud-modules/cloud-modules-file/src/main/resources/bootstrap.yml b/cloud-modules/cloud-modules-file/src/main/resources/bootstrap.yml index 45a2bda..eea6728 100644 --- a/cloud-modules/cloud-modules-file/src/main/resources/bootstrap.yml +++ b/cloud-modules/cloud-modules-file/src/main/resources/bootstrap.yml @@ -6,7 +6,7 @@ nacos: addr: 106.15.136.7:8848 user-name: nacos password: nacos - namespace: lqs + namespace: dev # Spring spring: diff --git a/cloud-modules/cloud-modules-gen/src/main/resources/bootstrap.yml b/cloud-modules/cloud-modules-gen/src/main/resources/bootstrap.yml index d820ac6..178e17b 100644 --- a/cloud-modules/cloud-modules-gen/src/main/resources/bootstrap.yml +++ b/cloud-modules/cloud-modules-gen/src/main/resources/bootstrap.yml @@ -6,7 +6,7 @@ nacos: addr: 106.15.136.7:8848 user-name: nacos password: nacos - namespace: lqs + namespace: dev # Spring spring: diff --git a/cloud-modules/cloud-modules-protocol-analysis/pom.xml b/cloud-modules/cloud-modules-protocol-analysis/pom.xml index c6ad48f..b095572 100644 --- a/cloud-modules/cloud-modules-protocol-analysis/pom.xml +++ b/cloud-modules/cloud-modules-protocol-analysis/pom.xml @@ -105,6 +105,22 @@ com.muyu cloud-common-kafka + + + com.muyu + cloud-common-mqtt + + + + org.springframework.boot + spring-boot-starter-cache + + + + com.github.ben-manes.caffeine + caffeine + 3.1.8 + diff --git a/cloud-modules/cloud-modules-protocol-analysis/src/main/java/com/muyu/analysis/parsing/MQTT/ParsingMQTT.java b/cloud-modules/cloud-modules-protocol-analysis/src/main/java/com/muyu/analysis/parsing/MQTT/ParsingMQTT.java index dc0347a..6c9f58e 100644 --- a/cloud-modules/cloud-modules-protocol-analysis/src/main/java/com/muyu/analysis/parsing/MQTT/ParsingMQTT.java +++ b/cloud-modules/cloud-modules-protocol-analysis/src/main/java/com/muyu/analysis/parsing/MQTT/ParsingMQTT.java @@ -1,9 +1,10 @@ -package com.muyu.analysis.parsing.MQTT; +package com.muyu.analysis.parsing.mqtt; import com.muyu.analysis.parsing.remote.RemoteClientService; import com.muyu.common.core.constant.KafkaConstants; import com.muyu.common.core.constant.RedisConstants; import com.muyu.common.core.domain.Result; +import com.muyu.common.mqtt.MQTTConnect; import com.muyu.enterprise.domain.resp.car.MessageValueListResp; import jakarta.annotation.PostConstruct; import jakarta.annotation.Resource; @@ -45,18 +46,18 @@ public class ParsingMQTT { */ @PostConstruct public void mqttClient() { - String topic = "vehicle"; - String broker = "tcp://106.15.136.7:1883"; - String clientId = "JavaSample"; +// String topic = "vehicle"; +//// String MQTTConnect.BROKER = "tcp://106.15.136.7:1883"; +//// String clientId = "JavaSample"; try { // 第三个参数为空,默认持久化策略 - MqttClient sampleClient = new MqttClient(broker, clientId); + MqttClient sampleClient = new MqttClient(MQTTConnect.BROKER, MQTTConnect.CLIENT_ID); MqttConnectOptions connOpts = new MqttConnectOptions(); connOpts.setCleanSession(true); - System.out.println("Connecting to broker: " + broker); + System.out.println("Connecting to MQTTConnect.BROKER: " + MQTTConnect.BROKER); sampleClient.connect(connOpts); - sampleClient.subscribe(topic, 0); + sampleClient.subscribe(MQTTConnect.TOPIC, 0); sampleClient.setCallback(new MqttCallback() { // 连接丢失 @Override @@ -154,6 +155,11 @@ public class ParsingMQTT { log.info("loc " + me.getLocalizedMessage()); log.info("cause " + me.getCause()); log.info("excep " + me); + System.out.println("reason " + me.getReasonCode()); + System.out.println("msg " + me.getMessage()); + System.out.println("loc " + me.getLocalizedMessage()); + System.out.println("cause " + me.getCause()); + System.out.println("excep " + me); me.printStackTrace(); } } diff --git a/cloud-modules/cloud-modules-protocol-analysis/src/main/java/com/muyu/analysis/parsing/MQTT/Test2.java b/cloud-modules/cloud-modules-protocol-analysis/src/main/java/com/muyu/analysis/parsing/MQTT/Test2.java new file mode 100644 index 0000000..9ed2037 --- /dev/null +++ b/cloud-modules/cloud-modules-protocol-analysis/src/main/java/com/muyu/analysis/parsing/MQTT/Test2.java @@ -0,0 +1,86 @@ +package com.muyu.analysis.parsing.mqtt; + +import com.alibaba.fastjson2.JSONObject; +import lombok.extern.log4j.Log4j2; +import org.w3c.dom.stylesheets.LinkStyle; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +/** + * @Author:李庆帅 + * @Package:com.muyu.analysis.parsing.mqtt + * @Project:cloud-server + * @name:Test2 + * @Date:2024/10/6 20:36 + */ +@Log4j2 +public class Test2 +{ + private static final int DURATION_SECONDS = 5; + private static List receivedStrings = new ArrayList<>(); + private static ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); + private static int elapsedSeconds = 0; + private static String file = "elapsed" ; + + public static void main(String[] args){ + //定义一个任务,每秒执行一次 + Runnable task = new Runnable() { + @Override + public void run() { + JSONObject stringFromSource = getStringFromSource(); + receivedStrings.add(stringFromSource); + System.out.println("Received:"+stringFromSource); + //清理超过的数据 + cleanUpOIdStrings(); + //检查超速条件 + checkForSpeeding(); + } + }; + //每个1秒执行一次任务 + scheduler.scheduleAtFixedRate(task,0,1, TimeUnit.SECONDS); + } + //模拟从某个源获取字符串的方法 + private static JSONObject getStringFromSource(){ + JSONObject jsonObject = new JSONObject(); + jsonObject.put("message","Hello World"); + jsonObject.put("time",System.currentTimeMillis()); + jsonObject.put("elapsed",elapsedSeconds); + return jsonObject; + } + + //清理超过60秒的数据 + private static void cleanUpOIdStrings(){ + long currentTime = System.currentTimeMillis(); + receivedStrings.removeIf(jsonObject ->currentTime-jsonObject.getLong("time")>TimeUnit.SECONDS.toMicros(DURATION_SECONDS)); + } + + //检查是否有超速情况 + private static void checkForSpeeding() + { + if(receivedStrings.size() < 2)return;//如果数据不足,直接返回 + + JSONObject jsonObject = new JSONObject(); + jsonObject.put("message","你好"); + jsonObject.put("time",System.currentTimeMillis()); + jsonObject.put("elapsed",10); + + for (int i = 0; i < receivedStrings.size(); i++) { + JSONObject current = receivedStrings.get(i); + JSONObject next = receivedStrings.get(i + 1); + + Short currentElapsed = current.getShort(file); + Short nextElapsed = next.getShort(file); + receivedStrings.add(jsonObject); + //检查条件,如果相差大于12,则记录错误 + if (nextElapsed - currentElapsed > 12) { + System.out.println("出错啦!出错啦!车子超速啦!!!"); + } + } + } + + +} diff --git a/JavaSample-tcp1061513671883/.lck b/cloud-modules/cloud-modules-protocol-analysis/src/main/java/com/muyu/analysis/parsing/util/CacheUtil.java similarity index 100% rename from JavaSample-tcp1061513671883/.lck rename to cloud-modules/cloud-modules-protocol-analysis/src/main/java/com/muyu/analysis/parsing/util/CacheUtil.java diff --git a/cloud-modules/cloud-modules-protocol-analysis/src/main/resources/bootstrap.yml b/cloud-modules/cloud-modules-protocol-analysis/src/main/resources/bootstrap.yml index e6feb7f..567d611 100644 --- a/cloud-modules/cloud-modules-protocol-analysis/src/main/resources/bootstrap.yml +++ b/cloud-modules/cloud-modules-protocol-analysis/src/main/resources/bootstrap.yml @@ -7,7 +7,7 @@ nacos: addr: 106.15.136.7:8848 user-name: nacos password: nacos - namespace: lqs + namespace: dev spring: application: diff --git a/cloud-modules/cloud-modules-system/src/main/resources/bootstrap.yml b/cloud-modules/cloud-modules-system/src/main/resources/bootstrap.yml index 4d1adbc..220bf4e 100644 --- a/cloud-modules/cloud-modules-system/src/main/resources/bootstrap.yml +++ b/cloud-modules/cloud-modules-system/src/main/resources/bootstrap.yml @@ -6,7 +6,7 @@ nacos: addr: 106.15.136.7:8848 user-name: nacos password: nacos - namespace: lqs + namespace: dev spring: application: diff --git a/cloud-visual/cloud-visual-monitor/src/main/resources/bootstrap.yml b/cloud-visual/cloud-visual-monitor/src/main/resources/bootstrap.yml index cbbe784..ddca326 100644 --- a/cloud-visual/cloud-visual-monitor/src/main/resources/bootstrap.yml +++ b/cloud-visual/cloud-visual-monitor/src/main/resources/bootstrap.yml @@ -6,7 +6,7 @@ nacos: addr: 106.15.136.7:8848 user-name: nacos password: nacos - namespace: lqs + namespace: dev # Spring spring: diff --git a/pom.xml b/pom.xml index a49e303..e3c7e2c 100644 --- a/pom.xml +++ b/pom.xml @@ -303,6 +303,19 @@ cloud-modules-enterprise-common ${muyu.version} + + + + com.muyu + cloud-modules-protocol-analysis + ${muyu.version} + + + + com.muyu + cloud-common-mqtt + ${muyu.version} +