添加rabbitmq的配置类,使用交换机的广播模式发送消息

dev.car.gateway
冷调 2024-10-07 09:43:16 +08:00
parent f38e42f72f
commit f00c530de1
13 changed files with 238 additions and 113 deletions

View File

@ -32,4 +32,4 @@
</dependency>
</dependencies>
</project>
</project>

View File

@ -0,0 +1,44 @@
package com.muyu.common.rabbit.callback;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
/**
* @author Lenovo
* @Description broker
*/
@Slf4j
@Component
public class ConfirmCallback implements RabbitTemplate.ConfirmCallback {
@Resource
private RabbitTemplate rabbitTemplate;
/**
*
*/
@PostConstruct
public void init() {
this.rabbitTemplate.setConfirmCallback(this);
}
/**
*
* @param correlationData
* @param ack truebrokerfalsebroker
* @param cause ackfalseacktrue
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
log.info("消息发送到broker成功");
} else {
log.info("消息发送到broker失败失败原因{}", cause);
}
}
}

View File

@ -0,0 +1,39 @@
package com.muyu.common.rabbit.callback;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
/**
* @author Lenovo
* @Description
*/
@Slf4j
@Component
public class ReturnsCallback implements RabbitTemplate.ReturnsCallback {
@Resource
private RabbitTemplate rabbitTemplate;
/**
*
*/
@PostConstruct
public void init() {
rabbitTemplate.setReturnsCallback(this);
}
/**
*
* @param returnedMessage
*/
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
log.info("消息:{}被交换机:{}回退!回退原因:{}", returnedMessage.getMessage().toString(),
returnedMessage.getExchange(), returnedMessage.getReplyText());
}
}

View File

@ -1,47 +0,0 @@
package com.muyu.common.rabbit.config;
import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
/**
* @author Lenovo
*/
@Component
@Log4j2
public class ConfirmCallbackConfig implements RabbitTemplate.ConfirmCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* @PostContructspringspring
*/
@PostConstruct
public void init() {
rabbitTemplate.setConfirmCallback(this);
}
/**
*
*
* @param correlationData
* @param ack
* @param cause
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (!ack) {
String exchange = correlationData.getReturned().getExchange();
String message = correlationData.getReturned().getMessage().getBody().toString();
// 发送异常
log.error("消息:{},发送到交换机:{}失败,原因是:{}", message, exchange, cause);
}
}
}

View File

@ -0,0 +1,72 @@
package com.muyu.common.rabbit.config;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author Lenovo
* @Description RabbitMQ
*/
@Configuration
public class RabbitAdminConfig {
/**
* RabbitMQ
*/
@Value("${spring.rabbitmq.host}")
private String host;
/**
* RabbitMQ
*/
@Value("${spring.rabbitmq.username}")
private String username;
/**
* RabbitMQ
*/
@Value("${spring.rabbitmq.password}")
private String password;
/**
* RabbitMQ
*/
@Value("${spring.rabbitmq.virtualhost}")
private String virtualhost;
/**
* RabbitAdmin
*
* @return RabbitAdmin
*/
@Bean
public RabbitAdmin rabbitAdmin() {
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory());
rabbitAdmin.setAutoStartup(true);
return rabbitAdmin;
}
/**
* RabbitMQ
*
* @return ConnectionFactory
*/
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setAddresses(host);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost(virtualhost);
// 配置发送确认回调时次配置必须配置否则即使在RabbitTemplate配置了ConfirmCallback也不会生效
connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
connectionFactory.setPublisherReturns(true);
return connectionFactory;
}
}

View File

@ -1,32 +1,38 @@
package com.muyu.common.rabbit;
package com.muyu.common.rabbit.config;
import org.springframework.amqp.rabbit.annotation.RabbitListenerConfigurer;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistrar;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;
import javax.annotation.Resource;
/**
* @author Lenovo
* @Description rabbitMQ
*/
@Configuration
public class RabbitListenerConfigurer implements org.springframework.amqp.rabbit.annotation.RabbitListenerConfigurer {
public class RabbitListenerConfig implements RabbitListenerConfigurer {
static {
// 设置为信任所有类型的反序列化,确保消息能够正确反序列化
System.setProperty("spring.amqp.deserialization.trust.all", "true");
}
//以下配置RabbitMQ消息服务
@Resource
/**
* RabbitMQ
*/
@Autowired
public ConnectionFactory connectionFactory;
/**
*
* @return
* bean
*
* @return DefaultMessageHandlerMethodFactory
*/
@Bean
public DefaultMessageHandlerMethodFactory handlerMethodFactory() {
@ -36,8 +42,14 @@ public class RabbitListenerConfigurer implements org.springframework.amqp.rabbit
return factory;
}
/**
* RabbitMQ
*
* @param rabbitListenerEndpointRegistrar
*/
@Override
public void configureRabbitListeners(RabbitListenerEndpointRegistrar rabbitListenerEndpointRegistrar) {
// 注册自定义的消息处理方法工厂
rabbitListenerEndpointRegistrar.setMessageHandlerMethodFactory(handlerMethodFactory());
}

View File

@ -0,0 +1,26 @@
package com.muyu.common.rabbit.config;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author Lenovo
* @Description rabbitMQ
*/
@Configuration
public class RabbitMQMessageConverterConfig {
/**
*
*
* @return
*/
@Bean
public MessageConverter jsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
}

View File

@ -1,42 +0,0 @@
package com.muyu.common.rabbit.config;
import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
/**
*
* @author Lenovo
*/
@Component
@Log4j2
public class ReturnCallbackConfig implements RabbitTemplate.ReturnsCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* @PostContructspringspring
*/
@PostConstruct
public void init() {
rabbitTemplate.setReturnsCallback(this);
}
/**
*
*
* @param returnedMessage the returned message and metadata.
*/
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
log.error("消息:{},被交换机:{} 回退!退回原因为:{}",
returnedMessage.getMessage().toString(), returnedMessage.getExchange(), returnedMessage.getReplyText());
// TODO 回退了所有的信息,可做补偿机制
}
}

View File

@ -1 +1,3 @@
com.muyu.common.rabbit.RabbitListenerConfigurer
com.muyu.common.rabbit.config.RabbitListenerConfig
com.muyu.common.rabbit.config.RabbitAdminConfig
com.muyu.common.rabbit.config.RabbitMQMessageConverterConfig

View File

@ -1,5 +1,6 @@
package com.muyu.cargateway.config;
import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Exchange;
@ -19,6 +20,7 @@ import org.slf4j.LoggerFactory;
* @ Descriptionrabbitmq
* @author Lenovo
*/
@Log4j2
@Configuration
public class RabbitmqConfig {
// 日志
@ -52,10 +54,10 @@ public class RabbitmqConfig {
public Exchange exchangeTopicsInform() {
try {
Exchange exchange = ExchangeBuilder.topicExchange(EXCHANGE_TOPICS_INFORM).durable(true).build();
logger.info("创建的交换机为: {}", EXCHANGE_TOPICS_INFORM);
log.info("创建的交换机为: {}", EXCHANGE_TOPICS_INFORM);
return exchange;
} catch (Exception e) {
logger.error("创建该: {} 交换机失败", EXCHANGE_TOPICS_INFORM, e);
log.error("创建该: {} 交换机失败", EXCHANGE_TOPICS_INFORM, e);
throw e;
}
}
@ -65,10 +67,10 @@ public class RabbitmqConfig {
public Queue queueInformEmail() {
try {
Queue queue = new Queue(QUEUE_INFORM_EMAIL);
logger.info("创建的列为: {}", QUEUE_INFORM_EMAIL);
log.info("创建的列为: {}", QUEUE_INFORM_EMAIL);
return queue;
} catch (Exception e) {
logger.error("创建该: {} 队列失败", QUEUE_INFORM_EMAIL, e);
log.error("创建该: {} 队列失败", QUEUE_INFORM_EMAIL, e);
throw e;
}
}
@ -78,10 +80,10 @@ public class RabbitmqConfig {
public Queue queueInformSms() {
try {
Queue queue = new Queue(QUEUE_INFORM_SMS);
logger.info("创建的列为: {}", QUEUE_INFORM_SMS);
log.info("创建的列为: {}", QUEUE_INFORM_SMS);
return queue;
} catch (Exception e) {
logger.error("创建该: {} 队列失败", QUEUE_INFORM_SMS, e);
log.error("创建该: {} 队列失败", QUEUE_INFORM_SMS, e);
throw e;
}
}

View File

@ -85,7 +85,7 @@ public class Sample implements ApplicationRunner , DisposableBean {
// 设置密码
.setPassword("10160810@a")
// 设置要创建的实例数量
.setAmount(1)
.setAmount(2)
.setInternetChargeType("PayByTraffic")
.setInternetMaxBandwidthOut(1);
@ -109,7 +109,6 @@ public class Sample implements ApplicationRunner , DisposableBean {
// 在获取实例详细信息时,确保获取所有必要的信息
DescribeInstancesResponse describeInstancesResponse = getInstances(client);
List<DescribeInstancesResponseBody.DescribeInstancesResponseBodyInstancesInstance> instances = describeInstancesResponse.getBody().getInstances().getInstance();
List<AliInstance> aliInstances = new ArrayList<>();
instanceIds = new ArrayList<>();
@ -219,7 +218,7 @@ public class Sample implements ApplicationRunner , DisposableBean {
public void destroy(){
try {
log.info("删除实例方法");
// Thread.sleep(100000);
Thread.sleep(10000);
deleteSample();
} catch (Exception e) {
log.info("删除实例失败");

View File

@ -35,8 +35,11 @@ public class CarOneClickOperationServiceImpl implements CarOneClickOperationServ
@Override
public void getConnect(VehicleConnectionReq vehicleConnectionReq) {
log.info("车辆连接请求:{}",vehicleConnectionReq.toString());
// 发送消息
rabbitTemplate.convertAndSend("exchange_topics_inform","",vehicleConnectionReq.getVehicleVin());
rabbitTemplate.convertAndSend("exchange_topics_inform","inform.#.email.#",vehicleConnectionReq.getVehicleVin());
log.info("发送消息成功:{}",vehicleConnectionReq.getVehicleVin());
VehicleConnection vehicleConnection = new VehicleConnection();
vehicleConnection.setVehicleVin(vehicleConnectionReq.getVehicleVin());

View File

@ -4,19 +4,34 @@ server:
# nacos线上地址
nacos:
addr: 106.54.193.225:8848
addr: 47.116.173.119:8848
user-name: nacos
password: nacos
namespace: one
# SPRING_AMQP_DESERIALIZATION_TRUST_ALL=true spring.amqp.deserialization.trust.all
# Spring
spring:
rabbitmq:
host: 47.116.173.119
port: 5672
username: guest
password: guest
virtual-host: /
listener:
simple:
prefetch: 1 # 默认每次取出一条消息消费, 消费完成取下一条
acknowledge-mode: manual # 设置消费端手动ack确认
retry:
enabled: true # 是否支持重试
publisher-confirm-type: correlated #确认消息已发送到交换机(Exchange)
publisher-returns: true #确认消息已发送到队列(Queue)
amqp:
deserialization:
trust:
all: true
trust:
all: true
main:
allow-bean-definition-overriding: true
allow-bean-definition-overriding: true
application:
# 应用名称
name: cloud-car-gateway