diff --git a/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/config/ConfirmCallbackConfig.java b/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/config/ConfirmCallbackConfig.java new file mode 100644 index 0000000..4b18ab2 --- /dev/null +++ b/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/config/ConfirmCallbackConfig.java @@ -0,0 +1,42 @@ +package com.muyu.processing.config; + +import org.springframework.amqp.rabbit.connection.CorrelationData; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; + +/** + * 确认回调配置 + */ +@Component +public class ConfirmCallbackConfig implements RabbitTemplate.ConfirmCallback { + + @Autowired + private RabbitTemplate rabbitTemplate; + + /** + * 当前bean初始化的时候执行 + */ + @PostConstruct + public void init() { + this.rabbitTemplate.setConfirmCallback(this); + } + + /** + * 确认方法 + * @param correlationData correlation data for the callback. + * @param ack true for ack, false for nack + * @param cause An optional cause, for nack, when available, otherwise null. + */ + @Override + public void confirm(CorrelationData correlationData, boolean ack, String cause) { + if (ack) { + System.out.println("消息发送到 broker 成功"); + } else { + System.out.println("消息发送到 broker 失败,失败的原因:" + cause); + } + } + +} diff --git a/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/config/RabbitAdminConfig.java b/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/config/RabbitAdminConfig.java new file mode 100644 index 0000000..646886b --- /dev/null +++ b/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/config/RabbitAdminConfig.java @@ -0,0 +1,50 @@ +package com.muyu.processing.config; + +import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; +import org.springframework.amqp.rabbit.connection.ConnectionFactory; +import org.springframework.amqp.rabbit.core.RabbitAdmin; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * 构建 RabbitAdmin + */ +@Configuration +public class RabbitAdminConfig { + + @Value("${spring.rabbitmq.host}") + private String host; + @Value("${spring.rabbitmq.username}") + private String username; + @Value("${spring.rabbitmq.password}") + private String password; + @Value("${spring.rabbitmq.virtual-host}") + private String virtualhost; + + @Bean + public ConnectionFactory connectionFactory() { + CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); + connectionFactory.setAddresses(host); + connectionFactory.setUsername(username); + connectionFactory.setPassword(password); + connectionFactory.setVirtualHost(virtualhost); + // 配置发送确认回调时,次配置必须配置,否则即使在RabbitTemplate配置了ConfirmCallback也不会生效 + connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED); + connectionFactory.setPublisherReturns(true); + return connectionFactory; + } + + + /** + * rabbitAdmin + * @param connectionFactory + * @return + */ + @Bean + public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) { + RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory); + rabbitAdmin.setAutoStartup(true); + return rabbitAdmin; + } +} diff --git a/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/config/RabbitmqConfig.java b/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/config/RabbitmqConfig.java new file mode 100644 index 0000000..4ae5a51 --- /dev/null +++ b/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/config/RabbitmqConfig.java @@ -0,0 +1,18 @@ +package com.muyu.processing.config; + +import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; +import org.springframework.amqp.support.converter.MessageConverter; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * 消息转换配置 + */ +@Configuration +public class RabbitmqConfig { + // 消息转换配置 + @Bean + public MessageConverter jsonMessageConverter(){ + return new Jackson2JsonMessageConverter(); + } +} diff --git a/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/config/ReturnsCallbackConfig.java b/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/config/ReturnsCallbackConfig.java new file mode 100644 index 0000000..32ddbdd --- /dev/null +++ b/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/config/ReturnsCallbackConfig.java @@ -0,0 +1,39 @@ +package com.muyu.processing.config; + +import org.springframework.amqp.core.ReturnedMessage; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; + +/** + * 消息发送失败返回配置 + */ +@Component +public class ReturnsCallbackConfig implements RabbitTemplate.ReturnsCallback { + + @Autowired + private RabbitTemplate rabbitTemplate; + + /** + * 当前bean初始化的时候执行 + */ + @PostConstruct + public void init() { + this.rabbitTemplate.setReturnsCallback(this); + } + + /** + * 消息发送达到 queue 失败执行 + * + * @param returnedMessage the returned message and metadata. + */ + @Override + public void returnedMessage(ReturnedMessage returnedMessage) { + System.out.println("消息" + returnedMessage.getMessage().toString() + + "被交换机" + returnedMessage.getExchange() + "回退!" + + "退回原因为:" + returnedMessage.getReplyText()); + // TODO 回退了所有的信息,可做补偿机制 + } +} diff --git a/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/consumer/MQConsumer.java b/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/consumer/MQConsumer.java new file mode 100644 index 0000000..4dbc156 --- /dev/null +++ b/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/consumer/MQConsumer.java @@ -0,0 +1,41 @@ +package com.muyu.processing.consumer; + +import com.rabbitmq.client.Channel; +import lombok.extern.log4j.Log4j2; +import org.springframework.amqp.core.Message; +import org.springframework.amqp.rabbit.annotation.Queue; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.stereotype.Component; + +/** + * MQ消费者 + * @Author:杨鹏 + * @Package:com.muyu.processing.consumer + * @Project:car-cloud-server + * @name:MQconsumer + * @Date:2024/9/29 17:19 + */ +@Log4j2 +@Component +public class MQConsumer { + + + + @RabbitListener(queuesToDeclare = @Queue("long_time_no_see")) + public void receive(String haha, Message message, Channel channel){ + + try { + log.info("MQ消费的消息的内容为{}",haha); + channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); + + } catch (Exception e) { + try { + channel.basicReject(message.getMessageProperties().getDeliveryTag(),true); + } catch (Exception ex) { + throw new RuntimeException(ex); + } + } + + } + +}