78 lines
3.4 KiB
Java
78 lines
3.4 KiB
Java
package com.bwie.auth.utils;
|
||
|
||
import org.springframework.amqp.core.Binding;
|
||
import org.springframework.amqp.core.BindingBuilder;
|
||
import org.springframework.amqp.core.DirectExchange;
|
||
import org.springframework.amqp.core.Queue;
|
||
import org.springframework.amqp.rabbit.core.RabbitAdmin;
|
||
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||
import org.springframework.beans.factory.annotation.Autowired;
|
||
import org.springframework.stereotype.Component;
|
||
|
||
import javax.annotation.Resource;
|
||
import java.util.HashMap;
|
||
import java.util.Map;
|
||
|
||
@Component
|
||
public class DLXQueue {
|
||
// routingKey
|
||
private static final String DEAD_ROUTING_KEY = "dead.routingkey";
|
||
private static final String ROUTING_KEY = "routingkey";
|
||
private static final String DEAD_EXCHANGE = "dead.exchange";
|
||
private static final String EXCHANGE = "common.exchange";
|
||
@Autowired
|
||
RabbitTemplate rabbitTemplate;
|
||
@Resource
|
||
RabbitAdmin rabbitAdmin;
|
||
|
||
/**
|
||
* 发送死信队列,过期后进入死信交换机,进入死信队列
|
||
*
|
||
* @param queueName 队列名称
|
||
* @param deadQueueName 死信队列名称
|
||
* @param params 消息内容
|
||
* @param expiration 过期时间 毫秒
|
||
*/
|
||
public void sendDLXQueue(String queueName, String deadQueueName, Object params, Integer expiration) {
|
||
/**
|
||
* ----------------------------------先创建一个ttl队列和死信队列--------------------------------------------
|
||
*/
|
||
Map<String, Object> map = new HashMap<>();
|
||
// 队列设置存活时间,单位ms, 必须是整形数据。
|
||
map.put("x-message-ttl", expiration);
|
||
// 设置死信交换机
|
||
map.put("x-dead-letter-exchange", DEAD_EXCHANGE);
|
||
// 设置死信交换器路由
|
||
map.put("x-dead-letter-routing-key", DEAD_ROUTING_KEY);
|
||
/*参数1:队列名称 参数2:持久化 参数3:是否排他 参数4:自动删除队列 参数5:队列参数*/
|
||
Queue queue = new Queue(queueName, true, false, false, map);
|
||
rabbitAdmin.declareQueue(queue);
|
||
/**
|
||
* ---------------------------------创建交换机---------------------------------------------
|
||
*/
|
||
DirectExchange directExchange = new DirectExchange(EXCHANGE, true, false);
|
||
rabbitAdmin.declareExchange(directExchange);
|
||
/**
|
||
* ---------------------------------队列绑定交换机---------------------------------------------
|
||
*/
|
||
Binding binding = BindingBuilder.bind(queue).to(directExchange).with(ROUTING_KEY);
|
||
rabbitAdmin.declareBinding(binding);
|
||
/**
|
||
* ---------------------------------在创建一个死信交换机和队列,接收死信队列---------------------------------------------
|
||
*/
|
||
DirectExchange deadExchange = new DirectExchange(DEAD_EXCHANGE, true, false);
|
||
rabbitAdmin.declareExchange(deadExchange);
|
||
|
||
Queue deadQueue = new Queue(deadQueueName, true, false, false);
|
||
rabbitAdmin.declareQueue(deadQueue);
|
||
/**
|
||
* ---------------------------------队列绑定死信交换机---------------------------------------------
|
||
*/
|
||
// 将队列和交换机绑定
|
||
Binding deadbinding = BindingBuilder.bind(deadQueue).to(deadExchange).with(DEAD_ROUTING_KEY);
|
||
rabbitAdmin.declareBinding(deadbinding);
|
||
// 发送消息
|
||
rabbitTemplate.convertAndSend(EXCHANGE, ROUTING_KEY, params);
|
||
}
|
||
}
|