feat(DataSourceServiceImpl):新增数据库连接池

dev
gtl 2024-04-29 22:40:42 +08:00
parent d84aa60bf2
commit 5157792eb7
4 changed files with 79 additions and 46 deletions

View File

@ -4,7 +4,7 @@
### 简介
本项目是 **ETL** 项目,主要功能是对接入的 **数据源 **进行数据的 **抽取**(Extract) > **清洗**(Cleaning) > **转换**(Transform) > **装载**(Load) 。
本项目是 **ETL** 项目,主要功能是对接入的 **数据源** 进行数据的 **抽取**(Extract) > **清洗**(Cleaning) > **转换**(Transform) > **装载**(Load) 。
### 背景

View File

@ -24,5 +24,11 @@
<groupId>com.muyu</groupId>
<artifactId>muyu-common-core</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>1.2.20</version>
<scope>compile</scope>
</dependency>
</dependencies>
</project>

View File

@ -1,9 +1,7 @@
package com.ruoyi.dataAsset.service;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.List;
import com.ruoyi.dataAsset.domain.DataSource;
import com.baomidou.mybatisplus.extension.service.IService;
@ -30,14 +28,6 @@ public interface DataSourceService extends IService<DataSource> {
*/
boolean testConnect(Long id) throws SQLException;
/**
*
* @param dataSource
* @return
* @throws SQLException
*/
Connection getConnection(DataSource dataSource) throws SQLException;
/**
*
* @param dataSourceId

View File

@ -4,6 +4,12 @@ import java.sql.*;
import java.util.*;
import java.util.Date;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.druid.pool.DruidPooledConnection;
import com.ruoyi.common.core.text.Convert;
import com.ruoyi.common.core.utils.ObjUtils;
import com.ruoyi.common.core.utils.StringUtils;
import com.ruoyi.common.security.utils.SecurityUtils;
@ -81,9 +87,9 @@ public class DataSourceServiceImpl extends ServiceImpl<DataSourceMapper, DataSou
public boolean testConnect(Long id){
boolean result=true;
DataSource dataSource = this.getById(id);
Connection connection=null;
DruidDataSource druidDatasource=null;
try {
connection = this.getConnection(dataSource);
druidDatasource = this.createDataSource(dataSource);
//如果为不可用修改为可用
if(dataSource.getStatus().equals("N")){
dataSource.setStatus("Y");
@ -91,8 +97,8 @@ public class DataSourceServiceImpl extends ServiceImpl<DataSourceMapper, DataSou
dataSource.setUpdateBy(SecurityUtils.getUsername());
this.updateById(dataSource);
}
connection.close();
} catch (SQLException e) {
druidDatasource.close();
} catch (Exception e) {
//连接失败,如果为可用修改为不可用
if(dataSource.getStatus().equals("Y")){
dataSource.setStatus("N");
@ -106,22 +112,19 @@ public class DataSourceServiceImpl extends ServiceImpl<DataSourceMapper, DataSou
}
/**
*
* Druid
* @param dataSource
* @return
* @return Druid
* @throws SQLException
*/
@Override
public Connection getConnection(DataSource dataSource) throws SQLException {
public DruidDataSource createDataSource(DataSource dataSource) throws SQLException {
String driverType=null;
String url="jdbc:"+dataSource.getDatabaseType()+dataSource.getIp()+":"+dataSource.getPort();
if(dataSource.getDatabaseType().contains("mysql")){
driverType="com.mysql.cj.jdbc.Driver";
url+="/"+dataSource.getDatabases();
}else if(dataSource.getDatabaseType().contains("sqlserver")){
try {
Class.forName("com.microsoft.sqlserver.jdbc.SQLServerDriver");
} catch (ClassNotFoundException e) {
throw new RuntimeException(e);
}
driverType="com.microsoft.sqlserver.jdbc.SQLServerDriver";
url+=";DatabaseName="+dataSource.getDatabases()+";";
}
if(StringUtils.isNotEmpty(dataSource.getConnectionParameter())){
@ -131,9 +134,39 @@ public class DataSourceServiceImpl extends ServiceImpl<DataSourceMapper, DataSou
url+=dataSource.getConnectionParameter()+";";
}
}
//连接超时时间5s
DriverManager.setLoginTimeout(5);
return DriverManager.getConnection(url, dataSource.getUsername(), dataSource.getPassword());
//手动创建数据源
DruidDataSource druidDataSource = new DruidDataSource();
druidDataSource.setName(dataSource.getId().toString());
druidDataSource.setDriverClassName(driverType);
druidDataSource.setUrl(url);
druidDataSource.setUsername(dataSource.getUsername());
druidDataSource.setPassword(dataSource.getPassword());
//设置连接配置
if(StringUtils.isNotEmpty(dataSource.getConnectionConfig())){
String[] configs = dataSource.getConnectionConfig().split(",");
for (String config : configs) {
String[] split = config.split("=");
if(config.contains("initNum")){
//初始化连接池大小
druidDataSource.setInitialSize(split.length>1&&StringUtils.isNotEmpty(split[1])?Convert.toInt(split[1]):5);
}else if(config.contains("maxNum")){
//最大连接数
druidDataSource.setMaxActive(split.length>1&&StringUtils.isNotEmpty(split[1])?Convert.toInt(split[1]):20);
}else if(config.contains("minIdle")){
//最小空闲数
druidDataSource.setMinIdle(split.length>1&&StringUtils.isNotEmpty(split[1])?Convert.toInt(split[1]):5);
}else {
//获取连接最大等待时间,单位毫秒
druidDataSource.setMaxWait(split.length>1&&StringUtils.isNotEmpty(split[1])?Convert.toInt(split[1]):6000);
}
}
}
String validationQuery = "select 1";
//申请连接时执行validationQuery检测连接是否有效防止取到的连接不可用
druidDataSource.setTestOnBorrow(true);
druidDataSource.setValidationQuery(validationQuery);
druidDataSource.init();
return druidDataSource;
}
/**
@ -155,36 +188,38 @@ public class DataSourceServiceImpl extends ServiceImpl<DataSourceMapper, DataSou
sql="SELECT COUNT(*) tableTotal,SUM(b.rows) recordsTotal FROM sysobjects AS a \n" +
"INNER JOIN sysindexes AS b ON a.id = b.id WHERE ( a.type = 'u' ) AND ( b.indid IN ( 0, 1 ) );";
}
Connection connection=null;
try {
connection = this.getConnection(dataSource);
DruidDataSource druidDatasource = this.createDataSource(dataSource);
//总的记录数,总的表数量
ResultSet result = connection.prepareStatement(sql).executeQuery();
ResultSet result = druidDatasource.getConnection().prepareStatement(sql).executeQuery();
while (result.next()){
dataSource.setTableTotal(result.getInt("tableTotal"));
dataSource.setRecordsTotal(result.getInt("recordsTotal"));
}
//异步同步表结构
CompletableFuture.runAsync(()->{
this.SynchronousTableStructure(dataSource);
});
ExecutorService executorService = Executors.newCachedThreadPool();
//异步同步表结构并等待所有任务结束
CompletableFuture.supplyAsync(() -> {
this.SynchronousTableStructure(druidDatasource, dataSource);
return "end";
}, executorService).get();
//修改数据
this.updateById(dataSource);
result.close();
connection.close();
} catch (SQLException e) {
druidDatasource.close();
} catch (SQLException | InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
}
/**
*
* @param druidDatasource
* @param dataSource
*/
public void SynchronousTableStructure(DataSource dataSource){
Connection connection=null;
public void SynchronousTableStructure(DruidDataSource druidDatasource,DataSource dataSource){
try {
connection=getConnection(dataSource);
List<CompletableFuture<String>> runAsyncList=new ArrayList<>();
DruidPooledConnection connection = druidDatasource.getConnection();
ResultSet tables = connection.getMetaData().getTables(connection.getCatalog(), "dbo", "%", new String[] { "TABLE" });
while (tables.next()) {
String tableName = tables.getString("TABLE_NAME");
@ -209,10 +244,13 @@ public class DataSourceServiceImpl extends ServiceImpl<DataSourceMapper, DataSou
//添加
tableInfoService.save(tableInfo);
//异步存储表数据
CompletableFuture.runAsync(()->{
this.SynchronousColumnInfo(dataSource,tableName,tableInfo.getId());
});
runAsyncList.add(CompletableFuture.supplyAsync(() -> {
this.SynchronousColumnInfo(druidDatasource, tableName, tableInfo.getId(), dataSource);
return "end";
}));
}
//等待所有任务结束
CompletableFuture.allOf(runAsyncList.toArray(CompletableFuture[]::new)).join();
tables.close();
connection.close();
} catch (SQLException e) {
@ -222,18 +260,17 @@ public class DataSourceServiceImpl extends ServiceImpl<DataSourceMapper, DataSou
/**
*
* @param dataSource
* @param druidDatasource
* @param tableName
* @param tableId
*/
public void SynchronousColumnInfo(DataSource dataSource,String tableName,Long tableId) {
public void SynchronousColumnInfo(DruidDataSource druidDatasource,String tableName,Long tableId,DataSource dataSource) {
//声明字段信息空集合
List<ColumnInfo> list=new ArrayList<>();
//声明资产模型数据空集合
List<AssetModelData> moduleDataList=new ArrayList<>();
Connection connection=null;
try {
connection=getConnection(dataSource);
DruidPooledConnection connection = druidDatasource.getConnection();
ResultSet columns = connection.getMetaData().getColumns(connection.getCatalog(), null, tableName, "%");
ResultSet primaryKeys = connection.getMetaData().getPrimaryKeys(connection.getCatalog(), null, tableName);
String primaryKeyName=null;