diff --git a/mcwl-resource/pom.xml b/mcwl-resource/pom.xml index ad7488d..adbf9f4 100644 --- a/mcwl-resource/pom.xml +++ b/mcwl-resource/pom.xml @@ -39,5 +39,11 @@ 3.1.2 + + org.eclipse.paho + org.eclipse.paho.client.mqttv3 + 1.2.2 + + diff --git a/mcwl-resource/src/main/java/com/mcwl/resource/domain/ImageCommentLikePushCallback.java b/mcwl-resource/src/main/java/com/mcwl/resource/domain/ImageCommentLikePushCallback.java new file mode 100644 index 0000000..15099cc --- /dev/null +++ b/mcwl-resource/src/main/java/com/mcwl/resource/domain/ImageCommentLikePushCallback.java @@ -0,0 +1,29 @@ +package com.mcwl.resource.domain; + +import lombok.extern.log4j.Log4j2; +import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; +import org.eclipse.paho.client.mqttv3.MqttCallback; +import org.eclipse.paho.client.mqttv3.MqttMessage; + +@Log4j2 +public class ImageCommentLikePushCallback implements MqttCallback { + + @Override + public void connectionLost(Throwable cause) { + // 连接丢失后,一般在这里面进行重连 + System.out.println("连接断开,可以做重连"); + } + + @Override + public void messageArrived(String topic, MqttMessage message) throws Exception { + // subscribe后得到的消息会执行到这里面 + System.out.println("接收消息主题:" + topic); + System.out.println("接收消息Qos:" + message.getQos()); + System.out.println("接收消息内容:" + new String(message.getPayload())); + } + + @Override + public void deliveryComplete(IMqttDeliveryToken token) { + System.out.println("deliveryComplete---------" + token.isComplete()); + } +} \ No newline at end of file diff --git a/mcwl-resource/src/main/java/com/mcwl/resource/domain/ImageLikePushCallback.java b/mcwl-resource/src/main/java/com/mcwl/resource/domain/ImageLikePushCallback.java new file mode 100644 index 0000000..8bb2e74 --- /dev/null +++ b/mcwl-resource/src/main/java/com/mcwl/resource/domain/ImageLikePushCallback.java @@ -0,0 +1,29 @@ +package com.mcwl.resource.domain; + +import lombok.extern.log4j.Log4j2; +import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; +import org.eclipse.paho.client.mqttv3.MqttCallback; +import org.eclipse.paho.client.mqttv3.MqttMessage; + +@Log4j2 +public class ImageLikePushCallback implements MqttCallback { + + @Override + public void connectionLost(Throwable cause) { + // 连接丢失后,一般在这里面进行重连 + System.out.println("连接断开,可以做重连"); + } + + @Override + public void messageArrived(String topic, MqttMessage message) throws Exception { + // subscribe后得到的消息会执行到这里面 + System.out.println("接收消息主题:" + topic); + System.out.println("接收消息Qos:" + message.getQos()); + System.out.println("接收消息内容:" + new String(message.getPayload())); + } + + @Override + public void deliveryComplete(IMqttDeliveryToken token) { + System.out.println("deliveryComplete---------" + token.isComplete()); + } +} \ No newline at end of file diff --git a/mcwl-resource/src/main/java/com/mcwl/resource/domain/ModelCommentLikePushCallback.java b/mcwl-resource/src/main/java/com/mcwl/resource/domain/ModelCommentLikePushCallback.java new file mode 100644 index 0000000..3e96760 --- /dev/null +++ b/mcwl-resource/src/main/java/com/mcwl/resource/domain/ModelCommentLikePushCallback.java @@ -0,0 +1,28 @@ +package com.mcwl.resource.domain; + +import lombok.extern.log4j.Log4j2; +import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; +import org.eclipse.paho.client.mqttv3.MqttCallback; +import org.eclipse.paho.client.mqttv3.MqttMessage; + +@Log4j2 +public class ModelCommentLikePushCallback implements MqttCallback { + @Override + public void connectionLost(Throwable cause) { + // 连接丢失后,一般在这里面进行重连 + System.out.println("连接断开,可以做重连"); + } + + @Override + public void messageArrived(String topic, MqttMessage message) throws Exception { + // subscribe后得到的消息会执行到这里面 + System.out.println("接收消息主题:" + topic); + System.out.println("接收消息Qos:" + message.getQos()); + System.out.println("接收消息内容:" + new String(message.getPayload())); + } + + @Override + public void deliveryComplete(IMqttDeliveryToken token) { + System.out.println("deliveryComplete---------" + token.isComplete()); + } +} \ No newline at end of file diff --git a/mcwl-resource/src/main/java/com/mcwl/resource/domain/ModelLikePushCallback.java b/mcwl-resource/src/main/java/com/mcwl/resource/domain/ModelLikePushCallback.java new file mode 100644 index 0000000..84eaf49 --- /dev/null +++ b/mcwl-resource/src/main/java/com/mcwl/resource/domain/ModelLikePushCallback.java @@ -0,0 +1,28 @@ +package com.mcwl.resource.domain; + +import lombok.extern.log4j.Log4j2; +import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; +import org.eclipse.paho.client.mqttv3.MqttCallback; +import org.eclipse.paho.client.mqttv3.MqttMessage; + +@Log4j2 +public class ModelLikePushCallback implements MqttCallback { + @Override + public void connectionLost(Throwable cause) { + // 连接丢失后,一般在这里面进行重连 + System.out.println("连接断开,可以做重连"); + } + + @Override + public void messageArrived(String topic, MqttMessage message) throws Exception { + // subscribe后得到的消息会执行到这里面 + System.out.println("接收消息主题:" + topic); + System.out.println("接收消息Qos:" + message.getQos()); + System.out.println("接收消息内容:" + new String(message.getPayload())); + } + + @Override + public void deliveryComplete(IMqttDeliveryToken token) { + System.out.println("deliveryComplete---------" + token.isComplete()); + } +} \ No newline at end of file diff --git a/mcwl-resource/src/main/java/com/mcwl/resource/domain/WorkFlowCommentLikePushCallback.java b/mcwl-resource/src/main/java/com/mcwl/resource/domain/WorkFlowCommentLikePushCallback.java new file mode 100644 index 0000000..a1f9dca --- /dev/null +++ b/mcwl-resource/src/main/java/com/mcwl/resource/domain/WorkFlowCommentLikePushCallback.java @@ -0,0 +1,28 @@ +package com.mcwl.resource.domain; + +import lombok.extern.log4j.Log4j2; +import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; +import org.eclipse.paho.client.mqttv3.MqttCallback; +import org.eclipse.paho.client.mqttv3.MqttMessage; + +@Log4j2 +public class WorkFlowCommentLikePushCallback implements MqttCallback { + @Override + public void connectionLost(Throwable cause) { + // 连接丢失后,一般在这里面进行重连 + System.out.println("连接断开,可以做重连"); + } + + @Override + public void messageArrived(String topic, MqttMessage message) throws Exception { + // subscribe后得到的消息会执行到这里面 + System.out.println("接收消息主题:" + topic); + System.out.println("接收消息Qos:" + message.getQos()); + System.out.println("接收消息内容:" + new String(message.getPayload())); + } + + @Override + public void deliveryComplete(IMqttDeliveryToken token) { + System.out.println("deliveryComplete---------" + token.isComplete()); + } +} \ No newline at end of file diff --git a/mcwl-resource/src/main/java/com/mcwl/resource/domain/WorkFlowLikePushCallback.java b/mcwl-resource/src/main/java/com/mcwl/resource/domain/WorkFlowLikePushCallback.java new file mode 100644 index 0000000..05e01ce --- /dev/null +++ b/mcwl-resource/src/main/java/com/mcwl/resource/domain/WorkFlowLikePushCallback.java @@ -0,0 +1,28 @@ +package com.mcwl.resource.domain; + +import lombok.extern.log4j.Log4j2; +import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; +import org.eclipse.paho.client.mqttv3.MqttCallback; +import org.eclipse.paho.client.mqttv3.MqttMessage; + +@Log4j2 +public class WorkFlowLikePushCallback implements MqttCallback { + @Override + public void connectionLost(Throwable cause) { + // 连接丢失后,一般在这里面进行重连 + System.out.println("连接断开,可以做重连"); + } + + @Override + public void messageArrived(String topic, MqttMessage message) throws Exception { + // subscribe后得到的消息会执行到这里面 + System.out.println("接收消息主题:" + topic); + System.out.println("接收消息Qos:" + message.getQos()); + System.out.println("接收消息内容:" + new String(message.getPayload())); + } + + @Override + public void deliveryComplete(IMqttDeliveryToken token) { + System.out.println("deliveryComplete---------" + token.isComplete()); + } +} \ No newline at end of file diff --git a/mcwl-resource/src/main/java/com/mcwl/resource/util/EMQXUtil.java b/mcwl-resource/src/main/java/com/mcwl/resource/util/EMQXUtil.java new file mode 100644 index 0000000..c1a9c57 --- /dev/null +++ b/mcwl-resource/src/main/java/com/mcwl/resource/util/EMQXUtil.java @@ -0,0 +1,207 @@ +package com.mcwl.resource.util; + +import com.mcwl.resource.domain.*; +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.extern.log4j.Log4j2; +import org.eclipse.paho.client.mqttv3.*; +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; + +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import static com.mcwl.common.utils.Threads.sleep; + +@Log4j2 +public class EMQXUtil { + + private static final String BROKER_URL = "tcp://192.168.136.128:1883"; + + private static final int connectionTimeout = 30; + private static final int keepAliveInterval = 60; + private static final int MAX_RETRIES = 3; + + // 线程池管理连接线程 + private static final ExecutorService executor = Executors.newFixedThreadPool(3); + + // 客户端存储(替代原来的静态变量) + private static final Map clients = new ConcurrentHashMap<>(); + + // 主题配置(修正后的主题定义) + @Getter + @AllArgsConstructor + public enum TopicConfig { + MODEL_LIKE("modelLikeTopic/1", + "modelLikeTopic/1", + "modelLikeClient_"), + + MODEL_COMMENT_LIKE("modelCommentLikeTopic/1", + "modelCommentLikeTopic/1", + "modelCommentLikeClient_"), + + IMAGE_LIKE("imageLikeTopic/1", + "imageLikeTopic/1", + "imageLikeClient_"), + + IMAGE_COMMENT_LIKE("imageCommentLikeTopic/1", + "imageCommentLikeTopic/1", + "imageCommentLikeClient_"), + + WORKFLOW_LIKE("workFlowLikeTopic/1", + "workFlowLikeTopic/1", + "workFlowLikeClient_"), + + WORKFLOW_COMMENT_LIKE("workFlowCommentLikeTopic/1", + "workFlowCommentLikeTopic/1", + "workFlowCommentLikeClient_"); + + final String subTopic; + final String pubTopic; + final String clientIdPrefix; + } + + + // 获取客户端(线程安全版本) + private static MqttClient getClient(TopicConfig type) { + return clients.computeIfAbsent(type, t -> { + try { + MqttClient client = new MqttClient(BROKER_URL, + generateClientId(t), new MemoryPersistence()); + setupConnection(client, t); + return client; + } catch (MqttException e) { + throw new RuntimeException("客户端初始化失败", e); + } + }); + } + + // 带自动重连的连接设置 + private static void setupConnection(MqttClient client, TopicConfig type) { + MqttConnectOptions opts = new MqttConnectOptions(); + opts.setAutomaticReconnect(true); + opts.setCleanSession(true); + opts.setConnectionTimeout(connectionTimeout); + opts.setKeepAliveInterval(keepAliveInterval); + client.setCallback(createCallback(type)); + + // 异步连接(避免阻塞) + executor.submit(() -> { + int retries = 0; + while (!client.isConnected() && retries < MAX_RETRIES) { + try { + client.connect(opts); + client.subscribe(type.subTopic); + clients.put(type, client); + } catch (MqttException e) { + long delay = (long) Math.min(1000 * Math.pow(2, retries), 30000); + sleep(delay); + retries++; + } + } + + if (retries >= MAX_RETRIES) { + clients.remove(type); + log.error("连接失败超过最大重试次数"); + } + }); + } + + // 回调工厂方法 + private static MqttCallback createCallback(TopicConfig type) { + switch (type) { + case MODEL_LIKE: + return new ModelLikePushCallback(); + case IMAGE_LIKE: + return new ImageLikePushCallback(); + case WORKFLOW_LIKE: + return new WorkFlowLikePushCallback(); + case MODEL_COMMENT_LIKE: + return new ModelCommentLikePushCallback(); + case IMAGE_COMMENT_LIKE: + return new ImageCommentLikePushCallback(); + case WORKFLOW_COMMENT_LIKE: + return new WorkFlowCommentLikePushCallback(); + default: + throw new IllegalArgumentException("未知主题类型: " + type); + } + } + + + // 使用不同的ClientId(增加随机后缀) + private static String generateClientId(TopicConfig config) { + return config.clientIdPrefix + UUID.randomUUID().toString().substring(0, 8); + } + + // 消息发布方法(带连接检查) + public static void sendMessage(TopicConfig type, String message) { + sendMessage(type, type.pubTopic, 2, message); + } + + // 其他发布方法类似,可进一步抽象 + public static void sendMessage(TopicConfig type, String topic, int qos, String msg) { + int retry = 0; + MqttClient client = getClient(type); + send(client, topic, qos, msg, retry); + + } + + public static void sendMessage(MqttClient client, String topic, int qos, String msg) { + int retry = 0; + send(client, topic, qos, msg, retry); + + } + + private static void send(MqttClient client, String topic, int qos, String msg, int retry) { + waitForConnection(client); // 等待最多5秒 + while (retry < 3) { + try { + if (client.isConnected()) { + client.publish(topic, msg.getBytes(), qos, false); + } else { + log.error("客户端未连接!"); + } + break; + } catch (MqttException e) { + retry++; + sleep(1000L * retry); + log.error("发布失败: {}", e.getMessage()); + } + } + } + + private static void waitForConnection(MqttClient client) { + long start = System.currentTimeMillis(); + while (!client.isConnected() && (System.currentTimeMillis() - start) < 5000) { + sleep(100); + } + } + + // 关闭所有连接 + public static void shutdown() { + executor.shutdown(); + try { + if (!executor.awaitTermination(5, TimeUnit.SECONDS)) { + executor.shutdownNow(); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + clients.forEach((k, v) -> closeClient(v)); + clients.clear(); + } + + private static void closeClient(MqttClient client) { + try { + if (client != null) { + client.disconnect(); + client.close(); + } + } catch (MqttException e) { + log.error("关闭客户端错误: {}", e.getMessage()); + } + } +} \ No newline at end of file