From fa25c11a79f916b4642d04b1eb43c89f028b61af Mon Sep 17 00:00:00 2001 From: rouchen <3133657697@qq.com> Date: Fri, 31 May 2024 22:21:40 +0800 Subject: [PATCH] =?UTF-8?q?feat=20=20=E5=8F=91=E5=B8=83=20MQTT=20=E6=B6=88?= =?UTF-8?q?=E6=81=AF=20=E5=92=8C=E8=AE=A2=E9=98=85=20MQTT=20=E4=B8=BB?= =?UTF-8?q?=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/main/java/com/muyu/mqtt/MsgHandler.java | 2 +- .../java/com/muyu/receive/PublishSample.java | 52 ++++++++++++++++ .../com/muyu/receive/ReceiveCarMessage.java | 60 +++++++++++++++++++ .../com/muyu/receive/SubscribeSample.java | 49 +++++++++++++++ 4 files changed, 162 insertions(+), 1 deletion(-) create mode 100644 src/main/java/com/muyu/receive/PublishSample.java create mode 100644 src/main/java/com/muyu/receive/ReceiveCarMessage.java create mode 100644 src/main/java/com/muyu/receive/SubscribeSample.java diff --git a/src/main/java/com/muyu/mqtt/MsgHandler.java b/src/main/java/com/muyu/mqtt/MsgHandler.java index 25a101f..6c33511 100644 --- a/src/main/java/com/muyu/mqtt/MsgHandler.java +++ b/src/main/java/com/muyu/mqtt/MsgHandler.java @@ -23,7 +23,7 @@ public class MsgHandler { System.out.println("接收到消息:" + msg); MqttProperties mqttProperties = MqttProperties.configBuild( "47.102.133.88", - "mqtt/test" + "test1" ); log.error("接收到消息初始化信息:{}",mqttProperties); MqttClient mqttClient = mqttFactory.creatClient(mqttProperties); diff --git a/src/main/java/com/muyu/receive/PublishSample.java b/src/main/java/com/muyu/receive/PublishSample.java new file mode 100644 index 0000000..60d2b61 --- /dev/null +++ b/src/main/java/com/muyu/receive/PublishSample.java @@ -0,0 +1,52 @@ +package com.muyu.receive; + +import org.eclipse.paho.client.mqttv3.MqttClient; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.eclipse.paho.client.mqttv3.MqttException; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@SpringBootApplication +public class PublishSample { + + public static void main(String[] args) { + + String broker = "tcp://47.102.133.88:1883"; + String topic = "test1"; + String username = ""; + String password = ""; + String clientid = "publish_client"; + String content = "Hello MQTT"; + + int qos = 0; + try { + MqttClient client = new MqttClient(broker, clientid, new MemoryPersistence()); + // 连接参数 + MqttConnectOptions options = new MqttConnectOptions(); + // 设置用户名和密码 + options.setUserName(username); + options.setPassword(password.toCharArray()); + options.setConnectionTimeout(60); + options.setKeepAliveInterval(60); + // 连接 + client.connect(options); + // 创建消息并设置 QoS + MqttMessage message = new MqttMessage(content.getBytes()); + message.setQos(qos); + // 发布消息 + client.publish(topic, message); + System.out.println("Message published"); + System.out.println("topic: " + topic); + System.out.println("message content: " + content); + // 关闭连接 + client.disconnect(); + // 关闭客户端 + client.close(); + } catch (MqttException e) { + throw new RuntimeException(e); + } + SpringApplication.run(PublishSample.class, args); + } +} diff --git a/src/main/java/com/muyu/receive/ReceiveCarMessage.java b/src/main/java/com/muyu/receive/ReceiveCarMessage.java new file mode 100644 index 0000000..536d65b --- /dev/null +++ b/src/main/java/com/muyu/receive/ReceiveCarMessage.java @@ -0,0 +1,60 @@ +package com.muyu.receive; + +import org.eclipse.paho.client.mqttv3.*; +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +/** + * 接受小车报文 + * + * @author Yangle + * Date 2024/5/31 20:26 + */ +@SpringBootApplication +public class ReceiveCarMessage { + + public static void main(String[] args) { + String broker ="tcp://47.102.133.88:1883"; + String topic="mqtt/test"; + String username="emqx"; + String password="public"; + String cliented="subscribe_client"; + int qos=0; + + try { + MqttClient client = new MqttClient(broker, cliented, new MemoryPersistence()); + //连接参数 + MqttConnectOptions options = new MqttConnectOptions(); + options.setUserName(username); + options.setPassword(password.toCharArray()); + options.setConnectionTimeout(60); + options.setKeepAliveInterval(60); + //设置回调 + client.setCallback(new MqttCallback() { + @Override + public void connectionLost(Throwable cause) { + System.out.println("connectionLost:"+cause.getMessage()); + } + + @Override + public void messageArrived(String s, MqttMessage message) throws Exception { + System.out.println("topice:"+topic); + System.out.println("Qos:"+message.getQos()); + System.out.println("message content:"+new String(message.getPayload())); + } + + @Override + public void deliveryComplete(IMqttDeliveryToken token) { + System.out.println("deliveryComplete---------" + token.isComplete()); + } + }); + client.connect(options); + client.subscribe(topic,qos); + } catch (MqttException e) { + throw new RuntimeException(e); + } + SpringApplication.run(ReceiveCarMessage.class, args); + } + +} diff --git a/src/main/java/com/muyu/receive/SubscribeSample.java b/src/main/java/com/muyu/receive/SubscribeSample.java new file mode 100644 index 0000000..31718fd --- /dev/null +++ b/src/main/java/com/muyu/receive/SubscribeSample.java @@ -0,0 +1,49 @@ +package com.muyu.receive; + +import org.eclipse.paho.client.mqttv3.*; +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; + +public class SubscribeSample { + public static void main(String[] args) { + String broker = "tcp://47.102.133.88:1883"; + String topic = "mqtt/test"; + String username = "emqx"; + String password = "public"; + String clientid = "subscribe_client"; + int qos = 0; + + try { + MqttClient client = new MqttClient(broker, clientid, new MemoryPersistence()); + // 连接参数 + MqttConnectOptions options = new MqttConnectOptions(); + options.setUserName(username); + options.setPassword(password.toCharArray()); + options.setConnectionTimeout(60); + options.setKeepAliveInterval(60); + // 设置回调 + client.setCallback(new MqttCallback() { + + @Override + public void connectionLost(Throwable cause) { + System.out.println("connectionLost: " + cause.getMessage()); + } + @Override + public void messageArrived(String topic, MqttMessage message) { + System.out.println("topic: " + topic); + System.out.println("Qos: " + message.getQos()); + System.out.println("message content: " + new String(message.getPayload())); + + } + @Override + public void deliveryComplete(IMqttDeliveryToken token) { + System.out.println("deliveryComplete---------" + token.isComplete()); + } + + }); + client.connect(options); + client.subscribe(topic, qos); + } catch (Exception e) { + e.printStackTrace(); + } + } +}