diff --git a/README.md b/README.md index 2144055..06746e1 100644 --- a/README.md +++ b/README.md @@ -4,7 +4,7 @@ ### 简介 -本项目是 **ETL** 项目,主要功能是对接入的 **数据源 **进行数据的 **抽取**(Extract) > **清洗**(Cleaning) > **转换**(Transform) > **装载**(Load) 。 +本项目是 **ETL** 项目,主要功能是对接入的 **数据源** 进行数据的 **抽取**(Extract) > **清洗**(Cleaning) > **转换**(Transform) > **装载**(Load) 。 ### 背景 diff --git a/ruoyi-modules/ruoyi-data_asset/ruoyi-data_asset-common/pom.xml b/ruoyi-modules/ruoyi-data_asset/ruoyi-data_asset-common/pom.xml index 373d7d6..48877e3 100644 --- a/ruoyi-modules/ruoyi-data_asset/ruoyi-data_asset-common/pom.xml +++ b/ruoyi-modules/ruoyi-data_asset/ruoyi-data_asset-common/pom.xml @@ -24,5 +24,11 @@ com.muyu muyu-common-core + + com.alibaba + druid + 1.2.20 + compile + diff --git a/ruoyi-modules/ruoyi-data_asset/ruoyi-data_asset-server/src/main/java/com/ruoyi/dataAsset/service/DataSourceService.java b/ruoyi-modules/ruoyi-data_asset/ruoyi-data_asset-server/src/main/java/com/ruoyi/dataAsset/service/DataSourceService.java index 06e16af..5e8cb15 100644 --- a/ruoyi-modules/ruoyi-data_asset/ruoyi-data_asset-server/src/main/java/com/ruoyi/dataAsset/service/DataSourceService.java +++ b/ruoyi-modules/ruoyi-data_asset/ruoyi-data_asset-server/src/main/java/com/ruoyi/dataAsset/service/DataSourceService.java @@ -1,9 +1,7 @@ package com.ruoyi.dataAsset.service; -import java.sql.Connection; import java.sql.SQLException; import java.util.List; - import com.ruoyi.dataAsset.domain.DataSource; import com.baomidou.mybatisplus.extension.service.IService; @@ -30,14 +28,6 @@ public interface DataSourceService extends IService { */ boolean testConnect(Long id) throws SQLException; - /** - * 获取数据源的链接 - * @param dataSource 数据源 - * @return 连接 - * @throws SQLException 异常 - */ - Connection getConnection(DataSource dataSource) throws SQLException; - /** * 同步数据结构 * @param dataSourceId 数据源编号 diff --git a/ruoyi-modules/ruoyi-data_asset/ruoyi-data_asset-server/src/main/java/com/ruoyi/dataAsset/service/impl/DataSourceServiceImpl.java b/ruoyi-modules/ruoyi-data_asset/ruoyi-data_asset-server/src/main/java/com/ruoyi/dataAsset/service/impl/DataSourceServiceImpl.java index dd2d2dd..e98efa6 100644 --- a/ruoyi-modules/ruoyi-data_asset/ruoyi-data_asset-server/src/main/java/com/ruoyi/dataAsset/service/impl/DataSourceServiceImpl.java +++ b/ruoyi-modules/ruoyi-data_asset/ruoyi-data_asset-server/src/main/java/com/ruoyi/dataAsset/service/impl/DataSourceServiceImpl.java @@ -4,6 +4,12 @@ import java.sql.*; import java.util.*; import java.util.Date; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import com.alibaba.druid.pool.DruidDataSource; +import com.alibaba.druid.pool.DruidPooledConnection; +import com.ruoyi.common.core.text.Convert; import com.ruoyi.common.core.utils.ObjUtils; import com.ruoyi.common.core.utils.StringUtils; import com.ruoyi.common.security.utils.SecurityUtils; @@ -81,9 +87,9 @@ public class DataSourceServiceImpl extends ServiceImpl1&&StringUtils.isNotEmpty(split[1])?Convert.toInt(split[1]):5); + }else if(config.contains("maxNum")){ + //最大连接数 + druidDataSource.setMaxActive(split.length>1&&StringUtils.isNotEmpty(split[1])?Convert.toInt(split[1]):20); + }else if(config.contains("minIdle")){ + //最小空闲数 + druidDataSource.setMinIdle(split.length>1&&StringUtils.isNotEmpty(split[1])?Convert.toInt(split[1]):5); + }else { + //获取连接最大等待时间,单位毫秒 + druidDataSource.setMaxWait(split.length>1&&StringUtils.isNotEmpty(split[1])?Convert.toInt(split[1]):6000); + } + } + } + String validationQuery = "select 1"; + //申请连接时执行validationQuery检测连接是否有效,防止取到的连接不可用 + druidDataSource.setTestOnBorrow(true); + druidDataSource.setValidationQuery(validationQuery); + druidDataSource.init(); + return druidDataSource; } /** @@ -155,36 +188,38 @@ public class DataSourceServiceImpl extends ServiceImpl{ - this.SynchronousTableStructure(dataSource); - }); + ExecutorService executorService = Executors.newCachedThreadPool(); + //异步同步表结构并等待所有任务结束 + CompletableFuture.supplyAsync(() -> { + this.SynchronousTableStructure(druidDatasource, dataSource); + return "end"; + }, executorService).get(); //修改数据 this.updateById(dataSource); result.close(); - connection.close(); - } catch (SQLException e) { + druidDatasource.close(); + } catch (SQLException | InterruptedException | ExecutionException e) { throw new RuntimeException(e); } } /** * 同步表结构 + * @param druidDatasource 数据源 * @param dataSource 数据源 */ - public void SynchronousTableStructure(DataSource dataSource){ - Connection connection=null; + public void SynchronousTableStructure(DruidDataSource druidDatasource,DataSource dataSource){ try { - connection=getConnection(dataSource); + List> runAsyncList=new ArrayList<>(); + DruidPooledConnection connection = druidDatasource.getConnection(); ResultSet tables = connection.getMetaData().getTables(connection.getCatalog(), "dbo", "%", new String[] { "TABLE" }); while (tables.next()) { String tableName = tables.getString("TABLE_NAME"); @@ -209,10 +244,13 @@ public class DataSourceServiceImpl extends ServiceImpl{ - this.SynchronousColumnInfo(dataSource,tableName,tableInfo.getId()); - }); + runAsyncList.add(CompletableFuture.supplyAsync(() -> { + this.SynchronousColumnInfo(druidDatasource, tableName, tableInfo.getId(), dataSource); + return "end"; + })); } + //等待所有任务结束 + CompletableFuture.allOf(runAsyncList.toArray(CompletableFuture[]::new)).join(); tables.close(); connection.close(); } catch (SQLException e) { @@ -222,18 +260,17 @@ public class DataSourceServiceImpl extends ServiceImpl list=new ArrayList<>(); //声明资产模型数据空集合 List moduleDataList=new ArrayList<>(); - Connection connection=null; try { - connection=getConnection(dataSource); + DruidPooledConnection connection = druidDatasource.getConnection(); ResultSet columns = connection.getMetaData().getColumns(connection.getCatalog(), null, tableName, "%"); ResultSet primaryKeys = connection.getMetaData().getPrimaryKeys(connection.getCatalog(), null, tableName); String primaryKeyName=null;