cloud-plus-server/cloud-modules/cloud-modules-vehicleGateway/src/main/java/com/muyu/vehicleGateway/MqttPublishSample.java

57 lines
2.0 KiB
Java

package com.muyu.vehicleGateway;
import lombok.extern.log4j.Log4j2;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
/**
 * Sample MQTT client for the vehicle gateway.
 *
 * <p>Connects to a fixed broker, subscribes to the {@code vehicleGateway} topic and prints the
 * payload of every message that arrives. Despite the class name, nothing is published here.
 *
 * <p>NOTE(review): both {@code @Log4j2} and {@code @Slf4j} ask Lombok for a logger field, and
 * this class uses neither. Confirm which logging facade the module standardises on and drop
 * the other annotation.
 */
@Log4j2
@Slf4j
public class MqttPublishSample {

    /**
     * Connects to the broker, subscribes to the topic and prints incoming payloads to stdout.
     *
     * <p>Any {@link MqttException} raised while creating the client, connecting or subscribing
     * is reported on the console rather than propagated.
     *
     * @param args command-line arguments; not used
     */
    public static void main(String[] args) {
        String topic = "vehicleGateway";
        String broker = "tcp://123.57.152.124:1883";
        String clientId = "JavaSample";
        try {
            // Create the MQTT client for the target broker.
            MqttClient sampleClient = new MqttClient(broker, clientId);
            MqttConnectOptions connOpts = new MqttConnectOptions();
            connOpts.setCleanSession(true);

            // Register the callback BEFORE connecting/subscribing so that a message delivered
            // right after the subscription is acknowledged cannot arrive with no handler set.
            sampleClient.setCallback(new MqttCallback() {
                @Override
                public void connectionLost(Throwable throwable) {
                    // Report the disconnect instead of dropping it silently.
                    System.out.println("Connection lost: " + throwable);
                }

                @Override
                public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
                    // Decode explicitly as UTF-8 rather than with the platform default charset.
                    System.out.println(new String(
                            mqttMessage.getPayload(), java.nio.charset.StandardCharsets.UTF_8));
                }

                @Override
                public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
                    // Nothing to do: this sample never publishes.
                }
            });

            System.out.println("Connecting to broker: " + broker);
            sampleClient.connect(connOpts);
            sampleClient.subscribe(topic, 0);
        } catch (MqttException me) {
            // Dump everything known about the MQTT failure to the console.
            System.out.println("reason " + me.getReasonCode());
            System.out.println("msg " + me.getMessage());
            System.out.println("loc " + me.getLocalizedMessage());
            System.out.println("cause " + me.getCause());
            System.out.println("excep " + me);
            me.printStackTrace();
        }
    }
}