Load_Center/src/main/java/com/load/SubscribeSample.java

62 lines
1.8 KiB
Java

package com.load;
import java.nio.charset.StandardCharsets;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
/**
* 订阅 MQTT 主题
*/
public class SubscribeSample {
public static void main(String[] args) {
/**
* 代理地址
*/
String broker = "tcp://39.100.87.192:1883";
/**
* 主题
*/
String topic = "mqtt/test";
String username = "emqx";
String password = "public";
/**
* 客户端ID(随机)
*/
String clientid = "subscribe_client";
int qos = 0;
try {
MqttClient client = new MqttClient(broker, clientid, new MemoryPersistence());
// 连接参数
MqttConnectOptions options = new MqttConnectOptions();
options.setUserName(username);
options.setPassword(password.toCharArray());
options.setConnectionTimeout(60);
// 堵塞60S
options.setKeepAliveInterval(60);
// 设置回调
client.setCallback(new MqttCallback() {
public void connectionLost(Throwable cause) {
System.out.println("connectionLost: " + cause.getMessage());
}
public void messageArrived(String topic, MqttMessage message) {
System.out.println("topic: " + topic);
System.out.println("Qos: " + message.getQos());
System.out.println("message content: " + new String(message.getPayload()));
}
public void deliveryComplete(IMqttDeliveryToken token) {
System.out.println("deliveryComplete---------" + token.isComplete());
}
});
client.connect(options);
client.subscribe(topic, qos);
} catch (Exception e) {
e.printStackTrace();
}
}
}