diff --git a/couplet-modules/couplet-business/src/main/java/com/couplet/business/server/controller/OnLineVehicleController.java b/couplet-modules/couplet-business/src/main/java/com/couplet/business/server/controller/OnLineVehicleController.java index 49f25eb..9789155 100644 --- a/couplet-modules/couplet-business/src/main/java/com/couplet/business/server/controller/OnLineVehicleController.java +++ b/couplet-modules/couplet-business/src/main/java/com/couplet/business/server/controller/OnLineVehicleController.java @@ -3,14 +3,8 @@ package com.couplet.business.server.controller; import com.couplet.business.server.service.OnLineVehicleService; import com.couplet.common.core.domain.Result; import com.couplet.common.domain.Vehicle; -import com.couplet.common.domain.request.RealTimeDataRequest; -import com.couplet.common.security.utils.SecurityUtils; -import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.autoconfigure.amqp.RabbitTemplateConfigurer; -import org.springframework.data.redis.core.ReactiveRedisOperations; import org.springframework.web.bind.annotation.PostMapping; -import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; @@ -28,8 +22,7 @@ public class OnLineVehicleController { @Autowired private OnLineVehicleService onLineVehicleService; - @Autowired - private RabbitTemplate rabbitTemplate; + @PostMapping("onlineList") public Result> onlineList(){ List onlineList = onLineVehicleService.onlineList(); @@ -37,14 +30,8 @@ public class OnLineVehicleController { return success; } - @PostMapping("/monitorinData") - public void monitorinData(@RequestBody RealTimeDataRequest realTimeDataRequest){ - String exchangeName = "exchangeName"; // 交换机名称 - String routingKey = "routingKey"; // 路由键 - Long userId = SecurityUtils.getUserId(); - realTimeDataRequest.setUserId(userId); - rabbitTemplate.convertAndSend(exchangeName, routingKey, realTimeDataRequest); - } + + } diff --git a/couplet-modules/couplet-business/src/main/java/com/couplet/business/server/controller/SysTroubleController.java b/couplet-modules/couplet-business/src/main/java/com/couplet/business/server/controller/SysTroubleController.java index a4a624a..67c508b 100644 --- a/couplet-modules/couplet-business/src/main/java/com/couplet/business/server/controller/SysTroubleController.java +++ b/couplet-modules/couplet-business/src/main/java/com/couplet/business/server/controller/SysTroubleController.java @@ -9,7 +9,6 @@ import com.couplet.common.domain.CoupletTroubleGrade; import com.couplet.common.domain.request.TroubleResp; import com.couplet.common.log.annotation.Log; import com.couplet.common.log.enums.BusinessType; -import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.validation.annotation.Validated; import org.springframework.web.bind.annotation.*; @@ -24,7 +23,6 @@ import java.util.List; */ @RestController @RequestMapping("trouble") -@Slf4j public class SysTroubleController extends BaseController { @Autowired private SysTroubleService troubleService; @@ -38,6 +36,14 @@ public class SysTroubleController extends BaseController { return Result.success(result); } + /** + * 故障类型信息 + */ + @GetMapping("/troubleTypeList") + public List listType() { + return troubleService.selectTroubleListByType(); + } + /** * 故障等级信息 */ @@ -75,27 +81,12 @@ public class SysTroubleController extends BaseController { } /** - * 故障数据入库 - * @param coupletTroubleLog - * @return + * 删除故障码数据 */ - @PostMapping("insertCode") - public Result insertCode(@RequestBody CoupletTroubleCode coupletTroubleCode){ - long start = System.currentTimeMillis(); - int i = troubleService.insertMsgResq(coupletTroubleCode); - - long end = System.currentTimeMillis(); - log.info("记录异常信息成功,耗时:{}",(end-start)); - - return Result.success(i); - } - - /** - * 清空故障码信息 - */ - @PostMapping("cleanTroubleCode") - public Result cleanTroubleCode(){ - troubleService.cleanTroubleCode(); - return Result.success(); + @Log(title = "新增故障码",businessType = BusinessType.INSERT) + @PostMapping("/newFaultData") + public Result newFaultData(@RequestBody CoupletTroubleCode code) { + troubleService.newFaultData(code); + return success(); } } diff --git a/couplet-modules/couplet-business/src/main/java/com/couplet/business/server/mapper/VehicleMapper.java b/couplet-modules/couplet-business/src/main/java/com/couplet/business/server/mapper/VehicleMapper.java index 12d83b4..2f3a903 100644 --- a/couplet-modules/couplet-business/src/main/java/com/couplet/business/server/mapper/VehicleMapper.java +++ b/couplet-modules/couplet-business/src/main/java/com/couplet/business/server/mapper/VehicleMapper.java @@ -4,7 +4,6 @@ import com.baomidou.mybatisplus.core.mapper.BaseMapper; import com.couplet.common.domain.Vehicle; import com.couplet.common.domain.VehicleMiddle; import org.apache.ibatis.annotations.Mapper; -import org.apache.ibatis.annotations.Param; import org.springframework.stereotype.Component; import java.util.List; @@ -25,9 +24,4 @@ public interface VehicleMapper extends BaseMapper { Integer addVehicle(VehicleMiddle vehicleMiddle); List vehicleAll(); - - Integer onOrOutLineByVIN(@Param("vin") String vin, @Param("status") Integer status); - - Integer addVehicle(@Param("userId") Long userId, @Param("vehicleIds") List vehicleIds); - } diff --git a/couplet-modules/couplet-business/src/main/java/com/couplet/business/server/service/LogoService.java b/couplet-modules/couplet-business/src/main/java/com/couplet/business/server/service/LogoService.java index c34afba..7571a35 100644 --- a/couplet-modules/couplet-business/src/main/java/com/couplet/business/server/service/LogoService.java +++ b/couplet-modules/couplet-business/src/main/java/com/couplet/business/server/service/LogoService.java @@ -3,7 +3,6 @@ package com.couplet.business.server.service; import com.baomidou.mybatisplus.extension.service.IService; import com.couplet.common.domain.Logo; - import java.util.List; /** diff --git a/couplet-modules/couplet-business/src/main/java/com/couplet/business/server/service/SysTroubleService.java b/couplet-modules/couplet-business/src/main/java/com/couplet/business/server/service/SysTroubleService.java index 5333f87..e280b7c 100644 --- a/couplet-modules/couplet-business/src/main/java/com/couplet/business/server/service/SysTroubleService.java +++ b/couplet-modules/couplet-business/src/main/java/com/couplet/business/server/service/SysTroubleService.java @@ -4,9 +4,9 @@ import com.baomidou.mybatisplus.extension.service.IService; import com.couplet.common.core.domain.PageResult; import com.couplet.common.domain.CoupletTroubleCode; import com.couplet.common.domain.CoupletTroubleGrade; +import com.couplet.common.domain.CoupletTroubleType; import com.couplet.common.domain.request.TroubleResp; - import java.util.List; /** @@ -18,9 +18,18 @@ import java.util.List; public interface SysTroubleService extends IService { PageResult selectTroubleList(TroubleResp troubleReq); + List selectTroubleListByType(); + List selectTroubleListByGrade(); - int insertMsgResq(CoupletTroubleCode coupletTroubleCode); + /** + * 新增故障数据 + * @param code + */ + void newFaultData(CoupletTroubleCode code); - void cleanTroubleCode(); + +// int addTrouble (TroubleAddReq troubleAddReq); + +// int updateTrouble(TroubleUpdReq troubleUpdReq); } diff --git a/couplet-modules/couplet-business/src/main/java/com/couplet/business/server/service/VehicleAndLogoService.java b/couplet-modules/couplet-business/src/main/java/com/couplet/business/server/service/VehicleAndLogoService.java index 306bb6e..5a49bf1 100644 --- a/couplet-modules/couplet-business/src/main/java/com/couplet/business/server/service/VehicleAndLogoService.java +++ b/couplet-modules/couplet-business/src/main/java/com/couplet/business/server/service/VehicleAndLogoService.java @@ -3,7 +3,6 @@ package com.couplet.business.server.service; import com.baomidou.mybatisplus.extension.service.IService; import com.couplet.common.domain.VehicleAndLogo; - import java.util.List; /** diff --git a/couplet-modules/couplet-business/src/main/java/com/couplet/business/server/service/VehicleDetectionService.java b/couplet-modules/couplet-business/src/main/java/com/couplet/business/server/service/VehicleDetectionService.java index 2da84f6..3145db7 100644 --- a/couplet-modules/couplet-business/src/main/java/com/couplet/business/server/service/VehicleDetectionService.java +++ b/couplet-modules/couplet-business/src/main/java/com/couplet/business/server/service/VehicleDetectionService.java @@ -1,6 +1,5 @@ package com.couplet.business.server.service; -import com.couplet.common.core.domain.Result; import com.couplet.common.domain.Vehicle; import java.util.List; @@ -9,5 +8,5 @@ public interface VehicleDetectionService { List detectionList(); - List findByVin(Integer vehicleId); + List findByVin(String vehicleId); } diff --git a/couplet-modules/couplet-business/src/main/java/com/couplet/business/server/service/VehicleService.java b/couplet-modules/couplet-business/src/main/java/com/couplet/business/server/service/VehicleService.java index f692152..954fe8b 100644 --- a/couplet-modules/couplet-business/src/main/java/com/couplet/business/server/service/VehicleService.java +++ b/couplet-modules/couplet-business/src/main/java/com/couplet/business/server/service/VehicleService.java @@ -37,6 +37,4 @@ public interface VehicleService extends IService { List vehicleAll(); - Integer onOrOutLineByVIN(String vin,Integer status); - } diff --git a/couplet-modules/couplet-business/src/main/java/com/couplet/business/server/service/VehicleTypeService.java b/couplet-modules/couplet-business/src/main/java/com/couplet/business/server/service/VehicleTypeService.java index 52971f5..ff20839 100644 --- a/couplet-modules/couplet-business/src/main/java/com/couplet/business/server/service/VehicleTypeService.java +++ b/couplet-modules/couplet-business/src/main/java/com/couplet/business/server/service/VehicleTypeService.java @@ -3,7 +3,6 @@ package com.couplet.business.server.service; import com.baomidou.mybatisplus.extension.service.IService; import com.couplet.common.domain.VehicleType; - import java.util.List; /** diff --git a/couplet-modules/couplet-business/src/main/java/com/couplet/business/server/service/impl/FenceServiceImpl.java b/couplet-modules/couplet-business/src/main/java/com/couplet/business/server/service/impl/FenceServiceImpl.java index a81e11b..c7d3faf 100644 --- a/couplet-modules/couplet-business/src/main/java/com/couplet/business/server/service/impl/FenceServiceImpl.java +++ b/couplet-modules/couplet-business/src/main/java/com/couplet/business/server/service/impl/FenceServiceImpl.java @@ -1,5 +1,6 @@ package com.couplet.business.server.service.impl; +import com.alibaba.fastjson.JSON; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.couplet.business.server.mapper.FenceMapper; import com.couplet.business.server.service.FenAndLogoService; @@ -8,12 +9,15 @@ import com.couplet.common.domain.Fence; import com.couplet.common.domain.request.FenceConfig; import com.couplet.common.domain.request.FenceRequest; import com.couplet.common.domain.request.FenceUpdateRequest; +import com.couplet.common.security.utils.SecurityUtils; +import com.couplet.mq.remote.RemoteFenceService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Service; import javax.servlet.http.HttpServletRequest; import java.util.List; +import java.util.concurrent.TimeUnit; /** * @Author: LiJiaYao @@ -93,6 +97,11 @@ public class FenceServiceImpl extends ServiceImpl implements @Override public void removeByFenceId(Long fenceId) { fenceMapper.removeByFenceId(fenceId); + /** + * 电子围栏发送改变 + */ + redisTemplate.opsForValue().set("removeByFenceId", JSON.toJSONString(fenceId), 10, TimeUnit.MINUTES); + } @Override diff --git a/couplet-modules/couplet-business/src/main/java/com/couplet/business/server/service/impl/ManageServiceImpl.java b/couplet-modules/couplet-business/src/main/java/com/couplet/business/server/service/impl/ManageServiceImpl.java index 2c0281d..2ea6f8b 100644 --- a/couplet-modules/couplet-business/src/main/java/com/couplet/business/server/service/impl/ManageServiceImpl.java +++ b/couplet-modules/couplet-business/src/main/java/com/couplet/business/server/service/impl/ManageServiceImpl.java @@ -7,7 +7,6 @@ import com.couplet.common.system.domain.LoginUser; import com.couplet.common.system.domain.SysDept; import com.couplet.common.system.remote.RemoteDeptService; import com.couplet.common.system.remote.RemoteUserService; - import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; diff --git a/couplet-modules/couplet-business/src/main/java/com/couplet/business/server/service/impl/SysTroubleServiceImpl.java b/couplet-modules/couplet-business/src/main/java/com/couplet/business/server/service/impl/SysTroubleServiceImpl.java index 58edabe..8cc311b 100644 --- a/couplet-modules/couplet-business/src/main/java/com/couplet/business/server/service/impl/SysTroubleServiceImpl.java +++ b/couplet-modules/couplet-business/src/main/java/com/couplet/business/server/service/impl/SysTroubleServiceImpl.java @@ -6,6 +6,7 @@ import com.couplet.business.server.service.SysTroubleService; import com.couplet.common.core.domain.PageResult; import com.couplet.common.domain.CoupletTroubleCode; import com.couplet.common.domain.CoupletTroubleGrade; +import com.couplet.common.domain.CoupletTroubleType; import com.couplet.common.domain.request.TroubleResp; import com.github.pagehelper.PageHelper; import com.github.pagehelper.PageInfo; @@ -39,26 +40,40 @@ public class SysTroubleServiceImpl extends ServiceImpl selectTroubleListByType() { + return sysTroubleMapper.selectTroubleListByType(); + } + @Override public List selectTroubleListByGrade() { return sysTroubleMapper.selectTroubleListByGrade(); } - /** - * 添加异常信息 - * @param coupletTroubleLog - * @return - */ @Override - public int insertMsgResq(CoupletTroubleCode coupletTroubleCode) { - return sysTroubleMapper.insertMsgResq(coupletTroubleCode); + public void newFaultData(CoupletTroubleCode code) { + sysTroubleMapper.newFaultData(code); } /** - * 清理故障异常信息 + * 新增故障码数据 +// * @param troubleAddReq + * @return */ - @Override - public void cleanTroubleCode() { - sysTroubleMapper.cleanTroubleCode(); - } +// @Override +// public int addTrouble(TroubleAddReq troubleAddReq) { +// return sysTroubleMapper.addTrouble(troubleAddReq); +// } +// +// /** +// * 修改故障码数据 +// * @param troubleUpdReq +// * @return +// */ +// @Override +// public int updateTrouble(TroubleUpdReq troubleUpdReq) { +// return sysTroubleMapper.updateTrouble(troubleUpdReq); +// } + + } diff --git a/couplet-modules/couplet-business/src/main/java/com/couplet/business/server/service/impl/VehicleDetectionServiceImpl.java b/couplet-modules/couplet-business/src/main/java/com/couplet/business/server/service/impl/VehicleDetectionServiceImpl.java index d0d29ce..078780f 100644 --- a/couplet-modules/couplet-business/src/main/java/com/couplet/business/server/service/impl/VehicleDetectionServiceImpl.java +++ b/couplet-modules/couplet-business/src/main/java/com/couplet/business/server/service/impl/VehicleDetectionServiceImpl.java @@ -2,8 +2,6 @@ package com.couplet.business.server.service.impl; import com.couplet.business.server.mapper.VehicleDetectionMapper; import com.couplet.business.server.service.VehicleDetectionService; -import com.couplet.business.server.service.VehicleManageService; -import com.couplet.common.core.domain.Result; import com.couplet.common.domain.Vehicle; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @@ -28,7 +26,7 @@ public class VehicleDetectionServiceImpl implements VehicleDetectionService{ } @Override - public List findByVin(Integer vehicleId) { + public List findByVin(String vehicleId) { return vehicleDetectionMapper.findByVin(vehicleId); } } diff --git a/couplet-modules/couplet-business/src/main/java/com/couplet/business/server/service/impl/VehicleManageServiceImpl.java b/couplet-modules/couplet-business/src/main/java/com/couplet/business/server/service/impl/VehicleManageServiceImpl.java index c50ef02..fa3c412 100644 --- a/couplet-modules/couplet-business/src/main/java/com/couplet/business/server/service/impl/VehicleManageServiceImpl.java +++ b/couplet-modules/couplet-business/src/main/java/com/couplet/business/server/service/impl/VehicleManageServiceImpl.java @@ -61,6 +61,8 @@ public class VehicleManageServiceImpl implements VehicleManageService { */ @Override public Result addVehicle(VehicleMiddle middle) { + Long userId = SecurityUtils.getUserId(); + middle.setUserId(userId); Result integerResult = remoteVehicleService.addVehicle(middle); Integer resultData = integerResult.getData(); return resultData==1?Result.success():Result.error("添加失败"); diff --git a/couplet-modules/couplet-business/src/main/java/com/couplet/business/server/service/impl/VehicleServiceImpl.java b/couplet-modules/couplet-business/src/main/java/com/couplet/business/server/service/impl/VehicleServiceImpl.java index 0c61570..1ee5a95 100644 --- a/couplet-modules/couplet-business/src/main/java/com/couplet/business/server/service/impl/VehicleServiceImpl.java +++ b/couplet-modules/couplet-business/src/main/java/com/couplet/business/server/service/impl/VehicleServiceImpl.java @@ -16,7 +16,6 @@ import com.couplet.common.domain.VehicleType; import com.couplet.common.domain.request.VehicleEditParams; import com.couplet.common.domain.request.VehicleInsertParams; import com.couplet.common.domain.request.VehicleListParams; -import com.couplet.common.security.utils.SecurityUtils; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @@ -98,7 +97,7 @@ public class VehicleServiceImpl extends ServiceImpl impl **/ @Override public String deleteById(Long vehicleId) { - String result; + String result = ""; UpdateWrapper updateWrapper = new UpdateWrapper<>(); @@ -126,7 +125,7 @@ public class VehicleServiceImpl extends ServiceImpl impl **/ @Override public String editById(VehicleEditParams editParams) { - String result; + String result = ""; if ((editParams.getLogoIds() == null || editParams.getLogoIds().isEmpty())) { result = "未选择电子围栏"; @@ -181,7 +180,7 @@ public class VehicleServiceImpl extends ServiceImpl impl **/ @Override public String insert(VehicleInsertParams insertParams) { - String result; + String result = ""; if ((insertParams.getLogoIds() == null || insertParams.getLogoIds().isEmpty())) { result = "未选择电子围栏"; @@ -250,13 +249,14 @@ public class VehicleServiceImpl extends ServiceImpl impl **/ @Override public List getBindLogoById(Long vehicleId) { + List logoIds = vehicleAndLogoService.getBindLogoById(vehicleId); - return vehicleAndLogoService.getBindLogoById(vehicleId); + return logoIds; } /* * @param userId: - * @return List + * @return List * @author 付凡芮 * @description 根据登入人id查询管理车辆 * @@ -273,8 +273,8 @@ public class VehicleServiceImpl extends ServiceImpl impl @Override public Integer addVehicle(VehicleMiddle vehicleMiddle) { - Long userId = SecurityUtils.getUserId(); - return vehicleMapper.addVehicle(userId,vehicleMiddle.getVehicleIds()); + + return vehicleMapper.addVehicle(vehicleMiddle); } @Override @@ -282,14 +282,6 @@ public class VehicleServiceImpl extends ServiceImpl impl return vehicleMapper.vehicleAll(); } - - //通过vin修改车辆上下线的状态 - @Override - public Integer onOrOutLineByVIN(String vin, Integer status) { - - return vehicleMapper.onOrOutLineByVIN(vin, status); - } - @Override public List findByVIN(String vin) { diff --git a/couplet-modules/couplet-business/src/main/java/com/couplet/business/time/Timer.java b/couplet-modules/couplet-business/src/main/java/com/couplet/business/server/time/Timer.java similarity index 97% rename from couplet-modules/couplet-business/src/main/java/com/couplet/business/time/Timer.java rename to couplet-modules/couplet-business/src/main/java/com/couplet/business/server/time/Timer.java index 1ed0c20..fb955d4 100644 --- a/couplet-modules/couplet-business/src/main/java/com/couplet/business/time/Timer.java +++ b/couplet-modules/couplet-business/src/main/java/com/couplet/business/server/time/Timer.java @@ -1,4 +1,4 @@ -package com.couplet.business.time; +package com.couplet.business.server.time; import com.couplet.business.server.service.VehicleService; import com.couplet.common.domain.Vehicle; diff --git a/couplet-modules/couplet-modules-mq/src/main/java/com/couplet/mq/CoupletMqApplatcaion.java b/couplet-modules/couplet-modules-mq/src/main/java/com/couplet/mq/CoupletMqApplatcaion.java index f013e29..d1c309f 100644 --- a/couplet-modules/couplet-modules-mq/src/main/java/com/couplet/mq/CoupletMqApplatcaion.java +++ b/couplet-modules/couplet-modules-mq/src/main/java/com/couplet/mq/CoupletMqApplatcaion.java @@ -19,6 +19,6 @@ import org.springframework.boot.autoconfigure.SpringBootApplication; public class CoupletMqApplatcaion { public static void main(String[] args) { SpringApplication.run(CoupletMqApplatcaion.class, args); - System.out.println("MQ模块启动成功"); + System.out.println("获取报文、RabbitMQ模块启动成功"); } } diff --git a/couplet-modules/couplet-modules-mq/src/main/java/com/couplet/mq/config/RabbitMQConfig.java b/couplet-modules/couplet-modules-mq/src/main/java/com/couplet/mq/config/RabbitMQConfig.java index 853a559..3204c44 100644 --- a/couplet-modules/couplet-modules-mq/src/main/java/com/couplet/mq/config/RabbitMQConfig.java +++ b/couplet-modules/couplet-modules-mq/src/main/java/com/couplet/mq/config/RabbitMQConfig.java @@ -25,16 +25,22 @@ import org.springframework.context.annotation.Primary; public class RabbitMQConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback { // 通过注入的方式获取队列名、交换机名和路由键 //队列名 -// @Value("${mq.queueName}") - public static final String queueName = "queueName"; + @Value("queueName") + public String queueName; + + //队列名 + public static final String FENCE_QUEUE ="fenceQueue"; //交换机 -// @Value("${mq.exchangeName}") - public static final String exchangeName = "exchangeName"; + @Value("exchangeName") + public String exchangeName; + public static final String FENCE_EXCHANGE="fenceExchange"; //路由键 -// @Value("${mq.routingKey}") - public static final String routingKey = "routingKey"; + @Value("routingKey") + public String routingKey; + //路由键 + public static final String FENCE_ROUTINGKEY="fenceRoutingKey"; private RabbitTemplate rabbitTemplate; @@ -61,6 +67,10 @@ public class RabbitMQConfig implements RabbitTemplate.ConfirmCallback, RabbitTem public Queue queue() { return new Queue(queueName, true); } + @Bean("fenceQueue") + public Queue queue2() { + return new Queue(FENCE_QUEUE, true); + } /* * @Author: LiuYunHu @@ -74,6 +84,11 @@ public class RabbitMQConfig implements RabbitTemplate.ConfirmCallback, RabbitTem return new DirectExchange(exchangeName); } + @Bean("fenceExchange") + public DirectExchange directExchange2() { + return new DirectExchange(FENCE_EXCHANGE); + } + /* * @Author: LiuYunHu * @Date: 2024/3/29 21:27 @@ -116,6 +131,11 @@ public class RabbitMQConfig implements RabbitTemplate.ConfirmCallback, RabbitTem return BindingBuilder.bind(queue()).to(directExchange()).with(routingKey); } + @Bean("fenceRoutingKey") + public Binding binding2() { + return BindingBuilder.bind(queue2()).to(directExchange2()).with(FENCE_ROUTINGKEY); + } + /* * @Author: LiuYunHu * @Date: 2024/3/29 21:28 diff --git a/couplet-modules/couplet-modules-mq/src/main/java/com/couplet/mq/controller/MqController.java b/couplet-modules/couplet-modules-mq/src/main/java/com/couplet/mq/controller/MqController.java index a0ccd6b..4a81700 100644 --- a/couplet-modules/couplet-modules-mq/src/main/java/com/couplet/mq/controller/MqController.java +++ b/couplet-modules/couplet-modules-mq/src/main/java/com/couplet/mq/controller/MqController.java @@ -1,15 +1,20 @@ package com.couplet.mq.controller; import com.couplet.common.core.utils.uuid.IdUtils; +import com.couplet.common.domain.request.FenceUpdateRequest; import com.couplet.common.domain.request.RealTimeDataRequest; import com.couplet.common.security.utils.SecurityUtils; +import com.couplet.mq.config.RabbitMQConfig; import com.couplet.mq.domain.User; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; -import org.springframework.web.bind.annotation.*; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; /** * @ProjectName: five-groups-couplet @@ -23,17 +28,17 @@ import org.springframework.web.bind.annotation.*; public class MqController { // 通过注入的方式获取队列名、交换机名和路由键 //队列名 - @Value("${mq.queueName}") - public static final String queueName="queueName"; - @Value("${mq.finByVinQueueName}") - public static final String finByVinQueueName="finByVinQueueName"; + @Value("queueName") + public String queueName; + @Value("finByVinQueueName") + public String finByVinQueueName; //交换机 - @Value("${mq.exchangeName}") - public static final String exchangeName="exchangeName"; + @Value("exchangeName") + public String exchangeName; //路由键 - @Value("${mq.routingKey}") - public static final String routingKey="routingKey"; + @Value("routingKey") + public String routingKey; @Autowired private RabbitTemplate rabbitTemplate; @@ -57,13 +62,19 @@ public class MqController { } - @PostMapping("findByVin/{vin}") - public void postFindByVin(@PathVariable String vin){ - RealTimeDataRequest realTimeDataRequest = new RealTimeDataRequest(); + @PostMapping("findByVin") + public void postFindByVin(@RequestBody RealTimeDataRequest request){ Long userId = SecurityUtils.getUserId(); - realTimeDataRequest.setUserId(userId); - realTimeDataRequest.setVin(vin); - rabbitTemplate.convertAndSend(exchangeName, routingKey, realTimeDataRequest, message -> { + request.setUserId(userId); + rabbitTemplate.convertAndSend(exchangeName, routingKey, request, message -> { + message.getMessageProperties().setMessageId(IdUtils.randomUUID()); + return message; + }, new CorrelationData(IdUtils.randomUUID()) + ); + } + @PostMapping("fenceQueue") + public void fenceQueue(@RequestBody FenceUpdateRequest fenceUpdateRequest){ + rabbitTemplate.convertAndSend(RabbitMQConfig.FENCE_EXCHANGE, RabbitMQConfig.FENCE_ROUTINGKEY, fenceUpdateRequest, message -> { message.getMessageProperties().setMessageId(IdUtils.randomUUID()); return message; }, new CorrelationData(IdUtils.randomUUID()) diff --git a/couplet-modules/couplet-modules-mq/src/main/java/com/couplet/mq/remote/RemoteFenceService.java b/couplet-modules/couplet-modules-mq/src/main/java/com/couplet/mq/remote/RemoteFenceService.java new file mode 100644 index 0000000..1225961 --- /dev/null +++ b/couplet-modules/couplet-modules-mq/src/main/java/com/couplet/mq/remote/RemoteFenceService.java @@ -0,0 +1,19 @@ +package com.couplet.mq.remote; + +import com.couplet.common.core.constant.ServiceNameConstants; +import com.couplet.common.domain.request.FenceUpdateRequest; +import com.couplet.mq.remote.factory.RemoteFenceFallbackFactory; +import org.springframework.cloud.openfeign.FeignClient; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; + +@FeignClient(contextId = "remoteVehicleService" , + value = ServiceNameConstants.BUSINESS_SERVICE, + fallbackFactory = RemoteFenceFallbackFactory.class +) +public interface RemoteFenceService { + + @PostMapping("/mq/fenceQueue") + public void fenceQueue(@RequestBody FenceUpdateRequest fenceUpdateRequest); + +} diff --git a/couplet-modules/couplet-modules-mq/src/main/java/com/couplet/mq/remote/factory/RemoteFenceFallbackFactory.java b/couplet-modules/couplet-modules-mq/src/main/java/com/couplet/mq/remote/factory/RemoteFenceFallbackFactory.java new file mode 100644 index 0000000..5221a10 --- /dev/null +++ b/couplet-modules/couplet-modules-mq/src/main/java/com/couplet/mq/remote/factory/RemoteFenceFallbackFactory.java @@ -0,0 +1,33 @@ +package com.couplet.mq.remote.factory; + +import com.couplet.common.domain.request.FenceUpdateRequest; +import com.couplet.mq.remote.RemoteFenceService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.cloud.openfeign.FallbackFactory; +import org.springframework.stereotype.Component; + +import static com.couplet.common.core.domain.Result.error; + +/** + * @author fufanrui + * @version 1.0 + * @description: TODO + * @date 2024/4/2 14:46 + */ +@Component +public class RemoteFenceFallbackFactory implements FallbackFactory { + private static final Logger log = LoggerFactory.getLogger(RemoteFenceFallbackFactory.class); + + + @Override + public RemoteFenceService create(Throwable cause) { + return new RemoteFenceService() { + + @Override + public void fenceQueue(FenceUpdateRequest fenceUpdateRequest) { + error("调用失败...."+cause.getMessage()); + } + }; + } +} diff --git a/couplet-modules/couplet-modules-mq/src/main/java/com/couplet/mq/service/Consumer.java b/couplet-modules/couplet-modules-mq/src/main/java/com/couplet/mq/service/Consumer.java new file mode 100644 index 0000000..fc83c53 --- /dev/null +++ b/couplet-modules/couplet-modules-mq/src/main/java/com/couplet/mq/service/Consumer.java @@ -0,0 +1,164 @@ +package com.couplet.mq.service; + +import com.couplet.mq.domain.User; +import com.rabbitmq.client.Channel; +import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.core.Message; +import org.springframework.amqp.rabbit.annotation.RabbitHandler; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.core.StringRedisTemplate; +import org.springframework.stereotype.Component; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; + +/** + * @ProjectName: five-groups-couplet + * @Author: LiuYunHu + * @CreateTime: 2024/3/28 + * @Description: MQ消费者类 + */ + +@Component +@Slf4j +@SuppressWarnings("all") +@RabbitListener(queues = "queueName") +public class Consumer { + @Autowired + private StringRedisTemplate redis; + + /* 线程池执行 + + //创建一个定长线程池 + private final Executor executor = Executors.newFixedThreadPool(5); + + @Async + @RabbitHandler + public void process(User param, Channel channel, Message message) { + executor.execute(() -> { + try { + handleMessage(param, channel, message); + } catch (IOException e) { + log.error("处理消息失败:{}", e); + } + }); + } + + //处理信息的方法 + private void handleMessage(User param, Channel channel, Message message) throws IOException { + log.info("消费者收到消息为:{},{}" + param, message.getMessageProperties().getDeliveryTag()); + + long deliveryTag = message.getMessageProperties().getDeliveryTag(); + String messageId = message.getMessageProperties().getMessageId(); + + if (!redis.hasKey("value:" + messageId)) { + redis.opsForValue().set("value:" + messageId, "" + deliveryTag, 5, TimeUnit.MINUTES); + } + + // 1 添加成功新数据 0已有重复值,不允许再添加 + Long add = redis.opsForSet().add("set:" + messageId, "set:" + messageId); + //过期时间 + redis.expire("set:" + messageId, 5, TimeUnit.MINUTES); + + + try { + if (add == 1) { + //第一次 消费 + System.out.println("*****************************"); + System.out.println("消费者收到消息:" + param); + System.out.println("*****************************"); + log.info("消费结束"); + + channel.basicAck(deliveryTag, false); + + } else { + //重复消费 + log.error("重复消费"); + channel.basicReject(deliveryTag, false); + + //删除缓存 + redis.opsForSet().remove("set:" + messageId, "set:" + messageId); + } + + + } catch (Exception e) { + log.error("消息没有成功消费!"); + + String s = redis.opsForValue().get("value:" + messageId); + + long oldTag = Long.parseLong(s); + + if (deliveryTag == (oldTag + 2)) { + log.error("确实消费不了,不入队了!"); + channel.basicNack(deliveryTag, false, false); + } else { + log.info("消息消费失败,重新入队"); + channel.basicNack(deliveryTag, false, true); + } + } + + } + +**/ + + @RabbitHandler + public void process(User param, Channel channel, Message message) throws IOException { + log.info("消费者收到消息为:{},{}" + param, message.getMessageProperties().getDeliveryTag()); + + long deliveryTag = message.getMessageProperties().getDeliveryTag(); + String messageId = message.getMessageProperties().getMessageId(); + + if (!redis.hasKey("value:" + messageId)) { + redis.opsForValue().set("value:" + messageId, "" + deliveryTag, 5, TimeUnit.MINUTES); + } + + // 1 添加成功新数据 0已有重复值,不允许再添加 + Long add = redis.opsForSet().add("set:" + messageId, "set:" + messageId); + //过期时间 + redis.expire("set:" + messageId, 5, TimeUnit.MINUTES); + + + try { + if (add == 1) { + //第一次 消费 + System.out.println("*****************************"); + System.out.println("消费者收到消息:" + param); + System.out.println("*****************************"); + log.info("消费结束"); + + //确认消费 + channel.basicAck(deliveryTag, false); + + } else { + //重复消费 + log.error("重复消费"); + //拒绝消费 + channel.basicReject(deliveryTag, false); + + //删除缓存 + redis.opsForSet().remove("set:" + messageId, "set:" + messageId); + } + + + } catch (Exception e) { + log.error("消息没有成功消费!"); + + String s = redis.opsForValue().get("value:" + messageId); + + long oldTag = Long.parseLong(s); + + if (deliveryTag == (oldTag + 2)) { + log.error("确实消费不了,不入队了!"); + + + //拒绝消费 + channel.basicNack(deliveryTag, false, false); + } else { + log.info("消息消费失败,重新入队"); + //重新入队 + channel.basicNack(deliveryTag, false, true); + } + } + } +}