package com.muyu.loadCenter.mqttx; import com.alibaba.fastjson2.JSON; import com.fasterxml.jackson.databind.ObjectMapper; import com.muyu.loadCenter.kafka.KafkaConfig; import com.muyu.loadCenter.utils.AnalyzeUtils; import com.muyu.system.common.domain.VehicleData; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import java.util.Properties; import static com.muyu.loadCenter.mqttx.ConversionUtil.hexStringToString; /** * @ProjectName: cloud-vehicles * @PackageName: com.muyu.system.mqtt * @Description TODO * @Author XiaoFan * @Date 2024/3/31 19:49 * @Version 1.0 */ @Component public class MqttSubscriber { //MQTT服务器地址 private static String broker="tcp://10.10.26.5:1883"; //客户端ID,可根据实际情况自定义 private static String clientId="mqttx_1c945161"; //订阅的主题 private static String topic="test"; //消息服务质量 private static int qos=1; private static Thread daemonThread; // @PostConstruct // public void init() { // daemonThread = new Thread(() -> { // sendCode(); // }); // daemonThread.setDaemon(true); // daemonThread.start(); @Scheduled(cron = "0/1 * * * * * ") // @PostConstruct public void sendCode(){ //消息持久化方式,这里选择内存持久化 MemoryPersistence persistence=new MemoryPersistence(); try { //创建MQTT客户端实例 MqttClient sampleClient = new MqttClient(broker, clientId, persistence); //设置连接选项 MqttConnectOptions connOpts = new MqttConnectOptions(); connOpts.setCleanSession(true); //打印连接信息 System.out.println("Connecting to broker:" +broker); //连接到MQTT代理 sampleClient.connect(connOpts); //打印连接成功信息 System.out.println("Connected"); //打印订阅信息 System.out.println("Subscribing to topic:" +topic); //订阅指定主题 sampleClient.subscribe(topic,qos); //设置消息到打的回调函数 sampleClient.setCallback(new MqttCallback() { @Override public void connectionLost(Throwable throwable) { //连接丢失时的处理逻辑 System.out.println("Connection lost!"); } @Override public void messageArrived(String topic, MqttMessage message) throws Exception { String hexString = new String(message.getPayload()); //数据编译 String msg1 = hexStringToString(hexString); //切割成对象 VehicleData analyze = AnalyzeUtils.analyze(msg1); //对象变json ObjectMapper objectMapper = new ObjectMapper(); String msg = objectMapper.writeValueAsString(analyze); //接收到消息时的处理逻辑 Properties properties1 = KafkaConfig.properties1(); KafkaProducer kafkaProducer = new KafkaProducer<>(properties1); // 3: 发送消息 // 封装发送消息对象 ProducerRecord record = new ProducerRecord<>(KafkaConfig.DEFAULT_TOPIC, KafkaConfig.DEFAULT_KEY, msg); // 异步发送并处理结果 kafkaProducer.send(record); } @Override public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { System.out.println("我是发送成功后要注册到车辆网关的"+iMqttDeliveryToken.toString()); System.out.println("我是发送成功后要注册到车辆网关的"+iMqttDeliveryToken.getMessageId()); System.out.println("我是发送成功后要注册到车辆网关的"+iMqttDeliveryToken.getActionCallback()); //消息发送完成后的处理逻辑 //在这个示例中不做处理 } }); //循环等待接收消息 while (true){ Thread.sleep(1000);//每隔一秒检查一次是否有新消息 } }catch (MqttException | InterruptedException me){ System.out.println("msg"+me.getMessage()); System.out.println("loc"+me.getLocalizedMessage()); System.out.println("cause"+me.getCause()); System.out.println("excep"+me); me.printStackTrace(); } } }