From c5de9a5bed670c294630b9a8bfc28c23bd1624fa Mon Sep 17 00:00:00 2001 From: ywt <3471515127@qq.com> Date: Mon, 7 Oct 2024 10:29:50 +0800 Subject: [PATCH] =?UTF-8?q?feat():=20=E4=BF=AE=E6=94=B9rabbitmq=E9=85=8D?= =?UTF-8?q?=E7=BD=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../rabbit/callback/ConfirmCallback.java | 44 ++++++++++++ .../rabbit/callback/ReturnsCallback.java | 39 ++++++++++ .../rabbit/config/RabbitAdminConfig.java | 72 +++++++++++++++++++ .../rabbit/config/RabbitListenerConfig.java | 57 +++++++++++++++ .../RabbitMQMessageConverterConfig.java | 26 +++++++ .../aliyun/domain/VehicleConnection.java | 6 +- .../domain/req/VehicleConnectionReq.java | 2 +- .../gateway/aliyun/ecs/DelInstance.java | 4 +- .../vehicle/gateway/aliyun/ecs/Sample.java | 2 +- .../mapper/VehicleConnectionMapper.java | 3 +- .../impl/VehicleConnectionServiceImpl.java | 16 ++++- .../mapper/VehicleConnectionMapper.xml | 4 +- 12 files changed, 266 insertions(+), 9 deletions(-) create mode 100644 cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/callback/ConfirmCallback.java create mode 100644 cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/callback/ReturnsCallback.java create mode 100644 cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/config/RabbitAdminConfig.java create mode 100644 cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/config/RabbitListenerConfig.java create mode 100644 cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/config/RabbitMQMessageConverterConfig.java diff --git a/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/callback/ConfirmCallback.java b/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/callback/ConfirmCallback.java new file mode 100644 index 0000000..fe4c6b2 --- /dev/null +++ b/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/callback/ConfirmCallback.java @@ -0,0 +1,44 @@ +package com.muyu.common.rabbit.callback; + +import jakarta.annotation.PostConstruct; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.rabbit.connection.CorrelationData; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.stereotype.Component; + +/** + * @author Lenovo + * @Description 消息发送到broker确认回调 + */ + +@Slf4j +@Component +public class ConfirmCallback implements RabbitTemplate.ConfirmCallback { + + @Resource + private RabbitTemplate rabbitTemplate; + + /** + * 初始化 + */ + @PostConstruct + public void init() { + this.rabbitTemplate.setConfirmCallback(this); + } + + /** + * 回调确认方法(消息发送之后 消息无论发送成功还是失败都会执行这个回调方法) + * @param correlationData 回调的相关数据 + * @param ack 确认结果(true表示消息已经被broker接收,false表示消息未被broker接收) + * @param cause 失败原因(当ack为false时,表示拒绝接收消息的原因;当ack为true时,该值为空) + */ + @Override + public void confirm(CorrelationData correlationData, boolean ack, String cause) { + if (ack) { + log.info("消息发送到broker成功!"); + } else { + log.info("消息发送到broker失败,失败原因:{}", cause); + } + } +} diff --git a/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/callback/ReturnsCallback.java b/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/callback/ReturnsCallback.java new file mode 100644 index 0000000..6145c2c --- /dev/null +++ b/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/callback/ReturnsCallback.java @@ -0,0 +1,39 @@ +package com.muyu.common.rabbit.callback; + +import jakarta.annotation.PostConstruct; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.core.ReturnedMessage; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.stereotype.Component; + +/** + * @author Lenovo + * @Description 消息发送失败时回调 + */ + +@Slf4j +@Component +public class ReturnsCallback implements RabbitTemplate.ReturnsCallback { + + @Resource + private RabbitTemplate rabbitTemplate; + + /** + * 初始化 + */ + @PostConstruct + public void init() { + rabbitTemplate.setReturnsCallback(this); + } + + /** + * 消息发送到队列失败时执行 + * @param returnedMessage 返回的消息和元数据 + */ + @Override + public void returnedMessage(ReturnedMessage returnedMessage) { + log.info("消息:{}被交换机:{}回退!回退原因:{}", returnedMessage.getMessage().toString(), + returnedMessage.getExchange(), returnedMessage.getReplyText()); + } +} diff --git a/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/config/RabbitAdminConfig.java b/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/config/RabbitAdminConfig.java new file mode 100644 index 0000000..d87c912 --- /dev/null +++ b/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/config/RabbitAdminConfig.java @@ -0,0 +1,72 @@ +package com.muyu.common.rabbit.config; + +import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; +import org.springframework.amqp.rabbit.connection.ConnectionFactory; +import org.springframework.amqp.rabbit.core.RabbitAdmin; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * @author Lenovo + * @Description RabbitMQ连接和管理功能配置 + */ + +@Configuration +public class RabbitAdminConfig { + + /** + * RabbitMQ服务器的主机地址 + */ + @Value("${spring.rabbitmq.host}") + private String host; + + /** + * RabbitMQ的用户名 + */ + @Value("${spring.rabbitmq.username}") + private String username; + + /** + * RabbitMQ的密码 + */ + @Value("${spring.rabbitmq.password}") + private String password; + + /** + * RabbitMQ的虚拟主机 + */ + @Value("${spring.rabbitmq.virtualhost}") + private String virtualhost; + + /** + * 创建并初始化RabbitAdmin实例 + * + * @return RabbitAdmin 实例 + */ + @Bean + public RabbitAdmin rabbitAdmin() { + RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory()); + rabbitAdmin.setAutoStartup(true); + return rabbitAdmin; + } + + /** + * 创建RabbitMQ连接工厂 + * + * @return ConnectionFactory 实例 + */ + @Bean + public ConnectionFactory connectionFactory() { + CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); + connectionFactory.setAddresses(host); + connectionFactory.setUsername(username); + connectionFactory.setPassword(password); + connectionFactory.setVirtualHost(virtualhost); + + // 配置发送确认回调时,次配置必须配置,否则即使在RabbitTemplate配置了ConfirmCallback也不会生效 + connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED); + connectionFactory.setPublisherReturns(true); + return connectionFactory; + } +} diff --git a/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/config/RabbitListenerConfig.java b/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/config/RabbitListenerConfig.java new file mode 100644 index 0000000..0c99e3f --- /dev/null +++ b/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/config/RabbitListenerConfig.java @@ -0,0 +1,57 @@ +package com.muyu.common.rabbit.config; + +import org.springframework.amqp.rabbit.annotation.RabbitListenerConfigurer; +import org.springframework.amqp.rabbit.connection.ConnectionFactory; +import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistrar; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.messaging.converter.MappingJackson2MessageConverter; +import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory; + +/** + * @author Lenovo + * @Description rabbitMQ的监听器配置 + */ + +@Configuration +public class RabbitListenerConfig implements RabbitListenerConfigurer { + + static { + // 设置为信任所有类型的反序列化,确保消息能够正确反序列化 + System.setProperty("spring.amqp.deserialization.trust.all", "true"); + } + + /** + * RabbitMQ连接工厂,用于创建连接 + */ + @Autowired + public ConnectionFactory connectionFactory; + + + /** + * 创建处理器方法工厂的bean + * + * @return DefaultMessageHandlerMethodFactory 实例,用于处理消息的转换和方法调用 + */ + @Bean + public DefaultMessageHandlerMethodFactory handlerMethodFactory() { + DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory(); + // 这里的转换器设置实现了 通过 @Payload 注解 自动反序列化message body + factory.setMessageConverter(new MappingJackson2MessageConverter()); + return factory; + } + + /** + * 配置RabbitMQ监听器的消息处理方法工厂。 + * + * @param rabbitListenerEndpointRegistrar 实例,用于注册监听器的配置 + */ + @Override + public void configureRabbitListeners(RabbitListenerEndpointRegistrar rabbitListenerEndpointRegistrar) { + // 注册自定义的消息处理方法工厂 + rabbitListenerEndpointRegistrar.setMessageHandlerMethodFactory(handlerMethodFactory()); + } + +} + diff --git a/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/config/RabbitMQMessageConverterConfig.java b/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/config/RabbitMQMessageConverterConfig.java new file mode 100644 index 0000000..6096282 --- /dev/null +++ b/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/config/RabbitMQMessageConverterConfig.java @@ -0,0 +1,26 @@ + +package com.muyu.common.rabbit.config; + +import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; +import org.springframework.amqp.support.converter.MessageConverter; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * @author Lenovo + * @Description rabbitMQ消息转换器配置 + */ + +@Configuration +public class RabbitMQMessageConverterConfig { + + /** + * 消息转换配置 + * + * @return 消息转换器 + */ + @Bean + public MessageConverter jsonMessageConverter() { + return new Jackson2JsonMessageConverter(); + } +} diff --git a/cloud-modules/cloud-modules-vehicle-gateway/src/main/java/com/muyu/cloud/vehicle/gateway/aliyun/domain/VehicleConnection.java b/cloud-modules/cloud-modules-vehicle-gateway/src/main/java/com/muyu/cloud/vehicle/gateway/aliyun/domain/VehicleConnection.java index 0857767..ca6d48a 100644 --- a/cloud-modules/cloud-modules-vehicle-gateway/src/main/java/com/muyu/cloud/vehicle/gateway/aliyun/domain/VehicleConnection.java +++ b/cloud-modules/cloud-modules-vehicle-gateway/src/main/java/com/muyu/cloud/vehicle/gateway/aliyun/domain/VehicleConnection.java @@ -22,9 +22,13 @@ public class VehicleConnection { /** * 用户名 */ - private String userName; + private String username; /** * 随机数 */ private String nonce; + /** + * 密码 + */ + private String password; } diff --git a/cloud-modules/cloud-modules-vehicle-gateway/src/main/java/com/muyu/cloud/vehicle/gateway/aliyun/domain/req/VehicleConnectionReq.java b/cloud-modules/cloud-modules-vehicle-gateway/src/main/java/com/muyu/cloud/vehicle/gateway/aliyun/domain/req/VehicleConnectionReq.java index e114d79..cdcaf65 100644 --- a/cloud-modules/cloud-modules-vehicle-gateway/src/main/java/com/muyu/cloud/vehicle/gateway/aliyun/domain/req/VehicleConnectionReq.java +++ b/cloud-modules/cloud-modules-vehicle-gateway/src/main/java/com/muyu/cloud/vehicle/gateway/aliyun/domain/req/VehicleConnectionReq.java @@ -28,7 +28,7 @@ public class VehicleConnectionReq { /** * 用户名 */ - private String userName; + private String username; /** * 随机数 diff --git a/cloud-modules/cloud-modules-vehicle-gateway/src/main/java/com/muyu/cloud/vehicle/gateway/aliyun/ecs/DelInstance.java b/cloud-modules/cloud-modules-vehicle-gateway/src/main/java/com/muyu/cloud/vehicle/gateway/aliyun/ecs/DelInstance.java index c9268c5..1b40738 100644 --- a/cloud-modules/cloud-modules-vehicle-gateway/src/main/java/com/muyu/cloud/vehicle/gateway/aliyun/ecs/DelInstance.java +++ b/cloud-modules/cloud-modules-vehicle-gateway/src/main/java/com/muyu/cloud/vehicle/gateway/aliyun/ecs/DelInstance.java @@ -126,8 +126,8 @@ public class DelInstance implements ApplicationListener { try{ log.info("=======>删除实例"); delInstance(); - redisTemplate.delete("instanceIds"); - redisTemplate.delete("instanceList"); +// redisTemplate.delete("instanceIds"); +// redisTemplate.delete("instanceList"); } catch (Exception e) { throw new RuntimeException(e); } diff --git a/cloud-modules/cloud-modules-vehicle-gateway/src/main/java/com/muyu/cloud/vehicle/gateway/aliyun/ecs/Sample.java b/cloud-modules/cloud-modules-vehicle-gateway/src/main/java/com/muyu/cloud/vehicle/gateway/aliyun/ecs/Sample.java index cbcf7b0..50c547e 100644 --- a/cloud-modules/cloud-modules-vehicle-gateway/src/main/java/com/muyu/cloud/vehicle/gateway/aliyun/ecs/Sample.java +++ b/cloud-modules/cloud-modules-vehicle-gateway/src/main/java/com/muyu/cloud/vehicle/gateway/aliyun/ecs/Sample.java @@ -61,7 +61,7 @@ public class Sample implements ApplicationRunner { // 创建创建实例请求对象并设置参数 RunInstancesRequest runInstancesRequest = new RunInstancesRequest() .setRegionId("cn-shanghai") // 设置地域ID - .setImageId("m-uf63dnbv4od71jlezdne")// 设置镜像ID + .setImageId("m-uf66taa8r57ky0pg3e7s")// 设置镜像ID .setInstanceType("ecs.e-c1m1.large")// 设置实例类型 .setSecurityGroupId("sg-uf6hyictocodexptlgiv")// 设置安全组ID .setVSwitchId("vsw-uf6ags5luz17qd6ckn2tb")// 设置虚拟交换机ID diff --git a/cloud-modules/cloud-modules-vehicle-gateway/src/main/java/com/muyu/cloud/vehicle/gateway/aliyun/mapper/VehicleConnectionMapper.java b/cloud-modules/cloud-modules-vehicle-gateway/src/main/java/com/muyu/cloud/vehicle/gateway/aliyun/mapper/VehicleConnectionMapper.java index 916480b..b573aa1 100644 --- a/cloud-modules/cloud-modules-vehicle-gateway/src/main/java/com/muyu/cloud/vehicle/gateway/aliyun/mapper/VehicleConnectionMapper.java +++ b/cloud-modules/cloud-modules-vehicle-gateway/src/main/java/com/muyu/cloud/vehicle/gateway/aliyun/mapper/VehicleConnectionMapper.java @@ -1,9 +1,10 @@ package com.muyu.cloud.vehicle.gateway.aliyun.mapper; +import com.muyu.cloud.vehicle.gateway.aliyun.domain.VehicleConnection; import com.muyu.cloud.vehicle.gateway.aliyun.domain.req.VehicleConnectionReq; import org.apache.ibatis.annotations.Mapper; @Mapper public interface VehicleConnectionMapper { - void addConnect(VehicleConnectionReq vehicleConnectionReq); + void addConnect(VehicleConnection vehicleConnection); } diff --git a/cloud-modules/cloud-modules-vehicle-gateway/src/main/java/com/muyu/cloud/vehicle/gateway/aliyun/service/impl/VehicleConnectionServiceImpl.java b/cloud-modules/cloud-modules-vehicle-gateway/src/main/java/com/muyu/cloud/vehicle/gateway/aliyun/service/impl/VehicleConnectionServiceImpl.java index fac49dd..71cca36 100644 --- a/cloud-modules/cloud-modules-vehicle-gateway/src/main/java/com/muyu/cloud/vehicle/gateway/aliyun/service/impl/VehicleConnectionServiceImpl.java +++ b/cloud-modules/cloud-modules-vehicle-gateway/src/main/java/com/muyu/cloud/vehicle/gateway/aliyun/service/impl/VehicleConnectionServiceImpl.java @@ -1,9 +1,11 @@ package com.muyu.cloud.vehicle.gateway.aliyun.service.impl; +import com.muyu.cloud.vehicle.gateway.aliyun.domain.VehicleConnection; import com.muyu.cloud.vehicle.gateway.aliyun.domain.req.VehicleConnectionReq; import com.muyu.cloud.vehicle.gateway.aliyun.mapper.VehicleConnectionMapper; import com.muyu.cloud.vehicle.gateway.aliyun.service.VehicleConnectionService; import lombok.extern.log4j.Log4j2; +import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @@ -11,11 +13,23 @@ import org.springframework.stereotype.Service; @Service public class VehicleConnectionServiceImpl implements VehicleConnectionService { + @Autowired + private RabbitTemplate rabbitTemplate; + @Autowired private VehicleConnectionMapper vehicleConnectionMapper; + @Override public void getConnect(VehicleConnectionReq vehicleConnectionReq) { log.info("车辆连接请求:{}",vehicleConnectionReq.toString()); - vehicleConnectionMapper.addConnect(vehicleConnectionReq); + + //发送消息 + rabbitTemplate.convertAndSend("exchange_topics_inform","",vehicleConnectionReq.getVehicleVin()); + + VehicleConnection vehicleConnection = new VehicleConnection(); + vehicleConnection.setVehicleVin(vehicleConnectionReq.getVehicleVin()); + vehicleConnection.setUsername(vehicleConnectionReq.getUsername()); + vehicleConnection.setPassword(vehicleConnectionReq.getVehicleVin()+vehicleConnectionReq.getTimestamp()+vehicleConnectionReq.getNonce()); + vehicleConnectionMapper.addConnect(vehicleConnection); } } diff --git a/cloud-modules/cloud-modules-vehicle-gateway/src/main/resources/mapper/VehicleConnectionMapper.xml b/cloud-modules/cloud-modules-vehicle-gateway/src/main/resources/mapper/VehicleConnectionMapper.xml index 5a8505d..8ca5630 100644 --- a/cloud-modules/cloud-modules-vehicle-gateway/src/main/resources/mapper/VehicleConnectionMapper.xml +++ b/cloud-modules/cloud-modules-vehicle-gateway/src/main/resources/mapper/VehicleConnectionMapper.xml @@ -7,8 +7,8 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" insert into car_one_click_operation - (vehicle_vin,user_name,nonce,timestamp) + (vehicle_vin,user_name,password) values - (#{vehicleVin},#{userName},#{nonce},#{timestamp}) + (#{vehicleVin},#{username},#{password})