fix: 代码丢失找回

server_five_liuyunhu
lijiayao 2024-04-06 10:24:27 +08:00
parent d2a29b0016
commit e5761fb11f
22 changed files with 347 additions and 110 deletions

View File

@ -3,14 +3,8 @@ package com.couplet.business.server.controller;
import com.couplet.business.server.service.OnLineVehicleService;
import com.couplet.common.core.domain.Result;
import com.couplet.common.domain.Vehicle;
import com.couplet.common.domain.request.RealTimeDataRequest;
import com.couplet.common.security.utils.SecurityUtils;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.amqp.RabbitTemplateConfigurer;
import org.springframework.data.redis.core.ReactiveRedisOperations;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@ -28,8 +22,7 @@ public class OnLineVehicleController {
@Autowired
private OnLineVehicleService onLineVehicleService;
@Autowired
private RabbitTemplate rabbitTemplate;
@PostMapping("onlineList")
public Result<List<Vehicle>> onlineList(){
List<Vehicle> onlineList = onLineVehicleService.onlineList();
@ -37,14 +30,8 @@ public class OnLineVehicleController {
return success;
}
@PostMapping("/monitorinData")
public void monitorinData(@RequestBody RealTimeDataRequest realTimeDataRequest){
String exchangeName = "exchangeName"; // 交换机名称
String routingKey = "routingKey"; // 路由键
Long userId = SecurityUtils.getUserId();
realTimeDataRequest.setUserId(userId);
rabbitTemplate.convertAndSend(exchangeName, routingKey, realTimeDataRequest);
}
}

View File

@ -9,7 +9,6 @@ import com.couplet.common.domain.CoupletTroubleGrade;
import com.couplet.common.domain.request.TroubleResp;
import com.couplet.common.log.annotation.Log;
import com.couplet.common.log.enums.BusinessType;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.*;
@ -24,7 +23,6 @@ import java.util.List;
*/
@RestController
@RequestMapping("trouble")
@Slf4j
public class SysTroubleController extends BaseController {
@Autowired
private SysTroubleService troubleService;
@ -38,6 +36,14 @@ public class SysTroubleController extends BaseController {
return Result.success(result);
}
/**
*
*/
@GetMapping("/troubleTypeList")
public List<CoupletTroubleType> listType() {
return troubleService.selectTroubleListByType();
}
/**
*
*/
@ -75,27 +81,12 @@ public class SysTroubleController extends BaseController {
}
/**
*
* @param coupletTroubleLog
* @return
*
*/
@PostMapping("insertCode")
public Result<Integer> insertCode(@RequestBody CoupletTroubleCode coupletTroubleCode){
long start = System.currentTimeMillis();
int i = troubleService.insertMsgResq(coupletTroubleCode);
long end = System.currentTimeMillis();
log.info("记录异常信息成功,耗时:{}",(end-start));
return Result.success(i);
}
/**
*
*/
@PostMapping("cleanTroubleCode")
public Result<?> cleanTroubleCode(){
troubleService.cleanTroubleCode();
return Result.success();
@Log(title = "新增故障码",businessType = BusinessType.INSERT)
@PostMapping("/newFaultData")
public Result<?> newFaultData(@RequestBody CoupletTroubleCode code) {
troubleService.newFaultData(code);
return success();
}
}

View File

@ -4,7 +4,6 @@ import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.couplet.common.domain.Vehicle;
import com.couplet.common.domain.VehicleMiddle;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import org.springframework.stereotype.Component;
import java.util.List;
@ -25,9 +24,4 @@ public interface VehicleMapper extends BaseMapper<Vehicle> {
Integer addVehicle(VehicleMiddle vehicleMiddle);
List<Vehicle> vehicleAll();
Integer onOrOutLineByVIN(@Param("vin") String vin, @Param("status") Integer status);
Integer addVehicle(@Param("userId") Long userId, @Param("vehicleIds") List<Long> vehicleIds);
}

View File

@ -3,7 +3,6 @@ package com.couplet.business.server.service;
import com.baomidou.mybatisplus.extension.service.IService;
import com.couplet.common.domain.Logo;
import java.util.List;
/**

View File

@ -4,9 +4,9 @@ import com.baomidou.mybatisplus.extension.service.IService;
import com.couplet.common.core.domain.PageResult;
import com.couplet.common.domain.CoupletTroubleCode;
import com.couplet.common.domain.CoupletTroubleGrade;
import com.couplet.common.domain.CoupletTroubleType;
import com.couplet.common.domain.request.TroubleResp;
import java.util.List;
/**
@ -18,9 +18,18 @@ import java.util.List;
public interface SysTroubleService extends IService<CoupletTroubleCode> {
PageResult<CoupletTroubleCode> selectTroubleList(TroubleResp troubleReq);
List<CoupletTroubleType> selectTroubleListByType();
List<CoupletTroubleGrade> selectTroubleListByGrade();
int insertMsgResq(CoupletTroubleCode coupletTroubleCode);
/**
*
* @param code
*/
void newFaultData(CoupletTroubleCode code);
void cleanTroubleCode();
// int addTrouble (TroubleAddReq troubleAddReq);
// int updateTrouble(TroubleUpdReq troubleUpdReq);
}

View File

@ -3,7 +3,6 @@ package com.couplet.business.server.service;
import com.baomidou.mybatisplus.extension.service.IService;
import com.couplet.common.domain.VehicleAndLogo;
import java.util.List;
/**

View File

@ -1,6 +1,5 @@
package com.couplet.business.server.service;
import com.couplet.common.core.domain.Result;
import com.couplet.common.domain.Vehicle;
import java.util.List;
@ -9,5 +8,5 @@ public interface VehicleDetectionService {
List<Vehicle> detectionList();
List<Vehicle> findByVin(Integer vehicleId);
List<Vehicle> findByVin(String vehicleId);
}

View File

@ -37,6 +37,4 @@ public interface VehicleService extends IService<Vehicle> {
List<Vehicle> vehicleAll();
Integer onOrOutLineByVIN(String vin,Integer status);
}

View File

@ -3,7 +3,6 @@ package com.couplet.business.server.service;
import com.baomidou.mybatisplus.extension.service.IService;
import com.couplet.common.domain.VehicleType;
import java.util.List;
/**

View File

@ -1,5 +1,6 @@
package com.couplet.business.server.service.impl;
import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.couplet.business.server.mapper.FenceMapper;
import com.couplet.business.server.service.FenAndLogoService;
@ -8,12 +9,15 @@ import com.couplet.common.domain.Fence;
import com.couplet.common.domain.request.FenceConfig;
import com.couplet.common.domain.request.FenceRequest;
import com.couplet.common.domain.request.FenceUpdateRequest;
import com.couplet.common.security.utils.SecurityUtils;
import com.couplet.mq.remote.RemoteFenceService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
import javax.servlet.http.HttpServletRequest;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
* @Author: LiJiaYao
@ -93,6 +97,11 @@ public class FenceServiceImpl extends ServiceImpl<FenceMapper, Fence> implements
@Override
public void removeByFenceId(Long fenceId) {
fenceMapper.removeByFenceId(fenceId);
/**
*
*/
redisTemplate.opsForValue().set("removeByFenceId", JSON.toJSONString(fenceId), 10, TimeUnit.MINUTES);
}
@Override

View File

@ -7,7 +7,6 @@ import com.couplet.common.system.domain.LoginUser;
import com.couplet.common.system.domain.SysDept;
import com.couplet.common.system.remote.RemoteDeptService;
import com.couplet.common.system.remote.RemoteUserService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

View File

@ -6,6 +6,7 @@ import com.couplet.business.server.service.SysTroubleService;
import com.couplet.common.core.domain.PageResult;
import com.couplet.common.domain.CoupletTroubleCode;
import com.couplet.common.domain.CoupletTroubleGrade;
import com.couplet.common.domain.CoupletTroubleType;
import com.couplet.common.domain.request.TroubleResp;
import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo;
@ -39,26 +40,40 @@ public class SysTroubleServiceImpl extends ServiceImpl<SysTroubleMapper, Couplet
return PageResult.toPageResult(info.getTotal(),troubleList);
}
@Override
public List<CoupletTroubleType> selectTroubleListByType() {
return sysTroubleMapper.selectTroubleListByType();
}
@Override
public List<CoupletTroubleGrade> selectTroubleListByGrade() {
return sysTroubleMapper.selectTroubleListByGrade();
}
/**
*
* @param coupletTroubleLog
* @return
*/
@Override
public int insertMsgResq(CoupletTroubleCode coupletTroubleCode) {
return sysTroubleMapper.insertMsgResq(coupletTroubleCode);
public void newFaultData(CoupletTroubleCode code) {
sysTroubleMapper.newFaultData(code);
}
/**
*
*
// * @param troubleAddReq
* @return
*/
@Override
public void cleanTroubleCode() {
sysTroubleMapper.cleanTroubleCode();
}
// @Override
// public int addTrouble(TroubleAddReq troubleAddReq) {
// return sysTroubleMapper.addTrouble(troubleAddReq);
// }
//
// /**
// * 修改故障码数据
// * @param troubleUpdReq
// * @return
// */
// @Override
// public int updateTrouble(TroubleUpdReq troubleUpdReq) {
// return sysTroubleMapper.updateTrouble(troubleUpdReq);
// }
}

View File

@ -2,8 +2,6 @@ package com.couplet.business.server.service.impl;
import com.couplet.business.server.mapper.VehicleDetectionMapper;
import com.couplet.business.server.service.VehicleDetectionService;
import com.couplet.business.server.service.VehicleManageService;
import com.couplet.common.core.domain.Result;
import com.couplet.common.domain.Vehicle;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@ -28,7 +26,7 @@ public class VehicleDetectionServiceImpl implements VehicleDetectionService{
}
@Override
public List<Vehicle> findByVin(Integer vehicleId) {
public List<Vehicle> findByVin(String vehicleId) {
return vehicleDetectionMapper.findByVin(vehicleId);
}
}

View File

@ -61,6 +61,8 @@ public class VehicleManageServiceImpl implements VehicleManageService {
*/
@Override
public Result addVehicle(VehicleMiddle middle) {
Long userId = SecurityUtils.getUserId();
middle.setUserId(userId);
Result<Integer> integerResult = remoteVehicleService.addVehicle(middle);
Integer resultData = integerResult.getData();
return resultData==1?Result.success():Result.error("添加失败");

View File

@ -16,7 +16,6 @@ import com.couplet.common.domain.VehicleType;
import com.couplet.common.domain.request.VehicleEditParams;
import com.couplet.common.domain.request.VehicleInsertParams;
import com.couplet.common.domain.request.VehicleListParams;
import com.couplet.common.security.utils.SecurityUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@ -98,7 +97,7 @@ public class VehicleServiceImpl extends ServiceImpl<VehicleMapper, Vehicle> impl
**/
@Override
public String deleteById(Long vehicleId) {
String result;
String result = "";
UpdateWrapper<Vehicle> updateWrapper = new UpdateWrapper<>();
@ -126,7 +125,7 @@ public class VehicleServiceImpl extends ServiceImpl<VehicleMapper, Vehicle> impl
**/
@Override
public String editById(VehicleEditParams editParams) {
String result;
String result = "";
if ((editParams.getLogoIds() == null || editParams.getLogoIds().isEmpty())) {
result = "未选择电子围栏";
@ -181,7 +180,7 @@ public class VehicleServiceImpl extends ServiceImpl<VehicleMapper, Vehicle> impl
**/
@Override
public String insert(VehicleInsertParams insertParams) {
String result;
String result = "";
if ((insertParams.getLogoIds() == null || insertParams.getLogoIds().isEmpty())) {
result = "未选择电子围栏";
@ -250,8 +249,9 @@ public class VehicleServiceImpl extends ServiceImpl<VehicleMapper, Vehicle> impl
**/
@Override
public List<Long> getBindLogoById(Long vehicleId) {
List<Long> logoIds = vehicleAndLogoService.getBindLogoById(vehicleId);
return vehicleAndLogoService.getBindLogoById(vehicleId);
return logoIds;
}
/*
@ -273,8 +273,8 @@ public class VehicleServiceImpl extends ServiceImpl<VehicleMapper, Vehicle> impl
@Override
public Integer addVehicle(VehicleMiddle vehicleMiddle) {
Long userId = SecurityUtils.getUserId();
return vehicleMapper.addVehicle(userId,vehicleMiddle.getVehicleIds());
return vehicleMapper.addVehicle(vehicleMiddle);
}
@Override
@ -282,14 +282,6 @@ public class VehicleServiceImpl extends ServiceImpl<VehicleMapper, Vehicle> impl
return vehicleMapper.vehicleAll();
}
//通过vin修改车辆上下线的状态
@Override
public Integer onOrOutLineByVIN(String vin, Integer status) {
return vehicleMapper.onOrOutLineByVIN(vin, status);
}
@Override
public List<Vehicle> findByVIN(String vin) {

View File

@ -1,4 +1,4 @@
package com.couplet.business.time;
package com.couplet.business.server.time;
import com.couplet.business.server.service.VehicleService;
import com.couplet.common.domain.Vehicle;

View File

@ -19,6 +19,6 @@ import org.springframework.boot.autoconfigure.SpringBootApplication;
public class CoupletMqApplatcaion {
public static void main(String[] args) {
SpringApplication.run(CoupletMqApplatcaion.class, args);
System.out.println("MQ模块启动成功");
System.out.println("获取报文、RabbitMQ模块启动成功");
}
}

View File

@ -25,16 +25,22 @@ import org.springframework.context.annotation.Primary;
public class RabbitMQConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {
// 通过注入的方式获取队列名、交换机名和路由键
//队列名
// @Value("${mq.queueName}")
public static final String queueName = "queueName";
@Value("queueName")
public String queueName;
//队列名
public static final String FENCE_QUEUE ="fenceQueue";
//交换机
// @Value("${mq.exchangeName}")
public static final String exchangeName = "exchangeName";
@Value("exchangeName")
public String exchangeName;
public static final String FENCE_EXCHANGE="fenceExchange";
//路由键
// @Value("${mq.routingKey}")
public static final String routingKey = "routingKey";
@Value("routingKey")
public String routingKey;
//路由键
public static final String FENCE_ROUTINGKEY="fenceRoutingKey";
private RabbitTemplate rabbitTemplate;
@ -61,6 +67,10 @@ public class RabbitMQConfig implements RabbitTemplate.ConfirmCallback, RabbitTem
public Queue queue() {
return new Queue(queueName, true);
}
@Bean("fenceQueue")
public Queue queue2() {
return new Queue(FENCE_QUEUE, true);
}
/*
* @Author: LiuYunHu
@ -74,6 +84,11 @@ public class RabbitMQConfig implements RabbitTemplate.ConfirmCallback, RabbitTem
return new DirectExchange(exchangeName);
}
@Bean("fenceExchange")
public DirectExchange directExchange2() {
return new DirectExchange(FENCE_EXCHANGE);
}
/*
* @Author: LiuYunHu
* @Date: 2024/3/29 21:27
@ -116,6 +131,11 @@ public class RabbitMQConfig implements RabbitTemplate.ConfirmCallback, RabbitTem
return BindingBuilder.bind(queue()).to(directExchange()).with(routingKey);
}
@Bean("fenceRoutingKey")
public Binding binding2() {
return BindingBuilder.bind(queue2()).to(directExchange2()).with(FENCE_ROUTINGKEY);
}
/*
* @Author: LiuYunHu
* @Date: 2024/3/29 21:28

View File

@ -1,15 +1,20 @@
package com.couplet.mq.controller;
import com.couplet.common.core.utils.uuid.IdUtils;
import com.couplet.common.domain.request.FenceUpdateRequest;
import com.couplet.common.domain.request.RealTimeDataRequest;
import com.couplet.common.security.utils.SecurityUtils;
import com.couplet.mq.config.RabbitMQConfig;
import com.couplet.mq.domain.User;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @ProjectName: five-groups-couplet
@ -23,17 +28,17 @@ import org.springframework.web.bind.annotation.*;
public class MqController {
// 通过注入的方式获取队列名、交换机名和路由键
//队列名
@Value("${mq.queueName}")
public static final String queueName="queueName";
@Value("${mq.finByVinQueueName}")
public static final String finByVinQueueName="finByVinQueueName";
@Value("queueName")
public String queueName;
@Value("finByVinQueueName")
public String finByVinQueueName;
//交换机
@Value("${mq.exchangeName}")
public static final String exchangeName="exchangeName";
@Value("exchangeName")
public String exchangeName;
//路由键
@Value("${mq.routingKey}")
public static final String routingKey="routingKey";
@Value("routingKey")
public String routingKey;
@Autowired
private RabbitTemplate rabbitTemplate;
@ -57,13 +62,19 @@ public class MqController {
}
@PostMapping("findByVin/{vin}")
public void postFindByVin(@PathVariable String vin){
RealTimeDataRequest realTimeDataRequest = new RealTimeDataRequest();
@PostMapping("findByVin")
public void postFindByVin(@RequestBody RealTimeDataRequest request){
Long userId = SecurityUtils.getUserId();
realTimeDataRequest.setUserId(userId);
realTimeDataRequest.setVin(vin);
rabbitTemplate.convertAndSend(exchangeName, routingKey, realTimeDataRequest, message -> {
request.setUserId(userId);
rabbitTemplate.convertAndSend(exchangeName, routingKey, request, message -> {
message.getMessageProperties().setMessageId(IdUtils.randomUUID());
return message;
}, new CorrelationData(IdUtils.randomUUID())
);
}
@PostMapping("fenceQueue")
public void fenceQueue(@RequestBody FenceUpdateRequest fenceUpdateRequest){
rabbitTemplate.convertAndSend(RabbitMQConfig.FENCE_EXCHANGE, RabbitMQConfig.FENCE_ROUTINGKEY, fenceUpdateRequest, message -> {
message.getMessageProperties().setMessageId(IdUtils.randomUUID());
return message;
}, new CorrelationData(IdUtils.randomUUID())

View File

@ -0,0 +1,19 @@
package com.couplet.mq.remote;
import com.couplet.common.core.constant.ServiceNameConstants;
import com.couplet.common.domain.request.FenceUpdateRequest;
import com.couplet.mq.remote.factory.RemoteFenceFallbackFactory;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
@FeignClient(contextId = "remoteVehicleService" ,
value = ServiceNameConstants.BUSINESS_SERVICE,
fallbackFactory = RemoteFenceFallbackFactory.class
)
public interface RemoteFenceService {
@PostMapping("/mq/fenceQueue")
public void fenceQueue(@RequestBody FenceUpdateRequest fenceUpdateRequest);
}

View File

@ -0,0 +1,33 @@
package com.couplet.mq.remote.factory;
import com.couplet.common.domain.request.FenceUpdateRequest;
import com.couplet.mq.remote.RemoteFenceService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.openfeign.FallbackFactory;
import org.springframework.stereotype.Component;
import static com.couplet.common.core.domain.Result.error;
/**
* @author fufanrui
* @version 1.0
* @description: TODO
* @date 2024/4/2 14:46
*/
@Component
public class RemoteFenceFallbackFactory implements FallbackFactory<RemoteFenceService> {
private static final Logger log = LoggerFactory.getLogger(RemoteFenceFallbackFactory.class);
@Override
public RemoteFenceService create(Throwable cause) {
return new RemoteFenceService() {
@Override
public void fenceQueue(FenceUpdateRequest fenceUpdateRequest) {
error("调用失败...."+cause.getMessage());
}
};
}
}

View File

@ -0,0 +1,164 @@
package com.couplet.mq.service;
import com.couplet.mq.domain.User;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
/**
* @ProjectName: five-groups-couplet
* @Author: LiuYunHu
* @CreateTime: 2024/3/28
* @Description: MQ
*/
@Component
@Slf4j
@SuppressWarnings("all")
@RabbitListener(queues = "queueName")
public class Consumer {
@Autowired
private StringRedisTemplate redis;
/* 线
//创建一个定长线程池
private final Executor executor = Executors.newFixedThreadPool(5);
@Async
@RabbitHandler
public void process(User param, Channel channel, Message message) {
executor.execute(() -> {
try {
handleMessage(param, channel, message);
} catch (IOException e) {
log.error("处理消息失败:{}", e);
}
});
}
//处理信息的方法
private void handleMessage(User param, Channel channel, Message message) throws IOException {
log.info("消费者收到消息为:{},{}" + param, message.getMessageProperties().getDeliveryTag());
long deliveryTag = message.getMessageProperties().getDeliveryTag();
String messageId = message.getMessageProperties().getMessageId();
if (!redis.hasKey("value:" + messageId)) {
redis.opsForValue().set("value:" + messageId, "" + deliveryTag, 5, TimeUnit.MINUTES);
}
// 1 添加成功新数据 0已有重复值,不允许再添加
Long add = redis.opsForSet().add("set:" + messageId, "set:" + messageId);
//过期时间
redis.expire("set:" + messageId, 5, TimeUnit.MINUTES);
try {
if (add == 1) {
//第一次 消费
System.out.println("*****************************");
System.out.println("消费者收到消息:" + param);
System.out.println("*****************************");
log.info("消费结束");
channel.basicAck(deliveryTag, false);
} else {
//重复消费
log.error("重复消费");
channel.basicReject(deliveryTag, false);
//删除缓存
redis.opsForSet().remove("set:" + messageId, "set:" + messageId);
}
} catch (Exception e) {
log.error("消息没有成功消费!");
String s = redis.opsForValue().get("value:" + messageId);
long oldTag = Long.parseLong(s);
if (deliveryTag == (oldTag + 2)) {
log.error("确实消费不了,不入队了!");
channel.basicNack(deliveryTag, false, false);
} else {
log.info("消息消费失败,重新入队");
channel.basicNack(deliveryTag, false, true);
}
}
}
**/
@RabbitHandler
public void process(User param, Channel channel, Message message) throws IOException {
log.info("消费者收到消息为:{},{}" + param, message.getMessageProperties().getDeliveryTag());
long deliveryTag = message.getMessageProperties().getDeliveryTag();
String messageId = message.getMessageProperties().getMessageId();
if (!redis.hasKey("value:" + messageId)) {
redis.opsForValue().set("value:" + messageId, "" + deliveryTag, 5, TimeUnit.MINUTES);
}
// 1 添加成功新数据 0已有重复值,不允许再添加
Long add = redis.opsForSet().add("set:" + messageId, "set:" + messageId);
//过期时间
redis.expire("set:" + messageId, 5, TimeUnit.MINUTES);
try {
if (add == 1) {
//第一次 消费
System.out.println("*****************************");
System.out.println("消费者收到消息:" + param);
System.out.println("*****************************");
log.info("消费结束");
//确认消费
channel.basicAck(deliveryTag, false);
} else {
//重复消费
log.error("重复消费");
//拒绝消费
channel.basicReject(deliveryTag, false);
//删除缓存
redis.opsForSet().remove("set:" + messageId, "set:" + messageId);
}
} catch (Exception e) {
log.error("消息没有成功消费!");
String s = redis.opsForValue().get("value:" + messageId);
long oldTag = Long.parseLong(s);
if (deliveryTag == (oldTag + 2)) {
log.error("确实消费不了,不入队了!");
//拒绝消费
channel.basicNack(deliveryTag, false, false);
} else {
log.info("消息消费失败,重新入队");
//重新入队
channel.basicNack(deliveryTag, false, true);
}
}
}
}