package com.mcwl.resource.util; import com.mcwl.resource.domain.*; import lombok.AllArgsConstructor; import lombok.Getter; import lombok.extern.log4j.Log4j2; import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import static com.mcwl.common.utils.Threads.sleep; @Log4j2 public class EMQXUtil { private static final String BROKER_URL = "tcp://192.168.136.128:1883"; private static final int connectionTimeout = 30; private static final int keepAliveInterval = 60; private static final int MAX_RETRIES = 3; // 线程池管理连接线程 private static final ExecutorService executor = Executors.newFixedThreadPool(3); // 客户端存储(替代原来的静态变量) private static final Map clients = new ConcurrentHashMap<>(); // 主题配置(修正后的主题定义) @Getter @AllArgsConstructor public enum TopicConfig { MODEL_LIKE("modelLikeTopic/1", "modelLikeTopic/1", "modelLikeClient_"), MODEL_COMMENT_LIKE("modelCommentLikeTopic/1", "modelCommentLikeTopic/1", "modelCommentLikeClient_"), IMAGE_LIKE("imageLikeTopic/1", "imageLikeTopic/1", "imageLikeClient_"), IMAGE_COMMENT_LIKE("imageCommentLikeTopic/1", "imageCommentLikeTopic/1", "imageCommentLikeClient_"), WORKFLOW_LIKE("workFlowLikeTopic/1", "workFlowLikeTopic/1", "workFlowLikeClient_"), WORKFLOW_COMMENT_LIKE("workFlowCommentLikeTopic/1", "workFlowCommentLikeTopic/1", "workFlowCommentLikeClient_"); final String subTopic; final String pubTopic; final String clientIdPrefix; } // 获取客户端(线程安全版本) private static MqttClient getClient(TopicConfig type) { return clients.computeIfAbsent(type, t -> { try { MqttClient client = new MqttClient(BROKER_URL, generateClientId(t), new MemoryPersistence()); setupConnection(client, t); return client; } catch (MqttException e) { throw new RuntimeException("客户端初始化失败", e); } }); } // 带自动重连的连接设置 private static void setupConnection(MqttClient client, TopicConfig type) { MqttConnectOptions opts = new MqttConnectOptions(); opts.setAutomaticReconnect(true); opts.setCleanSession(true); opts.setConnectionTimeout(connectionTimeout); opts.setKeepAliveInterval(keepAliveInterval); client.setCallback(createCallback(type)); // 异步连接(避免阻塞) executor.submit(() -> { int retries = 0; while (!client.isConnected() && retries < MAX_RETRIES) { try { client.connect(opts); client.subscribe(type.subTopic); clients.put(type, client); } catch (MqttException e) { long delay = (long) Math.min(1000 * Math.pow(2, retries), 30000); sleep(delay); retries++; } } if (retries >= MAX_RETRIES) { clients.remove(type); log.error("连接失败超过最大重试次数"); } }); } // 回调工厂方法 private static MqttCallback createCallback(TopicConfig type) { switch (type) { case MODEL_LIKE: return new ModelLikePushCallback(); case IMAGE_LIKE: return new ImageLikePushCallback(); case WORKFLOW_LIKE: return new WorkFlowLikePushCallback(); case MODEL_COMMENT_LIKE: return new ModelCommentLikePushCallback(); case IMAGE_COMMENT_LIKE: return new ImageCommentLikePushCallback(); case WORKFLOW_COMMENT_LIKE: return new WorkFlowCommentLikePushCallback(); default: throw new IllegalArgumentException("未知主题类型: " + type); } } // 使用不同的ClientId(增加随机后缀) private static String generateClientId(TopicConfig config) { return config.clientIdPrefix + UUID.randomUUID().toString().substring(0, 8); } // 消息发布方法(带连接检查) public static void sendMessage(TopicConfig type, String message) { sendMessage(type, type.pubTopic, 2, message); } // 其他发布方法类似,可进一步抽象 public static void sendMessage(TopicConfig type, String topic, int qos, String msg) { int retry = 0; MqttClient client = getClient(type); send(client, topic, qos, msg, retry); } public static void sendMessage(MqttClient client, String topic, int qos, String msg) { int retry = 0; send(client, topic, qos, msg, retry); } private static void send(MqttClient client, String topic, int qos, String msg, int retry) { waitForConnection(client); // 等待最多5秒 while (retry < 3) { try { if (client.isConnected()) { client.publish(topic, msg.getBytes(), qos, false); } else { log.error("客户端未连接!"); } break; } catch (MqttException e) { retry++; sleep(1000L * retry); log.error("发布失败: {}", e.getMessage()); } } } private static void waitForConnection(MqttClient client) { long start = System.currentTimeMillis(); while (!client.isConnected() && (System.currentTimeMillis() - start) < 5000) { sleep(100); } } // 关闭所有连接 public static void shutdown() { executor.shutdown(); try { if (!executor.awaitTermination(5, TimeUnit.SECONDS)) { executor.shutdownNow(); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } clients.forEach((k, v) -> closeClient(v)); clients.clear(); } private static void closeClient(MqttClient client) { try { if (client != null) { client.disconnect(); client.close(); } } catch (MqttException e) { log.error("关闭客户端错误: {}", e.getMessage()); } } }