From 41d617b26f29f80347efdb6f92345766f6408b8e Mon Sep 17 00:00:00 2001 From: yang <2119157836@qq.com> Date: Fri, 14 Mar 2025 13:43:21 +0800 Subject: [PATCH] =?UTF-8?q?refactor(mcwl-resource):=20=E8=B0=83=E6=95=B4?= =?UTF-8?q?=20MQTT=20=E6=B6=88=E6=81=AF=E5=A4=84=E7=90=86=E6=9C=BA?= =?UTF-8?q?=E5=88=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../web/controller/mqtt/MqttController.java | 26 ++ .../src/test/java/com/mcwl/TestMQTT.java | 22 ++ .../domain/ImageCommentLikePushCallback.java | 29 --- .../domain/ImageLikePushCallback.java | 29 --- .../domain/ModelCommentLikePushCallback.java | 28 -- .../domain/ModelLikePushCallback.java | 28 -- .../WorkFlowCommentLikePushCallback.java | 28 -- .../domain/WorkFlowLikePushCallback.java | 28 -- .../resource/handler/IMessageHandler.java | 38 +++ .../handler/impl/ImageCommentLikeHandler.java | 126 +++++++++ .../handler/impl/ImageLikeHandler.java | 122 +++++++++ .../handler/impl/ModelCommentLikeHandler.java | 121 +++++++++ .../handler/impl/ModelLikeHandler.java | 120 +++++++++ .../impl/WorkFlowCommentLikeHandler.java | 124 +++++++++ .../handler/impl/WorkFlowLikeHandler.java | 118 +++++++++ .../impl/ModelCommentLikeServiceImpl.java | 80 +----- .../ModelImageCommentLikeServiceImpl.java | 82 +----- .../impl/ModelImageLikeServiceImpl.java | 53 +--- .../service/impl/ModelLikeServiceImpl.java | 79 +----- .../impl/WorkFlowCommentLikeServiceImpl.java | 79 +----- .../service/impl/WorkFlowLikeServiceImpl.java | 74 +----- .../java/com/mcwl/resource/util/EMQXUtil.java | 207 --------------- .../com/mcwl/resource/util/MqttTemplate.java | 245 ++++++++++++++++++ 23 files changed, 1122 insertions(+), 764 deletions(-) create mode 100644 mcwl-admin/src/main/java/com/mcwl/web/controller/mqtt/MqttController.java create mode 100644 mcwl-admin/src/test/java/com/mcwl/TestMQTT.java delete mode 100644 mcwl-resource/src/main/java/com/mcwl/resource/domain/ImageCommentLikePushCallback.java delete mode 100644 mcwl-resource/src/main/java/com/mcwl/resource/domain/ImageLikePushCallback.java delete mode 100644 mcwl-resource/src/main/java/com/mcwl/resource/domain/ModelCommentLikePushCallback.java delete mode 100644 mcwl-resource/src/main/java/com/mcwl/resource/domain/ModelLikePushCallback.java delete mode 100644 mcwl-resource/src/main/java/com/mcwl/resource/domain/WorkFlowCommentLikePushCallback.java delete mode 100644 mcwl-resource/src/main/java/com/mcwl/resource/domain/WorkFlowLikePushCallback.java create mode 100644 mcwl-resource/src/main/java/com/mcwl/resource/handler/IMessageHandler.java create mode 100644 mcwl-resource/src/main/java/com/mcwl/resource/handler/impl/ImageCommentLikeHandler.java create mode 100644 mcwl-resource/src/main/java/com/mcwl/resource/handler/impl/ImageLikeHandler.java create mode 100644 mcwl-resource/src/main/java/com/mcwl/resource/handler/impl/ModelCommentLikeHandler.java create mode 100644 mcwl-resource/src/main/java/com/mcwl/resource/handler/impl/ModelLikeHandler.java create mode 100644 mcwl-resource/src/main/java/com/mcwl/resource/handler/impl/WorkFlowCommentLikeHandler.java create mode 100644 mcwl-resource/src/main/java/com/mcwl/resource/handler/impl/WorkFlowLikeHandler.java delete mode 100644 mcwl-resource/src/main/java/com/mcwl/resource/util/EMQXUtil.java create mode 100644 mcwl-resource/src/main/java/com/mcwl/resource/util/MqttTemplate.java diff --git a/mcwl-admin/src/main/java/com/mcwl/web/controller/mqtt/MqttController.java b/mcwl-admin/src/main/java/com/mcwl/web/controller/mqtt/MqttController.java new file mode 100644 index 0000000..6ba7662 --- /dev/null +++ b/mcwl-admin/src/main/java/com/mcwl/web/controller/mqtt/MqttController.java @@ -0,0 +1,26 @@ +package com.mcwl.web.controller.mqtt; + +import com.mcwl.common.core.domain.R; +import com.mcwl.myInvitation.domain.vo.CommissionRatioVo; +import com.mcwl.resource.util.MqttTemplate; +import io.swagger.annotations.ApiOperation; +import lombok.RequiredArgsConstructor; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +import java.util.List; + +@RestController() +@RequiredArgsConstructor +@RequestMapping("/mqtt") +public class MqttController { + + private final MqttTemplate mqttTemplate; + + @GetMapping("/send") + public void list(String topic, String msg) { + mqttTemplate.publish(topic, msg); + } + +} diff --git a/mcwl-admin/src/test/java/com/mcwl/TestMQTT.java b/mcwl-admin/src/test/java/com/mcwl/TestMQTT.java new file mode 100644 index 0000000..54bb221 --- /dev/null +++ b/mcwl-admin/src/test/java/com/mcwl/TestMQTT.java @@ -0,0 +1,22 @@ +package com.mcwl; + + +import com.mcwl.resource.util.MqttTemplate; +import lombok.RequiredArgsConstructor; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.context.junit4.SpringRunner; + +@SpringBootTest(classes = McWlApplication.class) +@RunWith(SpringRunner.class) +@RequiredArgsConstructor +public class TestMQTT { + + private final MqttTemplate mqttTemplate; + + @Test + public void publishTest() throws Exception { + mqttTemplate.publish("workFlow/1", "test"); + } +} diff --git a/mcwl-resource/src/main/java/com/mcwl/resource/domain/ImageCommentLikePushCallback.java b/mcwl-resource/src/main/java/com/mcwl/resource/domain/ImageCommentLikePushCallback.java deleted file mode 100644 index 15099cc..0000000 --- a/mcwl-resource/src/main/java/com/mcwl/resource/domain/ImageCommentLikePushCallback.java +++ /dev/null @@ -1,29 +0,0 @@ -package com.mcwl.resource.domain; - -import lombok.extern.log4j.Log4j2; -import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; -import org.eclipse.paho.client.mqttv3.MqttCallback; -import org.eclipse.paho.client.mqttv3.MqttMessage; - -@Log4j2 -public class ImageCommentLikePushCallback implements MqttCallback { - - @Override - public void connectionLost(Throwable cause) { - // 连接丢失后,一般在这里面进行重连 - System.out.println("连接断开,可以做重连"); - } - - @Override - public void messageArrived(String topic, MqttMessage message) throws Exception { - // subscribe后得到的消息会执行到这里面 - System.out.println("接收消息主题:" + topic); - System.out.println("接收消息Qos:" + message.getQos()); - System.out.println("接收消息内容:" + new String(message.getPayload())); - } - - @Override - public void deliveryComplete(IMqttDeliveryToken token) { - System.out.println("deliveryComplete---------" + token.isComplete()); - } -} \ No newline at end of file diff --git a/mcwl-resource/src/main/java/com/mcwl/resource/domain/ImageLikePushCallback.java b/mcwl-resource/src/main/java/com/mcwl/resource/domain/ImageLikePushCallback.java deleted file mode 100644 index 8bb2e74..0000000 --- a/mcwl-resource/src/main/java/com/mcwl/resource/domain/ImageLikePushCallback.java +++ /dev/null @@ -1,29 +0,0 @@ -package com.mcwl.resource.domain; - -import lombok.extern.log4j.Log4j2; -import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; -import org.eclipse.paho.client.mqttv3.MqttCallback; -import org.eclipse.paho.client.mqttv3.MqttMessage; - -@Log4j2 -public class ImageLikePushCallback implements MqttCallback { - - @Override - public void connectionLost(Throwable cause) { - // 连接丢失后,一般在这里面进行重连 - System.out.println("连接断开,可以做重连"); - } - - @Override - public void messageArrived(String topic, MqttMessage message) throws Exception { - // subscribe后得到的消息会执行到这里面 - System.out.println("接收消息主题:" + topic); - System.out.println("接收消息Qos:" + message.getQos()); - System.out.println("接收消息内容:" + new String(message.getPayload())); - } - - @Override - public void deliveryComplete(IMqttDeliveryToken token) { - System.out.println("deliveryComplete---------" + token.isComplete()); - } -} \ No newline at end of file diff --git a/mcwl-resource/src/main/java/com/mcwl/resource/domain/ModelCommentLikePushCallback.java b/mcwl-resource/src/main/java/com/mcwl/resource/domain/ModelCommentLikePushCallback.java deleted file mode 100644 index 3e96760..0000000 --- a/mcwl-resource/src/main/java/com/mcwl/resource/domain/ModelCommentLikePushCallback.java +++ /dev/null @@ -1,28 +0,0 @@ -package com.mcwl.resource.domain; - -import lombok.extern.log4j.Log4j2; -import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; -import org.eclipse.paho.client.mqttv3.MqttCallback; -import org.eclipse.paho.client.mqttv3.MqttMessage; - -@Log4j2 -public class ModelCommentLikePushCallback implements MqttCallback { - @Override - public void connectionLost(Throwable cause) { - // 连接丢失后,一般在这里面进行重连 - System.out.println("连接断开,可以做重连"); - } - - @Override - public void messageArrived(String topic, MqttMessage message) throws Exception { - // subscribe后得到的消息会执行到这里面 - System.out.println("接收消息主题:" + topic); - System.out.println("接收消息Qos:" + message.getQos()); - System.out.println("接收消息内容:" + new String(message.getPayload())); - } - - @Override - public void deliveryComplete(IMqttDeliveryToken token) { - System.out.println("deliveryComplete---------" + token.isComplete()); - } -} \ No newline at end of file diff --git a/mcwl-resource/src/main/java/com/mcwl/resource/domain/ModelLikePushCallback.java b/mcwl-resource/src/main/java/com/mcwl/resource/domain/ModelLikePushCallback.java deleted file mode 100644 index 84eaf49..0000000 --- a/mcwl-resource/src/main/java/com/mcwl/resource/domain/ModelLikePushCallback.java +++ /dev/null @@ -1,28 +0,0 @@ -package com.mcwl.resource.domain; - -import lombok.extern.log4j.Log4j2; -import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; -import org.eclipse.paho.client.mqttv3.MqttCallback; -import org.eclipse.paho.client.mqttv3.MqttMessage; - -@Log4j2 -public class ModelLikePushCallback implements MqttCallback { - @Override - public void connectionLost(Throwable cause) { - // 连接丢失后,一般在这里面进行重连 - System.out.println("连接断开,可以做重连"); - } - - @Override - public void messageArrived(String topic, MqttMessage message) throws Exception { - // subscribe后得到的消息会执行到这里面 - System.out.println("接收消息主题:" + topic); - System.out.println("接收消息Qos:" + message.getQos()); - System.out.println("接收消息内容:" + new String(message.getPayload())); - } - - @Override - public void deliveryComplete(IMqttDeliveryToken token) { - System.out.println("deliveryComplete---------" + token.isComplete()); - } -} \ No newline at end of file diff --git a/mcwl-resource/src/main/java/com/mcwl/resource/domain/WorkFlowCommentLikePushCallback.java b/mcwl-resource/src/main/java/com/mcwl/resource/domain/WorkFlowCommentLikePushCallback.java deleted file mode 100644 index a1f9dca..0000000 --- a/mcwl-resource/src/main/java/com/mcwl/resource/domain/WorkFlowCommentLikePushCallback.java +++ /dev/null @@ -1,28 +0,0 @@ -package com.mcwl.resource.domain; - -import lombok.extern.log4j.Log4j2; -import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; -import org.eclipse.paho.client.mqttv3.MqttCallback; -import org.eclipse.paho.client.mqttv3.MqttMessage; - -@Log4j2 -public class WorkFlowCommentLikePushCallback implements MqttCallback { - @Override - public void connectionLost(Throwable cause) { - // 连接丢失后,一般在这里面进行重连 - System.out.println("连接断开,可以做重连"); - } - - @Override - public void messageArrived(String topic, MqttMessage message) throws Exception { - // subscribe后得到的消息会执行到这里面 - System.out.println("接收消息主题:" + topic); - System.out.println("接收消息Qos:" + message.getQos()); - System.out.println("接收消息内容:" + new String(message.getPayload())); - } - - @Override - public void deliveryComplete(IMqttDeliveryToken token) { - System.out.println("deliveryComplete---------" + token.isComplete()); - } -} \ No newline at end of file diff --git a/mcwl-resource/src/main/java/com/mcwl/resource/domain/WorkFlowLikePushCallback.java b/mcwl-resource/src/main/java/com/mcwl/resource/domain/WorkFlowLikePushCallback.java deleted file mode 100644 index 05e01ce..0000000 --- a/mcwl-resource/src/main/java/com/mcwl/resource/domain/WorkFlowLikePushCallback.java +++ /dev/null @@ -1,28 +0,0 @@ -package com.mcwl.resource.domain; - -import lombok.extern.log4j.Log4j2; -import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; -import org.eclipse.paho.client.mqttv3.MqttCallback; -import org.eclipse.paho.client.mqttv3.MqttMessage; - -@Log4j2 -public class WorkFlowLikePushCallback implements MqttCallback { - @Override - public void connectionLost(Throwable cause) { - // 连接丢失后,一般在这里面进行重连 - System.out.println("连接断开,可以做重连"); - } - - @Override - public void messageArrived(String topic, MqttMessage message) throws Exception { - // subscribe后得到的消息会执行到这里面 - System.out.println("接收消息主题:" + topic); - System.out.println("接收消息Qos:" + message.getQos()); - System.out.println("接收消息内容:" + new String(message.getPayload())); - } - - @Override - public void deliveryComplete(IMqttDeliveryToken token) { - System.out.println("deliveryComplete---------" + token.isComplete()); - } -} \ No newline at end of file diff --git a/mcwl-resource/src/main/java/com/mcwl/resource/handler/IMessageHandler.java b/mcwl-resource/src/main/java/com/mcwl/resource/handler/IMessageHandler.java new file mode 100644 index 0000000..2ca9713 --- /dev/null +++ b/mcwl-resource/src/main/java/com/mcwl/resource/handler/IMessageHandler.java @@ -0,0 +1,38 @@ +package com.mcwl.resource.handler; + +import lombok.AllArgsConstructor; +import lombok.Getter; +import org.eclipse.paho.client.mqttv3.MqttMessage; + +import java.lang.annotation.*; +import java.util.Collections; +import java.util.List; + +public interface IMessageHandler { + void handleMessage(String topic, MqttMessage message); + + @Target(ElementType.TYPE) + @Retention(RetentionPolicy.RUNTIME) + @interface Topic { + String value(); + int qos() default 1; + } + + default List getTopics() { + Topic annotation = this.getClass().getAnnotation(Topic.class); + if (annotation != null) { + return Collections.singletonList( + new TopicSubscription(annotation.value(), annotation.qos()) + ); + } + return Collections.emptyList(); + } + + @Getter + @AllArgsConstructor + class TopicSubscription { + private String topicFilter; + private int qos; + } + +} \ No newline at end of file diff --git a/mcwl-resource/src/main/java/com/mcwl/resource/handler/impl/ImageCommentLikeHandler.java b/mcwl-resource/src/main/java/com/mcwl/resource/handler/impl/ImageCommentLikeHandler.java new file mode 100644 index 0000000..b71b6c5 --- /dev/null +++ b/mcwl-resource/src/main/java/com/mcwl/resource/handler/impl/ImageCommentLikeHandler.java @@ -0,0 +1,126 @@ +package com.mcwl.resource.handler.impl; + +import cn.hutool.json.JSON; +import cn.hutool.json.JSONUtil; +import com.mcwl.common.core.domain.entity.SysUser; +import com.mcwl.common.exception.ServiceException; +import com.mcwl.common.utils.SecurityUtils; +import com.mcwl.common.utils.StringUtils; +import com.mcwl.resource.domain.ModelImageComment; +import com.mcwl.resource.domain.ModelImageCommentLike; +import com.mcwl.resource.domain.SysAdvice; +import com.mcwl.resource.handler.IMessageHandler; +import com.mcwl.resource.mapper.ModelImageCommentLikeMapper; +import com.mcwl.resource.mapper.ModelImageCommentMapper; +import com.mcwl.resource.mapper.WorkFlowCommentLikeMapper; +import com.mcwl.resource.mapper.WorkFlowCommentMapper; +import com.mcwl.resource.service.ISysAdviceService; +import com.mcwl.system.domain.enums.AdviceEnum; +import com.mcwl.system.service.ISysUserService; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Transactional; + +import java.util.Map; +import java.util.Objects; + +@Slf4j +@Component +@IMessageHandler.Topic(value = "imageCommentLike/#", qos = 1) +@RequiredArgsConstructor +public class ImageCommentLikeHandler implements IMessageHandler { + + private final ModelImageCommentMapper modelImageCommentMapper; + + private final ModelImageCommentLikeMapper modelImageCommentLikeMapper; + + private final ISysUserService userService; + + private final ISysAdviceService adviceService; + @Override + @Transactional(rollbackFor = Exception.class) + public void handleMessage(String topic, MqttMessage message) { + String payload = new String(message.getPayload()); + log.info("图片评论点赞: {} - {}", payload, message); + + JSON parse = JSONUtil.parse(payload); + Long commentId = Long.parseLong(parse.getByPath("commentId").toString()); + Long userId = Long.parseLong(parse.getByPath("userId").toString()); + + + + this.like(userId, commentId); + } + + + + private void like(Long userId, Long commentId) { + ModelImageComment modelImageComment = modelImageCommentMapper.selectById(commentId); + if (Objects.isNull(modelImageComment)) { + throw new ServiceException("该评论不存在"); + } + ModelImageCommentLike modelImageCommentLike = modelImageCommentLikeMapper.getLikeImageComment(userId, commentId); + if (Objects.nonNull(modelImageCommentLike)) { + if (Objects.equals(modelImageCommentLike.getDelFlag(), "0")) { + modelImageCommentLike.setDelFlag("2"); + int likeNum = modelImageComment.getLikeNum() - 1; + likeNum = Math.max(likeNum, 0); + modelImageComment.setLikeNum(likeNum); + } else { + modelImageCommentLike.setDelFlag("0"); + modelImageComment.setLikeNum(modelImageComment.getLikeNum() + 1); + this.addLikeAdvice(modelImageComment, userId); + } + // 更新点赞记录 + modelImageCommentLikeMapper.updateDelFlagById(modelImageCommentLike); + // 更新图片评论点赞数 + modelImageCommentMapper.updateById(modelImageComment); + return; + } + + // 添加点赞记录 + modelImageCommentLike = new ModelImageCommentLike(); + modelImageCommentLike.setUserId(userId); + modelImageCommentLike.setModelImageCommentId(commentId); + modelImageCommentLikeMapper.insert(modelImageCommentLike); + + // 更新图片点赞数 + modelImageComment.setLikeNum(modelImageComment.getLikeNum() + 1); + modelImageCommentMapper.updateById(modelImageComment); + + this.addLikeAdvice(modelImageComment, userId); + } + + + private void addLikeAdvice(ModelImageComment modelImageComment, Long userId) { + + Long receiverUserId = modelImageComment.getUserId(); + + if (Objects.equals(userId, receiverUserId)) { + return; + } + + SysUser sysUser = userService.selectUserById(userId); + + String content = StringUtils.format("恭喜!{}点赞了您的评论", + sysUser.getNickName()); + + SysAdvice sysAdvice = new SysAdvice(); + sysAdvice.setSenderId(userId); + sysAdvice.setReceiverId(receiverUserId); + sysAdvice.setContent(content); + sysAdvice.setProductId(modelImageComment.getModelImageId()); + sysAdvice.setProductType(2); + sysAdvice.setCommentId(modelImageComment.getId()); + sysAdvice.setIsRead(0); + sysAdvice.setType(AdviceEnum.LIKE_REMIND); + adviceService.save(sysAdvice); + + + } + + + +} \ No newline at end of file diff --git a/mcwl-resource/src/main/java/com/mcwl/resource/handler/impl/ImageLikeHandler.java b/mcwl-resource/src/main/java/com/mcwl/resource/handler/impl/ImageLikeHandler.java new file mode 100644 index 0000000..8ece9af --- /dev/null +++ b/mcwl-resource/src/main/java/com/mcwl/resource/handler/impl/ImageLikeHandler.java @@ -0,0 +1,122 @@ +package com.mcwl.resource.handler.impl; + +import cn.hutool.json.JSON; +import cn.hutool.json.JSONUtil; +import com.mcwl.common.core.domain.entity.SysUser; +import com.mcwl.common.exception.ServiceException; +import com.mcwl.common.utils.SecurityUtils; +import com.mcwl.common.utils.StringUtils; +import com.mcwl.resource.domain.ModelImage; +import com.mcwl.resource.domain.ModelImageLike; +import com.mcwl.resource.domain.SysAdvice; +import com.mcwl.resource.handler.IMessageHandler; +import com.mcwl.resource.mapper.ModelImageLikeMapper; +import com.mcwl.resource.mapper.ModelImageMapper; +import com.mcwl.resource.mapper.WorkFlowLikeMapper; +import com.mcwl.resource.mapper.WorkFlowMapper; +import com.mcwl.resource.service.ISysAdviceService; +import com.mcwl.system.domain.enums.AdviceEnum; +import com.mcwl.system.service.ISysUserService; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Transactional; + +import java.util.Objects; + +@Slf4j +@Component +@IMessageHandler.Topic(value = "imageLike/#", qos = 1) +@RequiredArgsConstructor +public class ImageLikeHandler implements IMessageHandler { + + + private final ModelImageMapper modelImageMapper; + + private final ModelImageLikeMapper modelImageLikeMapper; + + private final ISysUserService userService; + + private final ISysAdviceService adviceService; + + @Override + @Transactional(rollbackFor = Exception.class) + public void handleMessage(String topic, MqttMessage message) { + String payload = new String(message.getPayload()); + log.info("图片点赞: {}", payload); + + JSON parse = JSONUtil.parse(payload); + Long commentId = Long.parseLong(parse.getByPath("commentId").toString()); + Long userId = Long.parseLong(parse.getByPath("userId").toString()); + + this.like(userId, commentId); + } + + private void like(Long userId, Long imageId) { + ModelImage modelImage = modelImageMapper.selectById(imageId); + if (Objects.isNull(modelImage)) { + throw new ServiceException("该图片不存在或已下架"); + } + ModelImageLike modelImageLike = modelImageLikeMapper.getLikeImage(userId, imageId); + if (Objects.nonNull(modelImageLike)) { + if (Objects.equals(modelImageLike.getDelFlag(), "0")) { + modelImageLike.setDelFlag("2"); + int likeNum = modelImage.getLikeNum() - 1; + likeNum = Math.max(likeNum, 0); + modelImage.setLikeNum(likeNum); + } else { + modelImageLike.setDelFlag("0"); + modelImage.setLikeNum(modelImage.getLikeNum() + 1); + this.addLikeAdvice(modelImage, userId); + } + // 更新点赞记录 + modelImageLikeMapper.updateDelFlagById(modelImageLike); + // 更新图片点赞数 + modelImageMapper.updateById(modelImage); + return; + } + + // 添加点赞记录 + modelImageLike = new ModelImageLike(); + modelImageLike.setUserId(userId); + modelImageLike.setModelImageId(imageId); + modelImageLikeMapper.insert(modelImageLike); + + // 更新图片点赞数 + modelImage.setLikeNum(modelImage.getLikeNum() + 1); + modelImageMapper.updateById(modelImage); + + this.addLikeAdvice(modelImage, userId); + } + + + private void addLikeAdvice(ModelImage modelImage, Long userId) { + + Long receiverUserId = modelImage.getUserId(); + + if (Objects.equals(userId, receiverUserId)) { + return; + } + + SysUser sysUser = userService.selectUserById(userId); + + + String content = StringUtils.format("恭喜!{}点赞了您的图片:{}", + sysUser.getNickName(), modelImage.getTitle()); + + SysAdvice sysAdvice = new SysAdvice(); + sysAdvice.setSenderId(userId); + sysAdvice.setReceiverId(receiverUserId); + sysAdvice.setContent(content); + sysAdvice.setProductId(modelImage.getId()); + sysAdvice.setProductType(2); + sysAdvice.setIsRead(0); + sysAdvice.setType(AdviceEnum.LIKE_REMIND); + adviceService.save(sysAdvice); + + + } + + +} \ No newline at end of file diff --git a/mcwl-resource/src/main/java/com/mcwl/resource/handler/impl/ModelCommentLikeHandler.java b/mcwl-resource/src/main/java/com/mcwl/resource/handler/impl/ModelCommentLikeHandler.java new file mode 100644 index 0000000..b317982 --- /dev/null +++ b/mcwl-resource/src/main/java/com/mcwl/resource/handler/impl/ModelCommentLikeHandler.java @@ -0,0 +1,121 @@ +package com.mcwl.resource.handler.impl; + +import cn.hutool.json.JSON; +import cn.hutool.json.JSONUtil; +import com.mcwl.common.core.domain.entity.SysUser; +import com.mcwl.common.exception.ServiceException; +import com.mcwl.common.utils.SecurityUtils; +import com.mcwl.common.utils.StringUtils; +import com.mcwl.resource.domain.*; +import com.mcwl.resource.handler.IMessageHandler; +import com.mcwl.resource.mapper.ModelCommentLikeMapper; +import com.mcwl.resource.mapper.ModelCommentMapper; +import com.mcwl.resource.mapper.WorkFlowCommentLikeMapper; +import com.mcwl.resource.mapper.WorkFlowCommentMapper; +import com.mcwl.resource.service.ISysAdviceService; +import com.mcwl.system.domain.enums.AdviceEnum; +import com.mcwl.system.service.ISysUserService; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Transactional; + +import java.util.Objects; + +@Slf4j +@Component +@IMessageHandler.Topic(value = "modelCommentLike/#", qos = 1) +@RequiredArgsConstructor +public class ModelCommentLikeHandler implements IMessageHandler { + + + private final ModelCommentMapper modelCommentMapper; + + private final ModelCommentLikeMapper modelCommentLikeMapper; + + private final ISysUserService userService; + + private final ISysAdviceService adviceService; + + @Override + @Transactional(rollbackFor = Exception.class) + public void handleMessage(String topic, MqttMessage message) { + String payload = new String(message.getPayload()); + log.info("模型评论点赞: {} - {}", payload, message); + + JSON parse = JSONUtil.parse(payload); + Long commentId = Long.parseLong(parse.getByPath("commentId").toString()); + Long userId = Long.parseLong(parse.getByPath("userId").toString()); + + + this.like(userId, commentId); + } + + private void like(Long userId, Long commentId) { + ModelComment modelComment = modelCommentMapper.selectById(commentId); + if (Objects.isNull(modelComment)) { + throw new ServiceException("该评论不存在"); + } + ModelCommentLike modelCommentLike = modelCommentLikeMapper.getLikeComment(userId, commentId); + if (Objects.nonNull(modelCommentLike)) { + if (Objects.equals(modelCommentLike.getDelFlag(), "0")) { + modelCommentLike.setDelFlag("2"); + int likeNum = modelComment.getLikeNum() - 1; + likeNum = Math.max(likeNum, 0); + modelComment.setLikeNum(likeNum); + } else { + modelCommentLike.setDelFlag("0"); + modelComment.setLikeNum(modelComment.getLikeNum() + 1); + this.addLikeAdvice(modelComment, userId); + } + // 更新点赞记录 + modelCommentLikeMapper.updateDelFlagById(modelCommentLike); + // 更新图片评论点赞数 + modelCommentMapper.updateById(modelComment); + return; + } + + // 添加点赞记录 + modelCommentLike = new ModelCommentLike(); + modelCommentLike.setUserId(userId); + modelCommentLike.setModelCommentId(commentId); + modelCommentLikeMapper.insert(modelCommentLike); + + // 更新模型点赞数 + modelComment.setLikeNum(modelComment.getLikeNum() + 1); + modelCommentMapper.updateById(modelComment); + + this.addLikeAdvice(modelComment, userId); + } + + + private void addLikeAdvice(ModelComment modelComment, Long userId) { + + Long receiverUserId = modelComment.getUserId(); + + if (Objects.equals(userId, receiverUserId)) { + return; + } + SysUser sysUser = userService.selectUserById(userId); + + String content = StringUtils.format("恭喜!{}点赞了您的评论", + sysUser.getNickName()); + + SysAdvice sysAdvice = new SysAdvice(); + sysAdvice.setSenderId(userId); + sysAdvice.setReceiverId(receiverUserId); + sysAdvice.setContent(content); + sysAdvice.setProductId(modelComment.getModelId()); + sysAdvice.setProductType(0); + sysAdvice.setCommentId(modelComment.getId()); + sysAdvice.setIsRead(0); + sysAdvice.setType(AdviceEnum.LIKE_REMIND); + adviceService.save(sysAdvice); + + + } + + + +} \ No newline at end of file diff --git a/mcwl-resource/src/main/java/com/mcwl/resource/handler/impl/ModelLikeHandler.java b/mcwl-resource/src/main/java/com/mcwl/resource/handler/impl/ModelLikeHandler.java new file mode 100644 index 0000000..3e361c5 --- /dev/null +++ b/mcwl-resource/src/main/java/com/mcwl/resource/handler/impl/ModelLikeHandler.java @@ -0,0 +1,120 @@ +package com.mcwl.resource.handler.impl; + +import cn.hutool.json.JSON; +import cn.hutool.json.JSONUtil; +import com.mcwl.common.core.domain.entity.SysUser; +import com.mcwl.common.exception.ServiceException; +import com.mcwl.common.utils.SecurityUtils; +import com.mcwl.common.utils.StringUtils; +import com.mcwl.resource.domain.*; +import com.mcwl.resource.handler.IMessageHandler; +import com.mcwl.resource.mapper.ModelLikeMapper; +import com.mcwl.resource.mapper.ModelMapper; +import com.mcwl.resource.mapper.WorkFlowLikeMapper; +import com.mcwl.resource.mapper.WorkFlowMapper; +import com.mcwl.resource.service.ISysAdviceService; +import com.mcwl.system.domain.enums.AdviceEnum; +import com.mcwl.system.service.ISysUserService; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Transactional; + +import java.util.Objects; + +@Slf4j +@Component +@IMessageHandler.Topic(value = "modelLike/#", qos = 1) +@RequiredArgsConstructor +public class ModelLikeHandler implements IMessageHandler { + + private final ModelMapper modelMapper; + + + private final ModelLikeMapper modelLikeMapper; + + private final ISysUserService userService; + + private final ISysAdviceService adviceService; + + + @Override + @Transactional(rollbackFor = Exception.class) + public void handleMessage(String topic, MqttMessage message) { + String payload = new String(message.getPayload()); + log.info("模型点赞: {}", payload); + + JSON parse = JSONUtil.parse(payload); + Long commentId = Long.parseLong(parse.getByPath("commentId").toString()); + Long userId = Long.parseLong(parse.getByPath("userId").toString()); + + + this.like(userId, commentId); + } + + private void like(Long userId, Long modelId) { + ModelProduct model = modelMapper.selectById(modelId); + if (Objects.isNull(model)) { + throw new ServiceException("该模型不存在或已下架"); + } + ModelLike modelLike = modelLikeMapper.getLike(userId, modelId); + if (Objects.nonNull(modelLike)) { + if (Objects.equals(modelLike.getDelFlag(), "0")) { + modelLike.setDelFlag("2"); + int likeNum = model.getLikeNum() - 1; + likeNum = Math.max(likeNum, 0); + model.setLikeNum(likeNum); + } else { + modelLike.setDelFlag("0"); + model.setLikeNum(model.getLikeNum() + 1); + this.addLikeAdvice(model, userId); + } + // 更新点赞记录 + modelLikeMapper.updateDelFlagById(modelLike); + // 更新图片点赞数 + modelMapper.updateById(model); + return; + } + + // 添加点赞记录 + modelLike = new ModelLike(); + modelLike.setUserId(userId); + modelLike.setModelId(modelId); + modelLikeMapper.insert(modelLike); + + // 更新图片点赞数 + model.setNumbers(model.getLikeNum() + 1); + modelMapper.updateById(model); + + this.addLikeAdvice(model, userId); + } + + + private void addLikeAdvice(ModelProduct model, Long userId) { + + Long receiverUserId = model.getUserId(); + + if (Objects.equals(userId, receiverUserId)) { + return; + } + SysUser sysUser = userService.selectUserById(userId); + + String content = StringUtils.format("恭喜!{}点赞了您的模型:{}", + sysUser.getNickName(), model.getModelName()); + + SysAdvice sysAdvice = new SysAdvice(); + sysAdvice.setSenderId(userId); + sysAdvice.setReceiverId(receiverUserId); + sysAdvice.setContent(content); + sysAdvice.setProductId(model.getId()); + sysAdvice.setProductType(0); + sysAdvice.setIsRead(0); + sysAdvice.setType(AdviceEnum.LIKE_REMIND); + adviceService.save(sysAdvice); + + + } + + +} \ No newline at end of file diff --git a/mcwl-resource/src/main/java/com/mcwl/resource/handler/impl/WorkFlowCommentLikeHandler.java b/mcwl-resource/src/main/java/com/mcwl/resource/handler/impl/WorkFlowCommentLikeHandler.java new file mode 100644 index 0000000..004fae9 --- /dev/null +++ b/mcwl-resource/src/main/java/com/mcwl/resource/handler/impl/WorkFlowCommentLikeHandler.java @@ -0,0 +1,124 @@ +package com.mcwl.resource.handler.impl; + +import cn.hutool.json.JSON; +import cn.hutool.json.JSONUtil; +import com.mcwl.common.core.domain.entity.SysUser; +import com.mcwl.common.exception.ServiceException; +import com.mcwl.common.utils.SecurityUtils; +import com.mcwl.common.utils.StringUtils; +import com.mcwl.resource.domain.SysAdvice; +import com.mcwl.resource.domain.WorkFlowComment; +import com.mcwl.resource.domain.WorkFlowCommentLike; +import com.mcwl.resource.handler.IMessageHandler; +import com.mcwl.resource.mapper.WorkFlowCommentLikeMapper; +import com.mcwl.resource.mapper.WorkFlowCommentMapper; +import com.mcwl.resource.service.ISysAdviceService; +import com.mcwl.system.domain.enums.AdviceEnum; +import com.mcwl.system.service.ISysUserService; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Transactional; + +import java.util.Objects; + +@Slf4j +@Component +@IMessageHandler.Topic(value = "workFlowCommentLike/#", qos = 1) +@RequiredArgsConstructor +public class WorkFlowCommentLikeHandler implements IMessageHandler { + + + private final WorkFlowCommentMapper workFlowCommentMapper; + + + private final WorkFlowCommentLikeMapper workFlowCommentLikeMapper; + + private final ISysUserService userService; + + private final ISysAdviceService adviceService; + + + + @Override + @Transactional(rollbackFor = Exception.class) + public void handleMessage(String topic, MqttMessage message) { + String payload = new String(message.getPayload()); + log.info("工作流评论点赞: {} - {}", topic, payload); + + JSON parse = JSONUtil.parse(payload); + Long commentId = Long.parseLong(parse.getByPath("commentId").toString()); + Long userId = Long.parseLong(parse.getByPath("userId").toString()); + + + this.like(userId, commentId); + } + + private void like(Long userId, Long commentId) { + WorkFlowComment workFlowComment = workFlowCommentMapper.selectById(commentId); + if (Objects.isNull(workFlowComment)) { + throw new ServiceException("该评论不存在"); + } + WorkFlowCommentLike workFlowCommentLike = workFlowCommentLikeMapper.getLikeComment(userId, commentId); + if (Objects.nonNull(workFlowCommentLike)) { + if (Objects.equals(workFlowCommentLike.getDelFlag(), "0")) { + workFlowCommentLike.setDelFlag("2"); + int likeNum = workFlowComment.getLikeNum() - 1; + likeNum = Math.max(likeNum, 0); + workFlowComment.setLikeNum(likeNum); + } else { + workFlowCommentLike.setDelFlag("0"); + workFlowComment.setLikeNum(workFlowComment.getLikeNum() + 1); + this.addLikeAdvice(workFlowComment, userId); + } + // 更新点赞记录 + workFlowCommentLikeMapper.updateDelFlagById(workFlowCommentLike); + // 更新图片评论点赞数 + workFlowCommentMapper.updateById(workFlowComment); + return; + } + + // 添加点赞记录 + workFlowCommentLike = new WorkFlowCommentLike(); + workFlowCommentLike.setUserId(userId); + workFlowCommentLike.setWorkFlowCommentId(commentId); + workFlowCommentLikeMapper.insert(workFlowCommentLike); + + // 更新模型点赞数 + workFlowComment.setLikeNum(workFlowComment.getLikeNum() + 1); + workFlowCommentMapper.updateById(workFlowComment); + + this.addLikeAdvice(workFlowComment, userId); + } + + + private void addLikeAdvice(WorkFlowComment workFlowComment, Long userId) { + + Long receiverUserId = workFlowComment.getUserId(); + + if (Objects.equals(userId, receiverUserId)) { + return; + } + SysUser sysUser = userService.selectUserById(userId); + + String content = StringUtils.format("恭喜!{}点赞了您的评论", + sysUser.getNickName()); + + SysAdvice sysAdvice = new SysAdvice(); + sysAdvice.setSenderId(userId); + sysAdvice.setReceiverId(receiverUserId); + sysAdvice.setContent(content); + sysAdvice.setProductId(workFlowComment.getWorkFlowId()); + sysAdvice.setProductType(1); + sysAdvice.setCommentId(workFlowComment.getId()); + sysAdvice.setIsRead(0); + sysAdvice.setType(AdviceEnum.LIKE_REMIND); + adviceService.save(sysAdvice); + + + } + + + +} \ No newline at end of file diff --git a/mcwl-resource/src/main/java/com/mcwl/resource/handler/impl/WorkFlowLikeHandler.java b/mcwl-resource/src/main/java/com/mcwl/resource/handler/impl/WorkFlowLikeHandler.java new file mode 100644 index 0000000..c414178 --- /dev/null +++ b/mcwl-resource/src/main/java/com/mcwl/resource/handler/impl/WorkFlowLikeHandler.java @@ -0,0 +1,118 @@ +package com.mcwl.resource.handler.impl; + +import cn.hutool.json.JSON; +import cn.hutool.json.JSONUtil; +import com.mcwl.common.core.domain.entity.SysUser; +import com.mcwl.common.exception.ServiceException; +import com.mcwl.common.utils.SecurityUtils; +import com.mcwl.common.utils.StringUtils; +import com.mcwl.resource.domain.SysAdvice; +import com.mcwl.resource.domain.WorkFlow; +import com.mcwl.resource.domain.WorkFlowLike; +import com.mcwl.resource.handler.IMessageHandler; +import com.mcwl.resource.mapper.WorkFlowLikeMapper; +import com.mcwl.resource.mapper.WorkFlowMapper; +import com.mcwl.resource.service.ISysAdviceService; +import com.mcwl.system.domain.enums.AdviceEnum; +import com.mcwl.system.service.ISysUserService; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Transactional; + +import java.util.Objects; + +@Slf4j +@Component +@IMessageHandler.Topic(value = "workFlowLike/#", qos = 1) +@RequiredArgsConstructor +public class WorkFlowLikeHandler implements IMessageHandler { + + private final WorkFlowMapper workFlowMapper; + + private final WorkFlowLikeMapper workFlowLikeMapper; + + private final ISysUserService userService; + + private final ISysAdviceService adviceService; + + @Override + @Transactional(rollbackFor = Exception.class) + public void handleMessage(String topic, MqttMessage message) { + String payload = new String(message.getPayload()); + log.info("工作流点赞: {} - {}", topic, payload); + + JSON parse = JSONUtil.parse(payload); + Long commentId = Long.parseLong(parse.getByPath("commentId").toString()); + Long userId = Long.parseLong(parse.getByPath("userId").toString()); + + + this.like(userId, commentId); + } + + private void like(Long userId, Long modelId) { + WorkFlow workFlow = workFlowMapper.selectById(modelId); + if (Objects.isNull(workFlow)) { + throw new ServiceException("该工作流不存在或已下架"); + } + WorkFlowLike workFlowLike = workFlowLikeMapper.getLike(userId, modelId); + if (Objects.nonNull(workFlowLike)) { + if (Objects.equals(workFlowLike.getDelFlag(), "0")) { + workFlowLike.setDelFlag("2"); + int likeCount = workFlow.getLikeNum() - 1; + likeCount = Math.max(likeCount, 0); + workFlow.setLikeNum(likeCount); + } else { + workFlowLike.setDelFlag("0"); + workFlow.setLikeNum(workFlow.getLikeNum() + 1); + this.addLikeAdvice(workFlow, userId); + } + // 更新点赞记录 + workFlowLikeMapper.updateStatus(workFlowLike); + // 更新图片点赞数 + workFlowMapper.updateById(workFlow); + return; + } + + // 添加点赞记录 + workFlowLike = new WorkFlowLike(); + workFlowLike.setUserId(userId); + workFlowLike.setWorkFlowId(modelId); + workFlowLikeMapper.insert(workFlowLike); + + // 更新图片点赞数 + workFlow.setLikeNum(workFlow.getLikeNum() + 1); + workFlowMapper.updateById(workFlow); + + this.addLikeAdvice(workFlow, userId); + } + + + private void addLikeAdvice(WorkFlow workFlow, Long userId) { + + Long receiverUserId = workFlow.getUserId(); + + if (Objects.equals(userId, receiverUserId)) { + return; + } + SysUser sysUser = userService.selectUserById(userId); + + String content = StringUtils.format("恭喜!{}点赞了您的工作流:{}", + sysUser.getNickName(), workFlow.getWorkflowName()); + + SysAdvice sysAdvice = new SysAdvice(); + sysAdvice.setSenderId(userId); + sysAdvice.setReceiverId(receiverUserId); + sysAdvice.setContent(content); + sysAdvice.setProductId(workFlow.getId()); + sysAdvice.setProductType(1); + sysAdvice.setIsRead(0); + sysAdvice.setType(AdviceEnum.LIKE_REMIND); + adviceService.save(sysAdvice); + + + } + + +} \ No newline at end of file diff --git a/mcwl-resource/src/main/java/com/mcwl/resource/service/impl/ModelCommentLikeServiceImpl.java b/mcwl-resource/src/main/java/com/mcwl/resource/service/impl/ModelCommentLikeServiceImpl.java index d30743b..3411d4b 100644 --- a/mcwl-resource/src/main/java/com/mcwl/resource/service/impl/ModelCommentLikeServiceImpl.java +++ b/mcwl-resource/src/main/java/com/mcwl/resource/service/impl/ModelCommentLikeServiceImpl.java @@ -1,22 +1,18 @@ package com.mcwl.resource.service.impl; +import cn.hutool.json.JSONUtil; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; -import com.mcwl.common.exception.ServiceException; import com.mcwl.common.utils.SecurityUtils; -import com.mcwl.common.utils.StringUtils; -import com.mcwl.resource.domain.ModelComment; import com.mcwl.resource.domain.ModelCommentLike; -import com.mcwl.resource.domain.SysAdvice; import com.mcwl.resource.mapper.ModelCommentLikeMapper; -import com.mcwl.resource.mapper.ModelCommentMapper; import com.mcwl.resource.service.ModelCommentLikeService; -import com.mcwl.system.domain.enums.AdviceEnum; +import com.mcwl.resource.util.MqttTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; -import java.util.Date; -import java.util.Objects; +import java.util.HashMap; +import java.util.Map; /** * 模型评论点赞 @@ -25,72 +21,14 @@ import java.util.Objects; public class ModelCommentLikeServiceImpl extends ServiceImpl implements ModelCommentLikeService { @Autowired - private ModelCommentMapper modelCommentMapper; + private MqttTemplate mqttTemplate; @Override - @Transactional public void like(Long commentId) { - ModelComment modelComment = modelCommentMapper.selectById(commentId); - if (Objects.isNull(modelComment)) { - throw new ServiceException("该评论不存在"); - } - Long userId = SecurityUtils.getUserId(); - ModelCommentLike modelCommentLike = baseMapper.getLikeComment(userId, commentId); - if (Objects.nonNull(modelCommentLike)) { - if (Objects.equals(modelCommentLike.getDelFlag(), "0")) { - modelCommentLike.setDelFlag("2"); - int likeNum = modelComment.getLikeNum() - 1; - likeNum = Math.max(likeNum, 0); - modelComment.setLikeNum(likeNum); - } else { - modelCommentLike.setDelFlag("0"); - modelComment.setLikeNum(modelComment.getLikeNum() + 1); - this.addLikeAdvice(modelComment); - } - // 更新点赞记录 - baseMapper.updateDelFlagById(modelCommentLike); - // 更新图片评论点赞数 - modelCommentMapper.updateById(modelComment); - return; - } - - // 添加点赞记录 - modelCommentLike = new ModelCommentLike(); - modelCommentLike.setUserId(userId); - modelCommentLike.setModelCommentId(commentId); - baseMapper.insert(modelCommentLike); - - // 更新模型点赞数 - modelComment.setLikeNum(modelComment.getLikeNum() + 1); - modelCommentMapper.updateById(modelComment); - - this.addLikeAdvice(modelComment); - } - - - private void addLikeAdvice(ModelComment modelComment) { - - Long userId = SecurityUtils.getUserId(); - Long receiverUserId = modelComment.getUserId(); - - if (Objects.equals(userId, receiverUserId)) { - return; - } - - String content = StringUtils.format("恭喜!{}点赞了您的评论", - SecurityUtils.getLoginUser().getUser().getNickName()); - - SysAdvice sysAdvice = new SysAdvice(); - sysAdvice.setSenderId(userId); - sysAdvice.setReceiverId(receiverUserId); - sysAdvice.setContent(content); - sysAdvice.setProductId(modelComment.getModelId()); - sysAdvice.setProductType(0); - sysAdvice.setCommentId(modelComment.getId()); - sysAdvice.setIsRead(0); - sysAdvice.setType(AdviceEnum.LIKE_REMIND); - - + Map map = new HashMap<>(); + map.put("userId", SecurityUtils.getUserId()); + map.put("commentId", commentId); + mqttTemplate.publish("modelCommentLike", JSONUtil.toJsonStr(map), 0); } diff --git a/mcwl-resource/src/main/java/com/mcwl/resource/service/impl/ModelImageCommentLikeServiceImpl.java b/mcwl-resource/src/main/java/com/mcwl/resource/service/impl/ModelImageCommentLikeServiceImpl.java index ac80d37..abf1a34 100644 --- a/mcwl-resource/src/main/java/com/mcwl/resource/service/impl/ModelImageCommentLikeServiceImpl.java +++ b/mcwl-resource/src/main/java/com/mcwl/resource/service/impl/ModelImageCommentLikeServiceImpl.java @@ -1,22 +1,19 @@ package com.mcwl.resource.service.impl; +import cn.hutool.json.JSONUtil; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; -import com.mcwl.common.exception.ServiceException; import com.mcwl.common.utils.SecurityUtils; -import com.mcwl.common.utils.StringUtils; -import com.mcwl.resource.domain.ModelImageComment; import com.mcwl.resource.domain.ModelImageCommentLike; -import com.mcwl.resource.domain.SysAdvice; import com.mcwl.resource.mapper.ModelImageCommentLikeMapper; -import com.mcwl.resource.mapper.ModelImageCommentMapper; import com.mcwl.resource.service.ModelImageCommentLikeService; -import com.mcwl.system.domain.enums.AdviceEnum; +import com.mcwl.resource.util.MqttTemplate; import lombok.RequiredArgsConstructor; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; -import java.util.Date; -import java.util.Objects; +import java.util.HashMap; +import java.util.Map; /** * 图片评论点赞 @@ -25,72 +22,15 @@ import java.util.Objects; @RequiredArgsConstructor public class ModelImageCommentLikeServiceImpl extends ServiceImpl implements ModelImageCommentLikeService { - private final ModelImageCommentMapper modelImageCommentMapper; + @Autowired + private MqttTemplate mqttTemplate; @Override - @Transactional public void like(Long commentId) { - ModelImageComment modelImageComment = modelImageCommentMapper.selectById(commentId); - if (Objects.isNull(modelImageComment)) { - throw new ServiceException("该评论不存在"); - } - Long userId = SecurityUtils.getUserId(); - ModelImageCommentLike modelImageCommentLike = baseMapper.getLikeImageComment(userId, commentId); - if (Objects.nonNull(modelImageCommentLike)) { - if (Objects.equals(modelImageCommentLike.getDelFlag(), "0")) { - modelImageCommentLike.setDelFlag("2"); - int likeNum = modelImageComment.getLikeNum() - 1; - likeNum = Math.max(likeNum, 0); - modelImageComment.setLikeNum(likeNum); - } else { - modelImageCommentLike.setDelFlag("0"); - modelImageComment.setLikeNum(modelImageComment.getLikeNum() + 1); - this.addLikeAdvice(modelImageComment); - } - // 更新点赞记录 - baseMapper.updateDelFlagById(modelImageCommentLike); - // 更新图片评论点赞数 - modelImageCommentMapper.updateById(modelImageComment); - return; - } - - // 添加点赞记录 - modelImageCommentLike = new ModelImageCommentLike(); - modelImageCommentLike.setUserId(userId); - modelImageCommentLike.setModelImageCommentId(commentId); - baseMapper.insert(modelImageCommentLike); - - // 更新图片点赞数 - modelImageComment.setLikeNum(modelImageComment.getLikeNum() + 1); - modelImageCommentMapper.updateById(modelImageComment); - - this.addLikeAdvice(modelImageComment); - } - - - private void addLikeAdvice(ModelImageComment modelImageComment) { - - Long userId = SecurityUtils.getUserId(); - Long receiverUserId = modelImageComment.getUserId(); - - if (Objects.equals(userId, receiverUserId)) { - return; - } - - String content = StringUtils.format("恭喜!{}点赞了您的评论", - SecurityUtils.getLoginUser().getUser().getNickName()); - - SysAdvice sysAdvice = new SysAdvice(); - sysAdvice.setSenderId(userId); - sysAdvice.setReceiverId(receiverUserId); - sysAdvice.setContent(content); - sysAdvice.setProductId(modelImageComment.getModelImageId()); - sysAdvice.setProductType(2); - sysAdvice.setCommentId(modelImageComment.getId()); - sysAdvice.setIsRead(0); - sysAdvice.setType(AdviceEnum.LIKE_REMIND); - - + Map map = new HashMap<>(); + map.put("userId", SecurityUtils.getUserId()); + map.put("commentId", commentId); + mqttTemplate.publish("imageCommentLike", JSONUtil.toJsonStr(map),0); } diff --git a/mcwl-resource/src/main/java/com/mcwl/resource/service/impl/ModelImageLikeServiceImpl.java b/mcwl-resource/src/main/java/com/mcwl/resource/service/impl/ModelImageLikeServiceImpl.java index 2f22683..55b853b 100644 --- a/mcwl-resource/src/main/java/com/mcwl/resource/service/impl/ModelImageLikeServiceImpl.java +++ b/mcwl-resource/src/main/java/com/mcwl/resource/service/impl/ModelImageLikeServiceImpl.java @@ -1,6 +1,7 @@ package com.mcwl.resource.service.impl; import cn.hutool.core.bean.BeanUtil; +import cn.hutool.json.JSONUtil; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.metadata.OrderItem; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; @@ -11,7 +12,6 @@ import com.mcwl.common.constant.HttpStatus; import com.mcwl.common.core.domain.entity.SysUser; import com.mcwl.common.core.page.PageDomain; import com.mcwl.common.core.page.TableDataInfo; -import com.mcwl.common.exception.ServiceException; import com.mcwl.common.utils.SecurityUtils; import com.mcwl.common.utils.StringUtils; import com.mcwl.resource.domain.ModelImage; @@ -23,16 +23,14 @@ import com.mcwl.resource.domain.vo.PageVo; import com.mcwl.resource.mapper.ModelImageLikeMapper; import com.mcwl.resource.mapper.ModelImageMapper; import com.mcwl.resource.service.ModelImageLikeService; +import com.mcwl.resource.util.MqttTemplate; import com.mcwl.system.domain.enums.AdviceEnum; import com.mcwl.system.service.ISysUserService; import lombok.RequiredArgsConstructor; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; -import org.springframework.transaction.annotation.Transactional; -import java.util.ArrayList; -import java.util.Date; -import java.util.List; -import java.util.Objects; +import java.util.*; /** * 图片点赞实现 @@ -45,44 +43,15 @@ public class ModelImageLikeServiceImpl extends ServiceImpl map = new HashMap<>(); + map.put("userId", SecurityUtils.getUserId()); + map.put("commentId", imageId); + mqttTemplate.publish("imageLike", JSONUtil.toJsonStr(map),0); } @Override diff --git a/mcwl-resource/src/main/java/com/mcwl/resource/service/impl/ModelLikeServiceImpl.java b/mcwl-resource/src/main/java/com/mcwl/resource/service/impl/ModelLikeServiceImpl.java index 5092fcb..a8a4c7e 100644 --- a/mcwl-resource/src/main/java/com/mcwl/resource/service/impl/ModelLikeServiceImpl.java +++ b/mcwl-resource/src/main/java/com/mcwl/resource/service/impl/ModelLikeServiceImpl.java @@ -1,6 +1,7 @@ package com.mcwl.resource.service.impl; import cn.hutool.core.bean.BeanUtil; +import cn.hutool.json.JSONUtil; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.metadata.OrderItem; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; @@ -9,26 +10,20 @@ import com.mcwl.common.constant.HttpStatus; import com.mcwl.common.core.domain.entity.SysUser; import com.mcwl.common.core.page.PageDomain; import com.mcwl.common.core.page.TableDataInfo; -import com.mcwl.common.exception.ServiceException; import com.mcwl.common.utils.SecurityUtils; import com.mcwl.common.utils.StringUtils; import com.mcwl.resource.domain.ModelLike; import com.mcwl.resource.domain.ModelProduct; -import com.mcwl.resource.domain.SysAdvice; import com.mcwl.resource.domain.vo.ModelLikeVo; import com.mcwl.resource.mapper.ModelLikeMapper; import com.mcwl.resource.mapper.ModelMapper; import com.mcwl.resource.service.ModelLikeService; -import com.mcwl.system.domain.enums.AdviceEnum; +import com.mcwl.resource.util.MqttTemplate; import com.mcwl.system.service.ISysUserService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; -import org.springframework.transaction.annotation.Transactional; -import java.util.ArrayList; -import java.util.Date; -import java.util.List; -import java.util.Objects; +import java.util.*; /** * 模型点赞 @@ -43,45 +38,16 @@ public class ModelLikeServiceImpl extends ServiceImpl map = new HashMap<>(); + map.put("userId", SecurityUtils.getUserId()); + map.put("commentId", modelId); + mqttTemplate.publish("modelLike", JSONUtil.toJsonStr(map),0); } @Override @@ -132,31 +98,6 @@ public class ModelLikeServiceImpl extends ServiceImpl initPage(PageDomain pageDomain) { Page page = new Page<>(pageDomain.getPageNum(), pageDomain.getPageSize()); if (StringUtils.isBlank(pageDomain.getOrderByColumn())) { diff --git a/mcwl-resource/src/main/java/com/mcwl/resource/service/impl/WorkFlowCommentLikeServiceImpl.java b/mcwl-resource/src/main/java/com/mcwl/resource/service/impl/WorkFlowCommentLikeServiceImpl.java index d18942d..635a189 100644 --- a/mcwl-resource/src/main/java/com/mcwl/resource/service/impl/WorkFlowCommentLikeServiceImpl.java +++ b/mcwl-resource/src/main/java/com/mcwl/resource/service/impl/WorkFlowCommentLikeServiceImpl.java @@ -1,22 +1,18 @@ package com.mcwl.resource.service.impl; +import cn.hutool.json.JSONUtil; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; -import com.mcwl.common.exception.ServiceException; import com.mcwl.common.utils.SecurityUtils; -import com.mcwl.common.utils.StringUtils; -import com.mcwl.resource.domain.SysAdvice; -import com.mcwl.resource.domain.WorkFlowComment; import com.mcwl.resource.domain.WorkFlowCommentLike; import com.mcwl.resource.mapper.WorkFlowCommentLikeMapper; -import com.mcwl.resource.mapper.WorkFlowCommentMapper; import com.mcwl.resource.service.WorkFlowCommentLikeService; -import com.mcwl.system.domain.enums.AdviceEnum; +import com.mcwl.resource.util.MqttTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; -import java.util.Date; -import java.util.Objects; +import java.util.HashMap; +import java.util.Map; /** * @Author:ChenYan @@ -30,72 +26,15 @@ import java.util.Objects; public class WorkFlowCommentLikeServiceImpl extends ServiceImpl implements WorkFlowCommentLikeService { @Autowired - private WorkFlowCommentMapper workFlowCommentMapper; + private MqttTemplate mqttTemplate; @Override - @Transactional public void like(Long commentId) { - WorkFlowComment workFlowComment = workFlowCommentMapper.selectById(commentId); - if (Objects.isNull(workFlowComment)) { - throw new ServiceException("该评论不存在"); - } - Long userId = SecurityUtils.getUserId(); - WorkFlowCommentLike workFlowCommentLike = baseMapper.getLikeComment(userId, commentId); - if (Objects.nonNull(workFlowCommentLike)) { - if (Objects.equals(workFlowCommentLike.getDelFlag(), "0")) { - workFlowCommentLike.setDelFlag("2"); - int likeNum = workFlowComment.getLikeNum() - 1; - likeNum = Math.max(likeNum, 0); - workFlowComment.setLikeNum(likeNum); - } else { - workFlowCommentLike.setDelFlag("0"); - workFlowComment.setLikeNum(workFlowComment.getLikeNum() + 1); - this.addLikeAdvice(workFlowComment); - } - // 更新点赞记录 - baseMapper.updateDelFlagById(workFlowCommentLike); - // 更新图片评论点赞数 - workFlowCommentMapper.updateById(workFlowComment); - return; - } - - // 添加点赞记录 - workFlowCommentLike = new WorkFlowCommentLike(); - workFlowCommentLike.setUserId(userId); - workFlowCommentLike.setWorkFlowCommentId(commentId); - baseMapper.insert(workFlowCommentLike); - - // 更新模型点赞数 - workFlowComment.setLikeNum(workFlowComment.getLikeNum() + 1); - workFlowCommentMapper.updateById(workFlowComment); - - this.addLikeAdvice(workFlowComment); + Map map = new HashMap<>(); + map.put("userId", SecurityUtils.getUserId()); + map.put("commentId", commentId); + mqttTemplate.publish("workFlowCommentLike", JSONUtil.toJsonStr(map),0); } - private void addLikeAdvice(WorkFlowComment workFlowComment) { - - Long userId = SecurityUtils.getUserId(); - Long receiverUserId = workFlowComment.getUserId(); - - if (Objects.equals(userId, receiverUserId)) { - return; - } - - String content = StringUtils.format("恭喜!{}点赞了您的评论", - SecurityUtils.getLoginUser().getUser().getNickName()); - - SysAdvice sysAdvice = new SysAdvice(); - sysAdvice.setSenderId(userId); - sysAdvice.setReceiverId(receiverUserId); - sysAdvice.setContent(content); - sysAdvice.setProductId(workFlowComment.getWorkFlowId()); - sysAdvice.setProductType(1); - sysAdvice.setCommentId(workFlowComment.getId()); - sysAdvice.setIsRead(0); - sysAdvice.setType(AdviceEnum.LIKE_REMIND); - - - } - } diff --git a/mcwl-resource/src/main/java/com/mcwl/resource/service/impl/WorkFlowLikeServiceImpl.java b/mcwl-resource/src/main/java/com/mcwl/resource/service/impl/WorkFlowLikeServiceImpl.java index fc484b7..ac34b98 100644 --- a/mcwl-resource/src/main/java/com/mcwl/resource/service/impl/WorkFlowLikeServiceImpl.java +++ b/mcwl-resource/src/main/java/com/mcwl/resource/service/impl/WorkFlowLikeServiceImpl.java @@ -1,6 +1,7 @@ package com.mcwl.resource.service.impl; import cn.hutool.core.bean.BeanUtil; +import cn.hutool.json.JSONUtil; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.metadata.OrderItem; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; @@ -9,25 +10,21 @@ import com.mcwl.common.constant.HttpStatus; import com.mcwl.common.core.domain.entity.SysUser; import com.mcwl.common.core.page.PageDomain; import com.mcwl.common.core.page.TableDataInfo; -import com.mcwl.common.exception.ServiceException; import com.mcwl.common.utils.SecurityUtils; import com.mcwl.common.utils.StringUtils; -import com.mcwl.resource.domain.SysAdvice; import com.mcwl.resource.domain.WorkFlow; import com.mcwl.resource.domain.WorkFlowLike; import com.mcwl.resource.domain.vo.WorkFlowLikeVo; import com.mcwl.resource.mapper.WorkFlowLikeMapper; import com.mcwl.resource.mapper.WorkFlowMapper; import com.mcwl.resource.service.WorkFlowLikeService; -import com.mcwl.system.domain.enums.AdviceEnum; +import com.mcwl.resource.util.MqttTemplate; import com.mcwl.system.service.ISysUserService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; -import java.util.ArrayList; -import java.util.List; -import java.util.Objects; +import java.util.*; /** * 工作流点赞 @@ -48,44 +45,15 @@ public class WorkFlowLikeServiceImpl extends ServiceImpl map = new HashMap<>(); + map.put("userId", SecurityUtils.getUserId()); + map.put("commentId", modelId); + mqttTemplate.publish("workFlowLike", JSONUtil.toJsonStr(map),0); } @Override @@ -145,29 +113,7 @@ public class WorkFlowLikeServiceImpl extends ServiceImpl initPage(PageDomain pageDomain) { diff --git a/mcwl-resource/src/main/java/com/mcwl/resource/util/EMQXUtil.java b/mcwl-resource/src/main/java/com/mcwl/resource/util/EMQXUtil.java deleted file mode 100644 index c1a9c57..0000000 --- a/mcwl-resource/src/main/java/com/mcwl/resource/util/EMQXUtil.java +++ /dev/null @@ -1,207 +0,0 @@ -package com.mcwl.resource.util; - -import com.mcwl.resource.domain.*; -import lombok.AllArgsConstructor; -import lombok.Getter; -import lombok.extern.log4j.Log4j2; -import org.eclipse.paho.client.mqttv3.*; -import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; - -import java.util.Map; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; - -import static com.mcwl.common.utils.Threads.sleep; - -@Log4j2 -public class EMQXUtil { - - private static final String BROKER_URL = "tcp://192.168.136.128:1883"; - - private static final int connectionTimeout = 30; - private static final int keepAliveInterval = 60; - private static final int MAX_RETRIES = 3; - - // 线程池管理连接线程 - private static final ExecutorService executor = Executors.newFixedThreadPool(3); - - // 客户端存储(替代原来的静态变量) - private static final Map clients = new ConcurrentHashMap<>(); - - // 主题配置(修正后的主题定义) - @Getter - @AllArgsConstructor - public enum TopicConfig { - MODEL_LIKE("modelLikeTopic/1", - "modelLikeTopic/1", - "modelLikeClient_"), - - MODEL_COMMENT_LIKE("modelCommentLikeTopic/1", - "modelCommentLikeTopic/1", - "modelCommentLikeClient_"), - - IMAGE_LIKE("imageLikeTopic/1", - "imageLikeTopic/1", - "imageLikeClient_"), - - IMAGE_COMMENT_LIKE("imageCommentLikeTopic/1", - "imageCommentLikeTopic/1", - "imageCommentLikeClient_"), - - WORKFLOW_LIKE("workFlowLikeTopic/1", - "workFlowLikeTopic/1", - "workFlowLikeClient_"), - - WORKFLOW_COMMENT_LIKE("workFlowCommentLikeTopic/1", - "workFlowCommentLikeTopic/1", - "workFlowCommentLikeClient_"); - - final String subTopic; - final String pubTopic; - final String clientIdPrefix; - } - - - // 获取客户端(线程安全版本) - private static MqttClient getClient(TopicConfig type) { - return clients.computeIfAbsent(type, t -> { - try { - MqttClient client = new MqttClient(BROKER_URL, - generateClientId(t), new MemoryPersistence()); - setupConnection(client, t); - return client; - } catch (MqttException e) { - throw new RuntimeException("客户端初始化失败", e); - } - }); - } - - // 带自动重连的连接设置 - private static void setupConnection(MqttClient client, TopicConfig type) { - MqttConnectOptions opts = new MqttConnectOptions(); - opts.setAutomaticReconnect(true); - opts.setCleanSession(true); - opts.setConnectionTimeout(connectionTimeout); - opts.setKeepAliveInterval(keepAliveInterval); - client.setCallback(createCallback(type)); - - // 异步连接(避免阻塞) - executor.submit(() -> { - int retries = 0; - while (!client.isConnected() && retries < MAX_RETRIES) { - try { - client.connect(opts); - client.subscribe(type.subTopic); - clients.put(type, client); - } catch (MqttException e) { - long delay = (long) Math.min(1000 * Math.pow(2, retries), 30000); - sleep(delay); - retries++; - } - } - - if (retries >= MAX_RETRIES) { - clients.remove(type); - log.error("连接失败超过最大重试次数"); - } - }); - } - - // 回调工厂方法 - private static MqttCallback createCallback(TopicConfig type) { - switch (type) { - case MODEL_LIKE: - return new ModelLikePushCallback(); - case IMAGE_LIKE: - return new ImageLikePushCallback(); - case WORKFLOW_LIKE: - return new WorkFlowLikePushCallback(); - case MODEL_COMMENT_LIKE: - return new ModelCommentLikePushCallback(); - case IMAGE_COMMENT_LIKE: - return new ImageCommentLikePushCallback(); - case WORKFLOW_COMMENT_LIKE: - return new WorkFlowCommentLikePushCallback(); - default: - throw new IllegalArgumentException("未知主题类型: " + type); - } - } - - - // 使用不同的ClientId(增加随机后缀) - private static String generateClientId(TopicConfig config) { - return config.clientIdPrefix + UUID.randomUUID().toString().substring(0, 8); - } - - // 消息发布方法(带连接检查) - public static void sendMessage(TopicConfig type, String message) { - sendMessage(type, type.pubTopic, 2, message); - } - - // 其他发布方法类似,可进一步抽象 - public static void sendMessage(TopicConfig type, String topic, int qos, String msg) { - int retry = 0; - MqttClient client = getClient(type); - send(client, topic, qos, msg, retry); - - } - - public static void sendMessage(MqttClient client, String topic, int qos, String msg) { - int retry = 0; - send(client, topic, qos, msg, retry); - - } - - private static void send(MqttClient client, String topic, int qos, String msg, int retry) { - waitForConnection(client); // 等待最多5秒 - while (retry < 3) { - try { - if (client.isConnected()) { - client.publish(topic, msg.getBytes(), qos, false); - } else { - log.error("客户端未连接!"); - } - break; - } catch (MqttException e) { - retry++; - sleep(1000L * retry); - log.error("发布失败: {}", e.getMessage()); - } - } - } - - private static void waitForConnection(MqttClient client) { - long start = System.currentTimeMillis(); - while (!client.isConnected() && (System.currentTimeMillis() - start) < 5000) { - sleep(100); - } - } - - // 关闭所有连接 - public static void shutdown() { - executor.shutdown(); - try { - if (!executor.awaitTermination(5, TimeUnit.SECONDS)) { - executor.shutdownNow(); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - clients.forEach((k, v) -> closeClient(v)); - clients.clear(); - } - - private static void closeClient(MqttClient client) { - try { - if (client != null) { - client.disconnect(); - client.close(); - } - } catch (MqttException e) { - log.error("关闭客户端错误: {}", e.getMessage()); - } - } -} \ No newline at end of file diff --git a/mcwl-resource/src/main/java/com/mcwl/resource/util/MqttTemplate.java b/mcwl-resource/src/main/java/com/mcwl/resource/util/MqttTemplate.java new file mode 100644 index 0000000..156f2b1 --- /dev/null +++ b/mcwl-resource/src/main/java/com/mcwl/resource/util/MqttTemplate.java @@ -0,0 +1,245 @@ +package com.mcwl.resource.util; + + +import com.mcwl.common.utils.StringUtils; +import com.mcwl.resource.handler.IMessageHandler; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.eclipse.paho.client.mqttv3.*; +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; +import org.springframework.beans.factory.DisposableBean; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.ApplicationContext; +import org.springframework.context.annotation.Configuration; +import org.springframework.scheduling.annotation.Async; + +import javax.annotation.PostConstruct; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import static com.mcwl.common.utils.Threads.sleep; + +@Slf4j +@Getter +@Configuration +@ConfigurationProperties(prefix = "mqtt") +public class MqttTemplate implements MqttCallbackExtended, DisposableBean { + + @Value("${mqtt.broker-url}") + private String brokerUrl; + + @Value("${mqtt.client-id-prefix}") + private String clientIdPrefix; + + @Value("${mqtt.connection-timeout}") + private int connectionTimeout; + + @Value("${mqtt.keep-alive-interval}") + private int keepAliveInterval; + + @Value("${mqtt.max-reconnect-attempts}") + private int maxReconnectAttempts; + + @Value("${mqtt.clean-session}") + private boolean cleanSession; + + private String clientId; + + //================= 运行时组件 =================// + private MqttAsyncClient client; + private final Map> topicHandlers = new ConcurrentHashMap<>(); + private final ScheduledExecutorService reconnectExecutor = Executors.newSingleThreadScheduledExecutor(); + private volatile boolean isConnecting; + private final ApplicationContext context; + + public MqttTemplate(ApplicationContext context) { + this.context = context; + } + + //================= 生命周期管理 =================// + @PostConstruct + public void init() throws MqttException { + if (StringUtils.isBlank(clientId)) { + clientId = clientIdPrefix + UUID.randomUUID().toString().substring(0, 8); + } + + client = new MqttAsyncClient(brokerUrl, clientId, new MemoryPersistence()); + client.setCallback(this); + connect(); +// autoRegisterHandlers(); + } + + @Override + public void destroy() throws Exception { + if (client != null && client.isConnected()) { + client.disconnect(); + } + reconnectExecutor.shutdown(); + } + + /** + * 连接管理 + * + * @throws MqttException 连接失败 + */ + private void connect() throws MqttException { + MqttConnectOptions options = new MqttConnectOptions(); + options.setCleanSession(cleanSession); + options.setConnectionTimeout(connectionTimeout); + options.setKeepAliveInterval(keepAliveInterval); + options.setAutomaticReconnect(false); // 手动控制重连 + + client.connect(options, null, new IMqttActionListener() { + @Override + public void onSuccess(IMqttToken asyncActionToken) { + log.info("MQTT连接成功"); + resubscribeAllTopics(); + autoRegisterHandlers(); + } + + @Override + public void onFailure(IMqttToken asyncActionToken, Throwable exception) { + log.error("初始连接失败", exception); + scheduleReconnect(1); + } + }); + } + + //================= 自动重连机制 =================// + private void scheduleReconnect(int attempt) { + if (attempt > maxReconnectAttempts) { + log.error("达到最大重连次数: {}", maxReconnectAttempts); + return; + } + + long delay = (long) Math.min(1000 * Math.pow(2, attempt), 30000); + reconnectExecutor.schedule(() -> { + try { + log.info("尝试第{}次重连", attempt); + if (client != null && client.isConnected()) { + client.disconnectForcibly(); // 强制断开旧连接 + client.close(true); + } + // 重新初始化客户端 + client = new MqttAsyncClient(brokerUrl, clientId, new MemoryPersistence()); + client.setCallback(this); // 关键:重新绑定回调 + client.reconnect(); + resubscribeAllTopics(); // 重连后立即订阅 + } catch (MqttException e) { + log.error("重连失败", e); + scheduleReconnect(attempt + 1); + } + }, delay, TimeUnit.MILLISECONDS); + } + + //================= 消息发布 =================// + @Async + public void publish(String topic, String payload, int qos) { + try { + if (!client.isConnected()) { + throw new MqttException(MqttException.REASON_CODE_CLIENT_NOT_CONNECTED); + } + client.publish(topic, payload.getBytes(), qos, false); + } catch (MqttException e) { + log.error("消息发送失败 [topic: {}]", topic, e); + } + } + + @Async + public void publish(String topic, String payload) { + try { + if (!client.isConnected()) { + throw new MqttException(MqttException.REASON_CODE_CLIENT_NOT_CONNECTED); + } + client.publish(topic, payload.getBytes(), 1, false); + } catch (MqttException e) { + log.error("消息发送失败 [topic: {}]", topic, e); + } + } + + //================= 订阅管理 =================// + public void subscribe(String topicFilter, int qos, IMessageHandler handler) { + try { + if (!client.isConnected()) { + log.warn("客户端未连接,延迟订阅 [topic: {}]", topicFilter); + return; + } + client.subscribe(topicFilter, qos); + topicHandlers.computeIfAbsent(topicFilter, k -> new ArrayList<>()) + .add(handler); + log.info("订阅成功: {}", topicFilter); + } catch (MqttException e) { + log.error("订阅失败 [topic: {}]", topicFilter); + e.printStackTrace(); + } + } + + private void resubscribeAllTopics() { + topicHandlers.forEach((topic, handlers) -> { + int retry = 0; + while (retry < 3) { + try { + client.subscribe(topic, 1); + log.info("主题重新订阅成功: {}", topic); + break; + } catch (MqttException e) { + retry++; + log.error("重新订阅失败 [topic: {}], 重试 {}/3", topic, retry); + sleep(1000L * retry); + } + } + }); + } + + //================= 回调处理 =================// + @Override + public void messageArrived(String topic, MqttMessage message) { + + topicHandlers.entrySet().stream() + .filter(entry -> MqttTopic.isMatched(entry.getKey(), topic)) + .flatMap(entry -> entry.getValue().stream()) + .forEach(handler -> { + try { + handler.handleMessage(topic, message); + } catch (Exception e) { + log.error("消息处理异常 [handler: {}]", handler.getClass(), e); + } + }); + } + + @Override + public void connectionLost(Throwable cause) { + log.error("连接丢失", cause); + scheduleReconnect(1); + } + + @Override + public void deliveryComplete(IMqttDeliveryToken token) { + // QoS处理逻辑(可选) + } + + @Override + public void connectComplete(boolean reconnect, String serverURI) { + log.info("连接已建立 [reconnect: {}]", reconnect); + if (reconnect) { + resubscribeAllTopics(); + autoRegisterHandlers(); + } + } + + //================= 自动注册处理器 =================// + private void autoRegisterHandlers() { + context.getBeansOfType(IMessageHandler.class).values() + .forEach(handler -> { + handler.getTopics().forEach(topic -> + subscribe(topic.getTopicFilter(), topic.getQos(), handler)); + }); + } +} \ No newline at end of file