feat:() 新增MQ配置文件 和 MQ消费者
parent
84b54e9116
commit
7033d0e9dd
|
@ -0,0 +1,42 @@
|
||||||
|
package com.muyu.processing.config;
|
||||||
|
|
||||||
|
import org.springframework.amqp.rabbit.connection.CorrelationData;
|
||||||
|
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import javax.annotation.PostConstruct;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 确认回调配置
|
||||||
|
*/
|
||||||
|
@Component
|
||||||
|
public class ConfirmCallbackConfig implements RabbitTemplate.ConfirmCallback {
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private RabbitTemplate rabbitTemplate;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 当前bean初始化的时候执行
|
||||||
|
*/
|
||||||
|
@PostConstruct
|
||||||
|
public void init() {
|
||||||
|
this.rabbitTemplate.setConfirmCallback(this);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 确认方法
|
||||||
|
* @param correlationData correlation data for the callback.
|
||||||
|
* @param ack true for ack, false for nack
|
||||||
|
* @param cause An optional cause, for nack, when available, otherwise null.
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
|
||||||
|
if (ack) {
|
||||||
|
System.out.println("消息发送到 broker 成功");
|
||||||
|
} else {
|
||||||
|
System.out.println("消息发送到 broker 失败,失败的原因:" + cause);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,50 @@
|
||||||
|
package com.muyu.processing.config;
|
||||||
|
|
||||||
|
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
|
||||||
|
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
|
||||||
|
import org.springframework.amqp.rabbit.core.RabbitAdmin;
|
||||||
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
|
import org.springframework.context.annotation.Bean;
|
||||||
|
import org.springframework.context.annotation.Configuration;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 构建 RabbitAdmin
|
||||||
|
*/
|
||||||
|
@Configuration
|
||||||
|
public class RabbitAdminConfig {
|
||||||
|
|
||||||
|
@Value("${spring.rabbitmq.host}")
|
||||||
|
private String host;
|
||||||
|
@Value("${spring.rabbitmq.username}")
|
||||||
|
private String username;
|
||||||
|
@Value("${spring.rabbitmq.password}")
|
||||||
|
private String password;
|
||||||
|
@Value("${spring.rabbitmq.virtual-host}")
|
||||||
|
private String virtualhost;
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public ConnectionFactory connectionFactory() {
|
||||||
|
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
|
||||||
|
connectionFactory.setAddresses(host);
|
||||||
|
connectionFactory.setUsername(username);
|
||||||
|
connectionFactory.setPassword(password);
|
||||||
|
connectionFactory.setVirtualHost(virtualhost);
|
||||||
|
// 配置发送确认回调时,次配置必须配置,否则即使在RabbitTemplate配置了ConfirmCallback也不会生效
|
||||||
|
connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
|
||||||
|
connectionFactory.setPublisherReturns(true);
|
||||||
|
return connectionFactory;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* rabbitAdmin
|
||||||
|
* @param connectionFactory
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
@Bean
|
||||||
|
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
|
||||||
|
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
|
||||||
|
rabbitAdmin.setAutoStartup(true);
|
||||||
|
return rabbitAdmin;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,18 @@
|
||||||
|
package com.muyu.processing.config;
|
||||||
|
|
||||||
|
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
|
||||||
|
import org.springframework.amqp.support.converter.MessageConverter;
|
||||||
|
import org.springframework.context.annotation.Bean;
|
||||||
|
import org.springframework.context.annotation.Configuration;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 消息转换配置
|
||||||
|
*/
|
||||||
|
@Configuration
|
||||||
|
public class RabbitmqConfig {
|
||||||
|
// 消息转换配置
|
||||||
|
@Bean
|
||||||
|
public MessageConverter jsonMessageConverter(){
|
||||||
|
return new Jackson2JsonMessageConverter();
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,39 @@
|
||||||
|
package com.muyu.processing.config;
|
||||||
|
|
||||||
|
import org.springframework.amqp.core.ReturnedMessage;
|
||||||
|
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import javax.annotation.PostConstruct;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 消息发送失败返回配置
|
||||||
|
*/
|
||||||
|
@Component
|
||||||
|
public class ReturnsCallbackConfig implements RabbitTemplate.ReturnsCallback {
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private RabbitTemplate rabbitTemplate;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 当前bean初始化的时候执行
|
||||||
|
*/
|
||||||
|
@PostConstruct
|
||||||
|
public void init() {
|
||||||
|
this.rabbitTemplate.setReturnsCallback(this);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 消息发送达到 queue 失败执行
|
||||||
|
*
|
||||||
|
* @param returnedMessage the returned message and metadata.
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void returnedMessage(ReturnedMessage returnedMessage) {
|
||||||
|
System.out.println("消息" + returnedMessage.getMessage().toString() +
|
||||||
|
"被交换机" + returnedMessage.getExchange() + "回退!"
|
||||||
|
+ "退回原因为:" + returnedMessage.getReplyText());
|
||||||
|
// TODO 回退了所有的信息,可做补偿机制
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,41 @@
|
||||||
|
package com.muyu.processing.consumer;
|
||||||
|
|
||||||
|
import com.rabbitmq.client.Channel;
|
||||||
|
import lombok.extern.log4j.Log4j2;
|
||||||
|
import org.springframework.amqp.core.Message;
|
||||||
|
import org.springframework.amqp.rabbit.annotation.Queue;
|
||||||
|
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* MQ消费者
|
||||||
|
* @Author:杨鹏
|
||||||
|
* @Package:com.muyu.processing.consumer
|
||||||
|
* @Project:car-cloud-server
|
||||||
|
* @name:MQconsumer
|
||||||
|
* @Date:2024/9/29 17:19
|
||||||
|
*/
|
||||||
|
@Log4j2
|
||||||
|
@Component
|
||||||
|
public class MQConsumer {
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
@RabbitListener(queuesToDeclare = @Queue("long_time_no_see"))
|
||||||
|
public void receive(String haha, Message message, Channel channel){
|
||||||
|
|
||||||
|
try {
|
||||||
|
log.info("MQ消费的消息的内容为{}",haha);
|
||||||
|
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
|
||||||
|
|
||||||
|
} catch (Exception e) {
|
||||||
|
try {
|
||||||
|
channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);
|
||||||
|
} catch (Exception ex) {
|
||||||
|
throw new RuntimeException(ex);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
Loading…
Reference in New Issue