diff --git a/cloud-common/cloud-common-rabbit/pom.xml b/cloud-common/cloud-common-rabbit/pom.xml index fa6d383..53ac6d7 100644 --- a/cloud-common/cloud-common-rabbit/pom.xml +++ b/cloud-common/cloud-common-rabbit/pom.xml @@ -32,4 +32,4 @@ - \ No newline at end of file + diff --git a/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/callback/ConfirmCallback.java b/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/callback/ConfirmCallback.java new file mode 100644 index 0000000..fe4c6b2 --- /dev/null +++ b/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/callback/ConfirmCallback.java @@ -0,0 +1,44 @@ +package com.muyu.common.rabbit.callback; + +import jakarta.annotation.PostConstruct; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.rabbit.connection.CorrelationData; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.stereotype.Component; + +/** + * @author Lenovo + * @Description 消息发送到broker确认回调 + */ + +@Slf4j +@Component +public class ConfirmCallback implements RabbitTemplate.ConfirmCallback { + + @Resource + private RabbitTemplate rabbitTemplate; + + /** + * 初始化 + */ + @PostConstruct + public void init() { + this.rabbitTemplate.setConfirmCallback(this); + } + + /** + * 回调确认方法(消息发送之后 消息无论发送成功还是失败都会执行这个回调方法) + * @param correlationData 回调的相关数据 + * @param ack 确认结果(true表示消息已经被broker接收,false表示消息未被broker接收) + * @param cause 失败原因(当ack为false时,表示拒绝接收消息的原因;当ack为true时,该值为空) + */ + @Override + public void confirm(CorrelationData correlationData, boolean ack, String cause) { + if (ack) { + log.info("消息发送到broker成功!"); + } else { + log.info("消息发送到broker失败,失败原因:{}", cause); + } + } +} diff --git a/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/callback/ReturnsCallback.java b/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/callback/ReturnsCallback.java new file mode 100644 index 0000000..6145c2c --- /dev/null +++ b/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/callback/ReturnsCallback.java @@ -0,0 +1,39 @@ +package com.muyu.common.rabbit.callback; + +import jakarta.annotation.PostConstruct; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.core.ReturnedMessage; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.stereotype.Component; + +/** + * @author Lenovo + * @Description 消息发送失败时回调 + */ + +@Slf4j +@Component +public class ReturnsCallback implements RabbitTemplate.ReturnsCallback { + + @Resource + private RabbitTemplate rabbitTemplate; + + /** + * 初始化 + */ + @PostConstruct + public void init() { + rabbitTemplate.setReturnsCallback(this); + } + + /** + * 消息发送到队列失败时执行 + * @param returnedMessage 返回的消息和元数据 + */ + @Override + public void returnedMessage(ReturnedMessage returnedMessage) { + log.info("消息:{}被交换机:{}回退!回退原因:{}", returnedMessage.getMessage().toString(), + returnedMessage.getExchange(), returnedMessage.getReplyText()); + } +} diff --git a/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/config/ConfirmCallbackConfig.java b/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/config/ConfirmCallbackConfig.java deleted file mode 100644 index 1e25c6f..0000000 --- a/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/config/ConfirmCallbackConfig.java +++ /dev/null @@ -1,47 +0,0 @@ -package com.muyu.common.rabbit.config; - -import lombok.extern.log4j.Log4j2; -import org.springframework.amqp.rabbit.connection.CorrelationData; -import org.springframework.amqp.rabbit.core.RabbitTemplate; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; - -import javax.annotation.PostConstruct; - - -/** - * @author Lenovo - */ -@Component -@Log4j2 -public class ConfirmCallbackConfig implements RabbitTemplate.ConfirmCallback { - - @Autowired - private RabbitTemplate rabbitTemplate; - - /** - * @PostContruct是spring框架的注解,在⽅法上加该注解会在项⽬启动的时候执⾏该⽅法,也可以理解为在spring容器初始化的时候执 - */ - @PostConstruct - public void init() { - rabbitTemplate.setConfirmCallback(this); - } - - /** - * 交换机不管是否收到消息的一个回调方法 - * - * @param correlationData 消息相关数据 - * @param ack 交换机是否收到消息 - * @param cause 失败原因 - */ - @Override - public void confirm(CorrelationData correlationData, boolean ack, String cause) { - if (!ack) { - String exchange = correlationData.getReturned().getExchange(); - String message = correlationData.getReturned().getMessage().getBody().toString(); - // 发送异常 - log.error("消息:{},发送到交换机:{}失败,原因是:{}", message, exchange, cause); - } - } - -} diff --git a/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/config/RabbitAdminConfig.java b/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/config/RabbitAdminConfig.java new file mode 100644 index 0000000..d87c912 --- /dev/null +++ b/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/config/RabbitAdminConfig.java @@ -0,0 +1,72 @@ +package com.muyu.common.rabbit.config; + +import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; +import org.springframework.amqp.rabbit.connection.ConnectionFactory; +import org.springframework.amqp.rabbit.core.RabbitAdmin; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * @author Lenovo + * @Description RabbitMQ连接和管理功能配置 + */ + +@Configuration +public class RabbitAdminConfig { + + /** + * RabbitMQ服务器的主机地址 + */ + @Value("${spring.rabbitmq.host}") + private String host; + + /** + * RabbitMQ的用户名 + */ + @Value("${spring.rabbitmq.username}") + private String username; + + /** + * RabbitMQ的密码 + */ + @Value("${spring.rabbitmq.password}") + private String password; + + /** + * RabbitMQ的虚拟主机 + */ + @Value("${spring.rabbitmq.virtualhost}") + private String virtualhost; + + /** + * 创建并初始化RabbitAdmin实例 + * + * @return RabbitAdmin 实例 + */ + @Bean + public RabbitAdmin rabbitAdmin() { + RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory()); + rabbitAdmin.setAutoStartup(true); + return rabbitAdmin; + } + + /** + * 创建RabbitMQ连接工厂 + * + * @return ConnectionFactory 实例 + */ + @Bean + public ConnectionFactory connectionFactory() { + CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); + connectionFactory.setAddresses(host); + connectionFactory.setUsername(username); + connectionFactory.setPassword(password); + connectionFactory.setVirtualHost(virtualhost); + + // 配置发送确认回调时,次配置必须配置,否则即使在RabbitTemplate配置了ConfirmCallback也不会生效 + connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED); + connectionFactory.setPublisherReturns(true); + return connectionFactory; + } +} diff --git a/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/RabbitListenerConfigurer.java b/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/config/RabbitListenerConfig.java similarity index 59% rename from cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/RabbitListenerConfigurer.java rename to cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/config/RabbitListenerConfig.java index 4e250d0..0c99e3f 100644 --- a/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/RabbitListenerConfigurer.java +++ b/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/config/RabbitListenerConfig.java @@ -1,32 +1,38 @@ -package com.muyu.common.rabbit; +package com.muyu.common.rabbit.config; +import org.springframework.amqp.rabbit.annotation.RabbitListenerConfigurer; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistrar; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.messaging.converter.MappingJackson2MessageConverter; import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory; -import javax.annotation.Resource; - /** * @author Lenovo + * @Description rabbitMQ的监听器配置 */ + @Configuration -public class RabbitListenerConfigurer implements org.springframework.amqp.rabbit.annotation.RabbitListenerConfigurer { +public class RabbitListenerConfig implements RabbitListenerConfigurer { static { + // 设置为信任所有类型的反序列化,确保消息能够正确反序列化 System.setProperty("spring.amqp.deserialization.trust.all", "true"); } - //以下配置RabbitMQ消息服务 - @Resource + /** + * RabbitMQ连接工厂,用于创建连接 + */ + @Autowired public ConnectionFactory connectionFactory; /** - * 处理器方法工厂 - * @return + * 创建处理器方法工厂的bean + * + * @return DefaultMessageHandlerMethodFactory 实例,用于处理消息的转换和方法调用 */ @Bean public DefaultMessageHandlerMethodFactory handlerMethodFactory() { @@ -36,8 +42,14 @@ public class RabbitListenerConfigurer implements org.springframework.amqp.rabbit return factory; } + /** + * 配置RabbitMQ监听器的消息处理方法工厂。 + * + * @param rabbitListenerEndpointRegistrar 实例,用于注册监听器的配置 + */ @Override public void configureRabbitListeners(RabbitListenerEndpointRegistrar rabbitListenerEndpointRegistrar) { + // 注册自定义的消息处理方法工厂 rabbitListenerEndpointRegistrar.setMessageHandlerMethodFactory(handlerMethodFactory()); } diff --git a/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/config/RabbitMQMessageConverterConfig.java b/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/config/RabbitMQMessageConverterConfig.java new file mode 100644 index 0000000..6096282 --- /dev/null +++ b/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/config/RabbitMQMessageConverterConfig.java @@ -0,0 +1,26 @@ + +package com.muyu.common.rabbit.config; + +import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; +import org.springframework.amqp.support.converter.MessageConverter; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * @author Lenovo + * @Description rabbitMQ消息转换器配置 + */ + +@Configuration +public class RabbitMQMessageConverterConfig { + + /** + * 消息转换配置 + * + * @return 消息转换器 + */ + @Bean + public MessageConverter jsonMessageConverter() { + return new Jackson2JsonMessageConverter(); + } +} diff --git a/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/config/ReturnCallbackConfig.java b/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/config/ReturnCallbackConfig.java deleted file mode 100644 index ae17741..0000000 --- a/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/config/ReturnCallbackConfig.java +++ /dev/null @@ -1,42 +0,0 @@ -package com.muyu.common.rabbit.config; - -import lombok.extern.log4j.Log4j2; -import org.springframework.amqp.core.ReturnedMessage; -import org.springframework.amqp.rabbit.core.RabbitTemplate; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; - -import javax.annotation.PostConstruct; - -/** - * 消息发送到队列的确认 - * @author Lenovo - */ -@Component -@Log4j2 -public class ReturnCallbackConfig implements RabbitTemplate.ReturnsCallback { - - @Autowired - private RabbitTemplate rabbitTemplate; - - /** - * @PostContruct是spring框架的注解,在⽅法上加该注解会在项⽬启动的时候执⾏该⽅法,也可以理解为在spring容器初始化的时候执 - */ - @PostConstruct - public void init() { - rabbitTemplate.setReturnsCallback(this); - } - - /** - * 消息发送失败 则会执行这个方法 - * - * @param returnedMessage the returned message and metadata. - */ - @Override - public void returnedMessage(ReturnedMessage returnedMessage) { - log.error("消息:{},被交换机:{} 回退!退回原因为:{}", - returnedMessage.getMessage().toString(), returnedMessage.getExchange(), returnedMessage.getReplyText()); - // TODO 回退了所有的信息,可做补偿机制 - } - -} diff --git a/cloud-common/cloud-common-rabbit/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/cloud-common/cloud-common-rabbit/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports index 189ea2c..0b13700 100644 --- a/cloud-common/cloud-common-rabbit/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports +++ b/cloud-common/cloud-common-rabbit/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports @@ -1 +1,3 @@ -com.muyu.common.rabbit.RabbitListenerConfigurer \ No newline at end of file +com.muyu.common.rabbit.config.RabbitListenerConfig +com.muyu.common.rabbit.config.RabbitAdminConfig +com.muyu.common.rabbit.config.RabbitMQMessageConverterConfig diff --git a/cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/cargateway/config/RabbitmqConfig.java b/cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/cargateway/config/RabbitmqConfig.java index d215a28..75f6e71 100644 --- a/cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/cargateway/config/RabbitmqConfig.java +++ b/cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/cargateway/config/RabbitmqConfig.java @@ -1,5 +1,6 @@ package com.muyu.cargateway.config; +import lombok.extern.log4j.Log4j2; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.Exchange; @@ -19,6 +20,7 @@ import org.slf4j.LoggerFactory; * @ Description:rabbitmq配置类 * @author Lenovo */ +@Log4j2 @Configuration public class RabbitmqConfig { // 日志 @@ -52,10 +54,10 @@ public class RabbitmqConfig { public Exchange exchangeTopicsInform() { try { Exchange exchange = ExchangeBuilder.topicExchange(EXCHANGE_TOPICS_INFORM).durable(true).build(); - logger.info("创建的交换机为: {}", EXCHANGE_TOPICS_INFORM); + log.info("创建的交换机为: {}", EXCHANGE_TOPICS_INFORM); return exchange; } catch (Exception e) { - logger.error("创建该: {} 交换机失败", EXCHANGE_TOPICS_INFORM, e); + log.error("创建该: {} 交换机失败", EXCHANGE_TOPICS_INFORM, e); throw e; } } @@ -65,10 +67,10 @@ public class RabbitmqConfig { public Queue queueInformEmail() { try { Queue queue = new Queue(QUEUE_INFORM_EMAIL); - logger.info("创建的对列为: {}", QUEUE_INFORM_EMAIL); + log.info("创建的队列为: {}", QUEUE_INFORM_EMAIL); return queue; } catch (Exception e) { - logger.error("创建该: {} 队列失败", QUEUE_INFORM_EMAIL, e); + log.error("创建该: {} 队列失败", QUEUE_INFORM_EMAIL, e); throw e; } } @@ -78,10 +80,10 @@ public class RabbitmqConfig { public Queue queueInformSms() { try { Queue queue = new Queue(QUEUE_INFORM_SMS); - logger.info("创建的对列为: {}", QUEUE_INFORM_SMS); + log.info("创建的队列为: {}", QUEUE_INFORM_SMS); return queue; } catch (Exception e) { - logger.error("创建该: {} 队列失败", QUEUE_INFORM_SMS, e); + log.error("创建该: {} 队列失败", QUEUE_INFORM_SMS, e); throw e; } } diff --git a/cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/cargateway/instance/Sample.java b/cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/cargateway/instance/Sample.java index 2ae8802..450e980 100644 --- a/cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/cargateway/instance/Sample.java +++ b/cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/cargateway/instance/Sample.java @@ -85,7 +85,7 @@ public class Sample implements ApplicationRunner , DisposableBean { // 设置密码 .setPassword("10160810@a") // 设置要创建的实例数量 - .setAmount(1) + .setAmount(2) .setInternetChargeType("PayByTraffic") .setInternetMaxBandwidthOut(1); @@ -109,7 +109,6 @@ public class Sample implements ApplicationRunner , DisposableBean { // 在获取实例详细信息时,确保获取所有必要的信息 DescribeInstancesResponse describeInstancesResponse = getInstances(client); List instances = describeInstancesResponse.getBody().getInstances().getInstance(); - List aliInstances = new ArrayList<>(); instanceIds = new ArrayList<>(); @@ -219,7 +218,7 @@ public class Sample implements ApplicationRunner , DisposableBean { public void destroy(){ try { log.info("删除实例方法"); -// Thread.sleep(100000); + Thread.sleep(10000); deleteSample(); } catch (Exception e) { log.info("删除实例失败"); diff --git a/cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/cargateway/service/Impl/CarOneClickOperationServiceImpl.java b/cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/cargateway/service/Impl/CarOneClickOperationServiceImpl.java index 9aa6479..3967b9e 100644 --- a/cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/cargateway/service/Impl/CarOneClickOperationServiceImpl.java +++ b/cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/cargateway/service/Impl/CarOneClickOperationServiceImpl.java @@ -35,8 +35,11 @@ public class CarOneClickOperationServiceImpl implements CarOneClickOperationServ @Override public void getConnect(VehicleConnectionReq vehicleConnectionReq) { log.info("车辆连接请求:{}",vehicleConnectionReq.toString()); + // 发送消息 - rabbitTemplate.convertAndSend("exchange_topics_inform","",vehicleConnectionReq.getVehicleVin()); + rabbitTemplate.convertAndSend("exchange_topics_inform","inform.#.email.#",vehicleConnectionReq.getVehicleVin()); + log.info("发送消息成功:{}",vehicleConnectionReq.getVehicleVin()); + VehicleConnection vehicleConnection = new VehicleConnection(); vehicleConnection.setVehicleVin(vehicleConnectionReq.getVehicleVin()); diff --git a/cloud-modules/cloud-modules-car-gateway/src/main/resources/bootstrap.yml b/cloud-modules/cloud-modules-car-gateway/src/main/resources/bootstrap.yml index d9f3d66..93ed008 100644 --- a/cloud-modules/cloud-modules-car-gateway/src/main/resources/bootstrap.yml +++ b/cloud-modules/cloud-modules-car-gateway/src/main/resources/bootstrap.yml @@ -4,19 +4,34 @@ server: # nacos线上地址 nacos: - addr: 106.54.193.225:8848 + addr: 47.116.173.119:8848 user-name: nacos password: nacos namespace: one # SPRING_AMQP_DESERIALIZATION_TRUST_ALL=true spring.amqp.deserialization.trust.all # Spring spring: + rabbitmq: + host: 47.116.173.119 + port: 5672 + username: guest + password: guest + virtual-host: / + listener: + simple: + prefetch: 1 # 默认每次取出一条消息消费, 消费完成取下一条 + acknowledge-mode: manual # 设置消费端手动ack确认 + retry: + enabled: true # 是否支持重试 + publisher-confirm-type: correlated #确认消息已发送到交换机(Exchange) + publisher-returns: true #确认消息已发送到队列(Queue) amqp: deserialization: - trust: - all: true + trust: + all: true main: - allow-bean-definition-overriding: true + allow-bean-definition-overriding: true + application: # 应用名称 name: cloud-car-gateway