解决rabbitmq异步事务回滚,完善业务逻辑
parent
dcc3a6e727
commit
8180f41c22
|
@ -1,13 +1,9 @@
|
||||||
package com.bwie.scenic.rabbitmq;
|
package com.bwie.scenic.rabbitmq;
|
||||||
|
|
||||||
import com.alibaba.fastjson.JSONObject;
|
|
||||||
import com.bwie.common.constants.TokenConstants;
|
|
||||||
import com.bwie.common.domain.Order;
|
import com.bwie.common.domain.Order;
|
||||||
import com.bwie.common.domain.Scenic;
|
import com.bwie.common.domain.Scenic;
|
||||||
import com.bwie.common.domain.User;
|
import com.bwie.common.exception.GlobalExceptionHandler;
|
||||||
import com.bwie.common.utils.JwtUtils;
|
|
||||||
import com.bwie.scenic.mapper.OrderMapper;
|
import com.bwie.scenic.mapper.OrderMapper;
|
||||||
import com.bwie.scenic.service.OrderService;
|
|
||||||
import com.bwie.scenic.service.ScenicService;
|
import com.bwie.scenic.service.ScenicService;
|
||||||
import com.rabbitmq.client.Channel;
|
import com.rabbitmq.client.Channel;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
@ -19,10 +15,7 @@ import org.springframework.data.redis.core.RedisTemplate;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
import org.springframework.transaction.annotation.Transactional;
|
import org.springframework.transaction.annotation.Transactional;
|
||||||
|
|
||||||
import javax.servlet.http.HttpServletRequest;
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.math.BigDecimal;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
|
|
||||||
import static com.bwie.scenic.service.impl.OrderServiceImpl.REFUND_QUEUE;
|
import static com.bwie.scenic.service.impl.OrderServiceImpl.REFUND_QUEUE;
|
||||||
|
|
||||||
|
@ -41,9 +34,6 @@ public class RabbitMqConfig {
|
||||||
@Autowired
|
@Autowired
|
||||||
private RedisTemplate<String, String> redisTemplate;
|
private RedisTemplate<String, String> redisTemplate;
|
||||||
|
|
||||||
@Autowired
|
|
||||||
private HttpServletRequest request;
|
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
private OrderMapper orderMapper;
|
private OrderMapper orderMapper;
|
||||||
|
|
||||||
|
@ -52,53 +42,39 @@ public class RabbitMqConfig {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 1.6 异步完成退票功能。消息保证不丢失不重复消费
|
* 1.6 异步完成退票功能。消息保证不丢失不重复消费
|
||||||
|
*
|
||||||
* @param orderId
|
* @param orderId
|
||||||
* @param message
|
* @param message
|
||||||
* @param channel
|
* @param channel
|
||||||
*/
|
*/
|
||||||
@Transactional
|
@Transactional
|
||||||
@RabbitListener(queuesToDeclare = { @Queue(name = REFUND_QUEUE)})
|
@RabbitListener(queuesToDeclare = {@Queue(name = REFUND_QUEUE)})
|
||||||
public void refund(Integer orderId, Message message, Channel channel){
|
public void refund(Integer orderId, Message message, Channel channel) throws GlobalExceptionHandler {
|
||||||
String messageId = message.getMessageProperties().getMessageId();
|
String messageId = message.getMessageProperties().getMessageId();
|
||||||
Long add = redisTemplate.opsForSet().add(messageId, REFUND_QUEUE);
|
Long add = redisTemplate.opsForSet().add(messageId, REFUND_QUEUE);
|
||||||
log.info("消息队列:[{}],接受到信息:[{}],开始消费...",REFUND_QUEUE,orderId);
|
log.info("消息队列:【{}】,接受到信息:【{}】,开始消费...", REFUND_QUEUE, orderId);
|
||||||
try {
|
try {
|
||||||
if (add > 0){
|
if (add.intValue() > 0) {
|
||||||
// 获取用户信息
|
|
||||||
String token = request.getHeader(TokenConstants.TOKEN);
|
|
||||||
String userKey = JwtUtils.getUserKey(token);
|
|
||||||
String json = redisTemplate.opsForValue().get(TokenConstants.LOGIN_TOKEN_KEY + userKey);
|
|
||||||
User user = JSONObject.parseObject(json, User.class);
|
|
||||||
|
|
||||||
// 获取当前订单信息
|
// 获取当前订单信息
|
||||||
Order order = orderMapper.findById(orderId);
|
Order order = orderMapper.findById(orderId);
|
||||||
|
|
||||||
// 1.6 钱款退回原路,扣除10%的手续费,退票成功
|
|
||||||
user.setBalance(user.getBalance().add(order.getTotalFares().multiply(new BigDecimal(0.9))));
|
|
||||||
orderMapper.updateUserBalance(user);
|
|
||||||
redisTemplate.opsForValue().set(TokenConstants.LOGIN_TOKEN_KEY + userKey, JSONObject.toJSONString(user),
|
|
||||||
30, TimeUnit.MINUTES);
|
|
||||||
|
|
||||||
// 获取景区信息
|
// 获取景区信息
|
||||||
Scenic scenic = scenicService.findById(order.getScenicId());
|
Scenic scenic = scenicService.findById(order.getScenicId());
|
||||||
// 景区剩余票数+已退票数
|
// 景区剩余票数+已退票数
|
||||||
scenic.setRemainingVotes(scenic.getRemainingVotes() + order.getVotes());
|
scenic.setRemainingVotes(scenic.getRemainingVotes() + order.getVotes());
|
||||||
scenicService.updateRemainingVotes(scenic);
|
scenicService.updateRemainingVotes(scenic);
|
||||||
|
|
||||||
|
// 订单为已退票
|
||||||
|
order.setStatues(3);
|
||||||
|
orderMapper.updateOrder(order);
|
||||||
|
|
||||||
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
|
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
|
||||||
log.info("消息队列:[{}],接受到信息:[{}],消费成功...",REFUND_QUEUE,orderId);
|
log.info("消息队列:【{}】,接受到信息:【{}】,消费成功...", REFUND_QUEUE, orderId);
|
||||||
}else {
|
} else {
|
||||||
log.info("消息队列:[{}],接受到信息:[{}],消费重复...",REFUND_QUEUE,orderId);
|
log.info("消息队列:【{}】,接受到信息:【{}】,消费重复...", REFUND_QUEUE, orderId);
|
||||||
}
|
}
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
e.printStackTrace();
|
throw new GlobalExceptionHandler();
|
||||||
log.error("消息队列:[{}],接受到信息:[{}],消费异常,异常信息:[{}]...",REFUND_QUEUE,orderId,e.getMessage());
|
|
||||||
try {
|
|
||||||
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
|
|
||||||
log.info("消息队列:[{}],接受到信息:[{}],消费退还...",REFUND_QUEUE,orderId);
|
|
||||||
} catch (IOException ex) {
|
|
||||||
e.printStackTrace();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -97,13 +97,6 @@ public class OrderServiceImpl implements OrderService {
|
||||||
@Transactional
|
@Transactional
|
||||||
@Override
|
@Override
|
||||||
public void refund(Integer orderId) {
|
public void refund(Integer orderId) {
|
||||||
// 获取当前订单信息
|
|
||||||
Order order = orderMapper.findById(orderId);
|
|
||||||
|
|
||||||
// 订单为已退票
|
|
||||||
order.setStatues(3);
|
|
||||||
orderMapper.updateOrder(order);
|
|
||||||
|
|
||||||
rabbitTemplate.convertAndSend(REFUND_QUEUE, orderId, new MessagePostProcessor() {
|
rabbitTemplate.convertAndSend(REFUND_QUEUE, orderId, new MessagePostProcessor() {
|
||||||
@Override
|
@Override
|
||||||
public Message postProcessMessage(Message message) throws AmqpException {
|
public Message postProcessMessage(Message message) throws AmqpException {
|
||||||
|
@ -111,5 +104,20 @@ public class OrderServiceImpl implements OrderService {
|
||||||
return message;
|
return message;
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// 获取用户信息
|
||||||
|
String token = request.getHeader(TokenConstants.TOKEN);
|
||||||
|
String userKey = JwtUtils.getUserKey(token);
|
||||||
|
String json = redisTemplate.opsForValue().get(TokenConstants.LOGIN_TOKEN_KEY + userKey);
|
||||||
|
User user = JSONObject.parseObject(json, User.class);
|
||||||
|
|
||||||
|
// 获取当前订单信息
|
||||||
|
Order order = orderMapper.findById(orderId);
|
||||||
|
|
||||||
|
// 1.6 钱款退回原路,扣除10%的手续费,退票成功
|
||||||
|
user.setBalance(user.getBalance().add(order.getTotalFares().multiply(new BigDecimal(0.9))));
|
||||||
|
orderMapper.updateUserBalance(user);
|
||||||
|
redisTemplate.opsForValue().set(TokenConstants.LOGIN_TOKEN_KEY + userKey, JSONObject.toJSONString(user),
|
||||||
|
30, TimeUnit.MINUTES);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue