From 61e745a0093a26ba714a7aed1224a6521e0d66ef Mon Sep 17 00:00:00 2001 From: Yunfei Du <278774021@qq.com> Date: Fri, 17 May 2024 21:26:24 +0800 Subject: [PATCH] =?UTF-8?q?feat()=E8=A7=84=E5=88=99=E5=BC=95=E6=93=8E?= =?UTF-8?q?=E5=88=9D=E5=A7=8B=E5=8C=96(RabbitMq)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- etl-common/etl-common-core/pom.xml | 6 +- .../client/config/DataAccessClientRunner.java | 2 + .../etl-data-source-server/pom.xml | 6 ++ .../data/config/ConfirmCallbackConfig.java | 51 +++++++++++++++ .../etl/data/config/RabbitAdminConfig.java | 62 +++++++++++++++++++ .../com/etl/data/config/RabbitmqConfig.java | 15 +++++ .../etl/data/config/ReturnCallbackConfig.java | 41 ++++++++++++ .../com/etl/data/service/ProducerService.java | 12 ++++ .../service/impl/ProducerServiceImpl.java | 38 ++++++++++++ 9 files changed, 232 insertions(+), 1 deletion(-) create mode 100644 etl-modules/etl-data-source/etl-data-source-server/src/main/java/com/etl/data/config/ConfirmCallbackConfig.java create mode 100644 etl-modules/etl-data-source/etl-data-source-server/src/main/java/com/etl/data/config/RabbitAdminConfig.java create mode 100644 etl-modules/etl-data-source/etl-data-source-server/src/main/java/com/etl/data/config/RabbitmqConfig.java create mode 100644 etl-modules/etl-data-source/etl-data-source-server/src/main/java/com/etl/data/config/ReturnCallbackConfig.java create mode 100644 etl-modules/etl-data-source/etl-data-source-server/src/main/java/com/etl/data/service/ProducerService.java create mode 100644 etl-modules/etl-data-source/etl-data-source-server/src/main/java/com/etl/data/service/impl/ProducerServiceImpl.java diff --git a/etl-common/etl-common-core/pom.xml b/etl-common/etl-common-core/pom.xml index d683dc7..7cbbab5 100644 --- a/etl-common/etl-common-core/pom.xml +++ b/etl-common/etl-common-core/pom.xml @@ -16,7 +16,11 @@ - + + + org.springframework.boot + spring-boot-starter-amqp + org.springframework.cloud diff --git a/etl-modules/etl-data-source/etl-data-source-client/src/main/java/com/etl/data/client/config/DataAccessClientRunner.java b/etl-modules/etl-data-source/etl-data-source-client/src/main/java/com/etl/data/client/config/DataAccessClientRunner.java index 9733f56..936b323 100644 --- a/etl-modules/etl-data-source/etl-data-source-client/src/main/java/com/etl/data/client/config/DataAccessClientRunner.java +++ b/etl-modules/etl-data-source/etl-data-source-client/src/main/java/com/etl/data/client/config/DataAccessClientRunner.java @@ -20,6 +20,8 @@ import java.util.List; @Log4j2 public class DataAccessClientRunner implements ApplicationRunner { + + @Autowired private RemoteDataSourceService remoteDataSourceService; diff --git a/etl-modules/etl-data-source/etl-data-source-server/pom.xml b/etl-modules/etl-data-source/etl-data-source-server/pom.xml index 0e1e747..e084228 100644 --- a/etl-modules/etl-data-source/etl-data-source-server/pom.xml +++ b/etl-modules/etl-data-source/etl-data-source-server/pom.xml @@ -98,6 +98,12 @@ postgresql 42.5.0 + + com.etl + etl-data-source-client + 3.6.3 + compile + diff --git a/etl-modules/etl-data-source/etl-data-source-server/src/main/java/com/etl/data/config/ConfirmCallbackConfig.java b/etl-modules/etl-data-source/etl-data-source-server/src/main/java/com/etl/data/config/ConfirmCallbackConfig.java new file mode 100644 index 0000000..5879020 --- /dev/null +++ b/etl-modules/etl-data-source/etl-data-source-server/src/main/java/com/etl/data/config/ConfirmCallbackConfig.java @@ -0,0 +1,51 @@ +package com.etl.data.config; + +import lombok.extern.log4j.Log4j2; +import org.springframework.amqp.rabbit.connection.CorrelationData; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; + +/** + * 消息发送确认配置 + * 消息发送到交换机的回调 + */ +@Component +@Log4j2 +public class ConfirmCallbackConfig implements RabbitTemplate.ConfirmCallback { + + @Autowired + private RabbitTemplate rabbitTemplate; + + /** + * @PostContruct是spring框架的注解,在⽅法上加该注解会在项⽬启动的时候执⾏该⽅法,也可以理解为在spring容器初始化的时候执 + */ + @PostConstruct + public void init() { + rabbitTemplate.setConfirmCallback(this); + } + + /** + * 交换机不管是否收到消息的一个回调方法 + * + * @param correlationData 消息相关数据 + * @param ack 交换机是否收到消息 + * @param cause 失败原因 + */ + @Override + public void confirm(CorrelationData correlationData, boolean ack, String cause) { + if (ack) { + // 消息投递到broker 的状态,true表示成功 + log.info("消息已经成功发送到交换机"); + } else { +// String exchange = correlationData.getReturned().getExchange(); +// String message = correlationData.getReturned().getMessage().getBody().toString(); + // 发送异常 + log.error("消息发送到交换机:失败,原因是:{}", cause); + // TODO 可以把异常信息 以及 消息的内容直接添加到 MYSQL + } + } + +} diff --git a/etl-modules/etl-data-source/etl-data-source-server/src/main/java/com/etl/data/config/RabbitAdminConfig.java b/etl-modules/etl-data-source/etl-data-source-server/src/main/java/com/etl/data/config/RabbitAdminConfig.java new file mode 100644 index 0000000..6f7080f --- /dev/null +++ b/etl-modules/etl-data-source/etl-data-source-server/src/main/java/com/etl/data/config/RabbitAdminConfig.java @@ -0,0 +1,62 @@ +package com.etl.data.config; + +import org.springframework.amqp.core.FanoutExchange; +import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; +import org.springframework.amqp.rabbit.connection.ConnectionFactory; +import org.springframework.amqp.rabbit.core.RabbitAdmin; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * rabbitAdmin 配置类 创建RabbitAdmin 放到Spring容器中 + */ +@Configuration +public class RabbitAdminConfig { + + public static final String DATA_SOURCE_EXCHANGE = "data_source_exchange"; + public static final String RULE_ENGINE_EXCHANGE = "rule_engine_exchange"; + + @Value("${spring.rabbitmq.host}") + private String host; + @Value("${spring.rabbitmq.username}") + private String username; + @Value("${spring.rabbitmq.password}") + private String password; + @Value("${spring.rabbitmq.virtualhost}") + private String virtualhost; + + /** + * 构建连接工厂 + * @return + */ + @Bean + public ConnectionFactory connectionFactory() { + CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); + connectionFactory.setAddresses(host); + connectionFactory.setUsername(username); + connectionFactory.setPassword(password); + connectionFactory.setVirtualHost(virtualhost); + // 配置发送确认回调时,次配置必须配置,否则即使在RabbitTemplate配置了ConfirmCallback也不会生效 + connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED); + connectionFactory.setPublisherReturns(true); + return connectionFactory; + } + + /** + * 构建 RabbitAdmin 管理 RabbitMQ 操作 队列以及交换机 + * @param connectionFactory + * @return + */ + @Bean + public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) { + RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory); + rabbitAdmin.setAutoStartup(true); + //创建交换机 + FanoutExchange dataFanoutExchange = new FanoutExchange(DATA_SOURCE_EXCHANGE, true, true); + //声明交换机 + rabbitAdmin.declareExchange(dataFanoutExchange); + return rabbitAdmin; + } + +} diff --git a/etl-modules/etl-data-source/etl-data-source-server/src/main/java/com/etl/data/config/RabbitmqConfig.java b/etl-modules/etl-data-source/etl-data-source-server/src/main/java/com/etl/data/config/RabbitmqConfig.java new file mode 100644 index 0000000..1c00511 --- /dev/null +++ b/etl-modules/etl-data-source/etl-data-source-server/src/main/java/com/etl/data/config/RabbitmqConfig.java @@ -0,0 +1,15 @@ +package com.etl.data.config; + +import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; +import org.springframework.amqp.support.converter.MessageConverter; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration +public class RabbitmqConfig { + // 消息转换配置 + @Bean + public MessageConverter jsonMessageConverter(){ + return new Jackson2JsonMessageConverter(); + } +} diff --git a/etl-modules/etl-data-source/etl-data-source-server/src/main/java/com/etl/data/config/ReturnCallbackConfig.java b/etl-modules/etl-data-source/etl-data-source-server/src/main/java/com/etl/data/config/ReturnCallbackConfig.java new file mode 100644 index 0000000..392a6f1 --- /dev/null +++ b/etl-modules/etl-data-source/etl-data-source-server/src/main/java/com/etl/data/config/ReturnCallbackConfig.java @@ -0,0 +1,41 @@ +package com.etl.data.config; + +import lombok.extern.log4j.Log4j2; +import org.springframework.amqp.core.ReturnedMessage; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; + +/** + * 消息发送到队列的确认 + */ +@Component +@Log4j2 +public class ReturnCallbackConfig implements RabbitTemplate.ReturnsCallback { + + @Autowired + private RabbitTemplate rabbitTemplate; + + /** + * @PostContruct是spring框架的注解,在⽅法上加该注解会在项⽬启动的时候执⾏该⽅法,也可以理解为在spring容器初始化的时候执 + */ + @PostConstruct + public void init() { + rabbitTemplate.setReturnsCallback(this); + } + + /** + * 消息发送失败 则会执行这个方法 + * + * @param returnedMessage the returned message and metadata. + */ + @Override + public void returnedMessage(ReturnedMessage returnedMessage) { + log.error("消息:{},被交换机:{} 回退!退回原因为:{}", + returnedMessage.getMessage().toString(), returnedMessage.getExchange(), returnedMessage.getReplyText()); + // TODO 回退了所有的信息,可做补偿机制 + } + +} diff --git a/etl-modules/etl-data-source/etl-data-source-server/src/main/java/com/etl/data/service/ProducerService.java b/etl-modules/etl-data-source/etl-data-source-server/src/main/java/com/etl/data/service/ProducerService.java new file mode 100644 index 0000000..062c414 --- /dev/null +++ b/etl-modules/etl-data-source/etl-data-source-server/src/main/java/com/etl/data/service/ProducerService.java @@ -0,0 +1,12 @@ +package com.etl.data.service; + +import com.etl.data.domain.DataSource; + +/** + * 数据源生产者 + * @author YunFei.Du + * @date 9:30 2024/5/17 + */ +public interface ProducerService { + void dataSourceProduct(DataSource dataSource); +} diff --git a/etl-modules/etl-data-source/etl-data-source-server/src/main/java/com/etl/data/service/impl/ProducerServiceImpl.java b/etl-modules/etl-data-source/etl-data-source-server/src/main/java/com/etl/data/service/impl/ProducerServiceImpl.java new file mode 100644 index 0000000..bdc6d5c --- /dev/null +++ b/etl-modules/etl-data-source/etl-data-source-server/src/main/java/com/etl/data/service/impl/ProducerServiceImpl.java @@ -0,0 +1,38 @@ +package com.etl.data.service.impl; + +import com.alibaba.druid.pool.DruidDataSource; +import com.alibaba.fastjson.JSON; +import com.etl.data.client.connPool.service.ConnPoolManagementService; +import com.etl.data.config.RabbitAdminConfig; +import com.etl.data.domain.DataSource; +import com.etl.data.domain.dataSource.DataSourceConfig; +import com.etl.data.service.ProducerService; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; +import org.springframework.web.bind.annotation.PathVariable; + +import java.sql.SQLException; + +/** + * @ClassName ProducerServiceImpl + * @Description 描述 + * @Author YunFei.Du + * @Date 2024/5/17 9:31 + */ +@Service +public class ProducerServiceImpl implements ProducerService { + @Autowired + private RabbitTemplate rabbitTemplate; + + @Override + public void dataSourceProduct(DataSource dataSource) { + + String msg = JSON.toJSONString ( dataSource ); + rabbitTemplate.convertAndSend ( RabbitAdminConfig.DATA_SOURCE_EXCHANGE, "data_source_key", msg ); + + + //初始化连接池 + ConnPoolManagementService.createPool ( dataSource ); + } +}