feat(DataExtractController):初始化数据抽取控制器

dev
gtl 2024-05-10 19:38:59 +08:00
parent 22bc57b231
commit 7793e99331
12 changed files with 211 additions and 209 deletions

View File

@ -1,13 +1,11 @@
package com.ruoyi.dataAsset.config;
import com.alibaba.druid.pool.DruidDataSource;
import com.ruoyi.common.core.domain.Result;
import com.ruoyi.common.core.text.Convert;
import com.ruoyi.common.core.utils.StringUtils;
import com.ruoyi.common.core.web.page.TableDataInfo;
import com.ruoyi.dataAsset.domain.DataSource;
import com.ruoyi.dataAsset.domain.req.DataSourceQueryReq;
import com.ruoyi.dataAsset.remote.RemoteDataAssetService;
import com.ruoyi.dataAsset.util.DataSourceUtil;
import lombok.extern.log4j.Log4j2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
@ -44,76 +42,11 @@ public class DataAssetApplicationRunner implements ApplicationRunner {
continue;
}
try {
druidDataSourceFactory.put(dataSource.getId(),this.createDataSource(dataSource));
druidDataSourceFactory.put(dataSource.getId(), DataSourceUtil.createDataSource(dataSource));
} catch (Exception e) {
throw new RuntimeException(e.getMessage());
}
}
};
/**
* Druid
* @param dataSource
* @return Druid
*/
public DruidDataSource createDataSource(DataSource dataSource) {
DruidDataSource druidDataSource = null;
String driverType=null;
String url="jdbc:"+dataSource.getDatabaseType()+dataSource.getIp()+":"+dataSource.getPort();
if(dataSource.getDatabaseType().contains("mysql")){
driverType="com.mysql.cj.jdbc.Driver";
url+="/"+dataSource.getDatabases();
}else if(dataSource.getDatabaseType().contains("sqlserver")){
driverType="com.microsoft.sqlserver.jdbc.SQLServerDriver";
url+=";DatabaseName="+dataSource.getDatabases()+";";
}
if(StringUtils.isNotEmpty(dataSource.getConnectionParameter())){
if(dataSource.getDatabaseType().contains("mysql")){
url+="?"+dataSource.getConnectionParameter();
}else {
url+=dataSource.getConnectionParameter()+";";
}
}
try {
//手动创建数据源
druidDataSource = new DruidDataSource();
druidDataSource.setName(dataSource.getId().toString());
druidDataSource.setDriverClassName(driverType);
druidDataSource.setUrl(url);
druidDataSource.setUsername(dataSource.getUsername());
druidDataSource.setPassword(dataSource.getPassword());
//设置连接配置
if(StringUtils.isNotEmpty(dataSource.getConnectionConfig())){
String[] configs = dataSource.getConnectionConfig().split(",");
for (String config : configs) {
String[] split = config.split("=");
if(config.contains("initNum")){
//初始化连接池大小
druidDataSource.setInitialSize(split.length>1&&!split[1].equals("null")&&StringUtils.isNotEmpty(split[1])? Convert.toInt(split[1]):5);
}else if(config.contains("maxNum")){
//最大连接数
druidDataSource.setMaxActive(split.length>1&&!split[1].equals("null")&&StringUtils.isNotEmpty(split[1])?Convert.toInt(split[1]):20);
}else if(config.contains("minIdle")){
//最小空闲数
druidDataSource.setMinIdle(split.length>1&&!split[1].equals("null")&&StringUtils.isNotEmpty(split[1])?Convert.toInt(split[1]):5);
}else {
//获取连接最大等待时间,单位毫秒
druidDataSource.setMaxWait(split.length>1&&!split[1].equals("null")&&StringUtils.isNotEmpty(split[1])?Convert.toInt(split[1]):4000);
}
}
}
String validationQuery = "select 1";
//申请连接时执行validationQuery检测连接是否有效防止取到的连接不可用
druidDataSource.setTestOnBorrow(true);
druidDataSource.setValidationQuery(validationQuery);
//防止DruidDataSource一直尝试获取连接,导致服务卡死
druidDataSource.setConnectionErrorRetryAttempts(0);
druidDataSource.setBreakAfterAcquireFailure(true);
druidDataSource.setConnectTimeout(2000);
druidDataSource.init();
} catch (Exception e) {
throw new RuntimeException(e);
}
return druidDataSource;
}
}

View File

@ -0,0 +1,122 @@
package com.ruoyi.dataAsset.util;
import com.alibaba.druid.pool.DruidDataSource;
import com.ruoyi.common.core.text.Convert;
import com.ruoyi.common.core.utils.StringUtils;
import com.ruoyi.dataAsset.domain.DataSource;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
/**
*
* @ClassName DataSourceUtil
* @Author
* @Date 2024/5/10 14:37
*/
public class DataSourceUtil {
/**
*
* @param databaseType
* @return
*/
public static String confirmDriverType(String databaseType){
String driverType=null;
if(databaseType.contains("mysql")){
driverType="com.mysql.cj.jdbc.Driver";
}else if(databaseType.contains("sqlserver")){
driverType="com.microsoft.sqlserver.jdbc.SQLServerDriver";
}
return driverType;
}
/**
*
* @param dataSource
* @return
*/
public static String confirmUrl(DataSource dataSource){
String url="jdbc:"+dataSource.getDatabaseType()+dataSource.getIp()+":"+dataSource.getPort();
String databaseType = dataSource.getDatabaseType();
if(databaseType.contains("mysql")){
url+="/"+dataSource.getDatabases();
if(StringUtils.isNotEmpty(dataSource.getConnectionParameter())){
url+="?"+dataSource.getConnectionParameter();
}
}else if(databaseType.contains("sqlserver")){
url+=";DatabaseName="+dataSource.getDatabases()+";";
if(StringUtils.isNotEmpty(dataSource.getConnectionParameter())){
url+=dataSource.getConnectionParameter()+";";
}
}
return url;
}
/**
* druid,JDBC
* @param dataSource
*/
public static Connection getConnection(DataSource dataSource) throws SQLException {
//设置驱动
try {
Class.forName(confirmDriverType(dataSource.getDatabaseType()));
} catch (ClassNotFoundException e) {
throw new RuntimeException(e);
}
//设置连接超时
DriverManager.setLoginTimeout(3);
//获取连接
return DriverManager.getConnection(confirmUrl(dataSource), dataSource.getUsername(), dataSource.getPassword());
}
/**
* Druid
* @param dataSource
* @return Druid
*/
public static DruidDataSource createDataSource(DataSource dataSource) {
//手动创建数据源
DruidDataSource druidDataSource = new DruidDataSource();
druidDataSource.setName(dataSource.getId().toString());
druidDataSource.setDriverClassName(confirmDriverType(dataSource.getDatabaseType()));
druidDataSource.setUrl(confirmUrl(dataSource));
druidDataSource.setUsername(dataSource.getUsername());
druidDataSource.setPassword(dataSource.getPassword());
//设置连接配置
if(StringUtils.isNotEmpty(dataSource.getConnectionConfig())){
String[] configs = dataSource.getConnectionConfig().split(",");
for (String config : configs) {
String[] split = config.split("=");
if(config.contains("initNum")){
//初始化连接池大小
druidDataSource.setInitialSize(split.length>1&&!split[1].equals("null")&&StringUtils.isNotEmpty(split[1])? Convert.toInt(split[1]):5);
}else if(config.contains("maxNum")){
//最大连接数
druidDataSource.setMaxActive(split.length>1&&!split[1].equals("null")&&StringUtils.isNotEmpty(split[1])?Convert.toInt(split[1]):20);
}else if(config.contains("minIdle")){
//最小空闲数
druidDataSource.setMinIdle(split.length>1&&!split[1].equals("null")&&StringUtils.isNotEmpty(split[1])?Convert.toInt(split[1]):5);
}else {
//获取连接最大等待时间,单位毫秒
druidDataSource.setMaxWait(split.length>1&&!split[1].equals("null")&&StringUtils.isNotEmpty(split[1])?Convert.toInt(split[1]):4000);
}
}
}
String validationQuery = "select 1";
//申请连接时执行validationQuery检测连接是否有效防止取到的连接不可用
druidDataSource.setTestOnBorrow(true);
druidDataSource.setValidationQuery(validationQuery);
//防止DruidDataSource一直尝试获取连接,导致服务卡死
druidDataSource.setConnectionErrorRetryAttempts(0);
druidDataSource.setBreakAfterAcquireFailure(true);
//设置连接超时时间
druidDataSource.setConnectTimeout(2000);
try {
druidDataSource.init();
} catch (SQLException e) {
throw new RuntimeException(e);
}
return druidDataSource;
}
}

View File

@ -21,12 +21,18 @@
</description>
<dependencies>
<!-- ruoyi-etl-common etl公共模块 -->
<!-- ruoyi-data_asset-common 数据资产公共模块 -->
<dependency>
<groupId>com.muyu</groupId>
<artifactId>ruoyi-data_asset-common</artifactId>
<version>3.6.3</version>
</dependency>
<!-- ruoyi-data_asset-client 数据资产客户端模块 -->
<dependency>
<groupId>com.ruoyi</groupId>
<artifactId>ruoyi-data_asset-client</artifactId>
<version>3.6.3</version>
</dependency>
<!-- SpringCloud Alibaba Nacos -->
<dependency>
<groupId>com.alibaba.cloud</groupId>

View File

@ -4,6 +4,9 @@ import java.sql.SQLException;
import java.util.List;
import javax.servlet.http.HttpServletResponse;
import com.ruoyi.common.security.utils.SecurityUtils;
import com.ruoyi.dataAsset.service.AssetModelDataService;
import com.ruoyi.dataAsset.service.ColumnInfoService;
import com.ruoyi.dataAsset.service.TableInfoService;
import io.swagger.annotations.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;

View File

@ -7,16 +7,17 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import com.alibaba.druid.pool.DruidDataSource;
import com.ruoyi.common.core.exception.ServiceException;
import com.ruoyi.common.core.text.Convert;
import com.ruoyi.common.core.utils.ObjUtils;
import com.ruoyi.common.core.utils.StringUtils;
import com.ruoyi.common.security.utils.SecurityUtils;
import com.ruoyi.dataAsset.config.DruidDataSourceFactory;
import com.ruoyi.dataAsset.domain.AssetModelData;
import com.ruoyi.dataAsset.domain.ColumnInfo;
import com.ruoyi.dataAsset.domain.TableInfo;
import com.ruoyi.dataAsset.service.AssetModelDataService;
import com.ruoyi.dataAsset.service.ColumnInfoService;
import com.ruoyi.dataAsset.service.TableInfoService;
import com.ruoyi.dataAsset.util.DataSourceUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@ -46,6 +47,9 @@ public class DataSourceServiceImpl extends ServiceImpl<DataSourceMapper, DataSou
@Autowired
private AssetModelDataService assetModuleDataService;
@Autowired
private DruidDataSourceFactory druidDataSourceFactory;
/**
*
*
@ -83,47 +87,10 @@ public class DataSourceServiceImpl extends ServiceImpl<DataSourceMapper, DataSou
@Override
public void testConnect(Long id){
DataSource dataSource = this.getById(id);
DruidDataSource druidDataSource = this.createDataSource(dataSource);
//如果为不可用修改为可用
if(dataSource.getStatus().equals("N")){
dataSource.setStatus("Y");
dataSource.setUpdateTime(new Date());
dataSource.setUpdateBy(SecurityUtils.getUsername());
this.updateById(dataSource);
}
druidDataSource.close();
}
/**
* Druid
* @param dataSource
* @return Druid
*/
public DruidDataSource createDataSource(DataSource dataSource) {
DruidDataSource druidDataSource = null;
String driverType=null;
String url="jdbc:"+dataSource.getDatabaseType()+dataSource.getIp()+":"+dataSource.getPort();
if(dataSource.getDatabaseType().contains("mysql")){
driverType="com.mysql.cj.jdbc.Driver";
url+="/"+dataSource.getDatabases();
}else if(dataSource.getDatabaseType().contains("sqlserver")){
driverType="com.microsoft.sqlserver.jdbc.SQLServerDriver";
url+=";DatabaseName="+dataSource.getDatabases()+";";
}
if(StringUtils.isNotEmpty(dataSource.getConnectionParameter())){
if(dataSource.getDatabaseType().contains("mysql")){
url+="?"+dataSource.getConnectionParameter();
}else {
url+=dataSource.getConnectionParameter()+";";
}
}
//druid数据库连接池连接失败的情况下问题较多,需要先用JDBC测试一下可用性
Connection connection=null;
try {
Class.forName(driverType);
DriverManager.setLoginTimeout(3);
connection = DriverManager.getConnection(url, dataSource.getUsername(), dataSource.getPassword());
} catch (Exception e) {
connection = DataSourceUtil.getConnection(dataSource);
} catch (SQLException e) {
//连接失败,如果为可用修改为不可用
if(dataSource.getStatus().equals("Y")){
dataSource.setStatus("N");
@ -141,47 +108,13 @@ public class DataSourceServiceImpl extends ServiceImpl<DataSourceMapper, DataSou
throw new RuntimeException(e);
}
}
try {
//手动创建数据源
druidDataSource = new DruidDataSource();
druidDataSource.setName(dataSource.getId().toString());
druidDataSource.setDriverClassName(driverType);
druidDataSource.setUrl(url);
druidDataSource.setUsername(dataSource.getUsername());
druidDataSource.setPassword(dataSource.getPassword());
//设置连接配置
if(StringUtils.isNotEmpty(dataSource.getConnectionConfig())){
String[] configs = dataSource.getConnectionConfig().split(",");
for (String config : configs) {
String[] split = config.split("=");
if(config.contains("initNum")){
//初始化连接池大小
druidDataSource.setInitialSize(split.length>1&&!split[1].equals("null")&&StringUtils.isNotEmpty(split[1])?Convert.toInt(split[1]):5);
}else if(config.contains("maxNum")){
//最大连接数
druidDataSource.setMaxActive(split.length>1&&!split[1].equals("null")&&StringUtils.isNotEmpty(split[1])?Convert.toInt(split[1]):20);
}else if(config.contains("minIdle")){
//最小空闲数
druidDataSource.setMinIdle(split.length>1&&!split[1].equals("null")&&StringUtils.isNotEmpty(split[1])?Convert.toInt(split[1]):5);
}else {
//获取连接最大等待时间,单位毫秒
druidDataSource.setMaxWait(split.length>1&&!split[1].equals("null")&&StringUtils.isNotEmpty(split[1])?Convert.toInt(split[1]):4000);
}
}
}
String validationQuery = "select 1";
//申请连接时执行validationQuery检测连接是否有效防止取到的连接不可用
druidDataSource.setTestOnBorrow(true);
druidDataSource.setValidationQuery(validationQuery);
//防止DruidDataSource一直尝试获取连接,导致服务卡死
druidDataSource.setConnectionErrorRetryAttempts(0);
druidDataSource.setBreakAfterAcquireFailure(true);
druidDataSource.setConnectTimeout(2000);
druidDataSource.init();
} catch (Exception e) {
throw new RuntimeException(e);
//如果为不可用修改为可用
if(dataSource.getStatus().equals("N")){
dataSource.setStatus("Y");
dataSource.setUpdateTime(new Date());
dataSource.setUpdateBy(SecurityUtils.getUsername());
this.updateById(dataSource);
}
return druidDataSource;
}
/**
@ -204,7 +137,7 @@ public class DataSourceServiceImpl extends ServiceImpl<DataSourceMapper, DataSou
"INNER JOIN sysindexes AS b ON a.id = b.id WHERE ( a.type = 'u' ) AND ( b.indid IN ( 0, 1 ) );";
}
try {
DruidDataSource druidDatasource = this.createDataSource(dataSource);
DruidDataSource druidDatasource = druidDataSourceFactory.getMap().get(dataSourceId);
//总的记录数,总的表数量
ResultSet result = druidDatasource.getConnection().prepareStatement(sql).executeQuery();
while (result.next()){

View File

@ -0,0 +1,19 @@
package com.ruoyi.dataTransform.controller;
import io.swagger.annotations.Api;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
*
* @ClassName DataExtractController
* @Author
* @Date 2024/5/9 10:56
*/
@Api(tags = "数据抽取")
@RestController
@RequestMapping("/extract")
public class DataExtractController {
}

View File

@ -1,29 +0,0 @@
package com.ruoyi.dataTransform.controller;
import com.alibaba.druid.pool.DruidDataSource;
import com.ruoyi.common.core.domain.Result;
import com.ruoyi.dataAsset.config.DruidDataSourceFactory;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.util.Map;
/**
*
* @ClassName DataSourceTestController
* @Author
* @Date 2024/5/9 10:56
*/
@RestController
public class DataSourceTestController {
@Resource
private DruidDataSourceFactory druidDataSourceFactory;
@GetMapping("/test")
public Result<Object> test(){
Map<Long, DruidDataSource> map = druidDataSourceFactory.getMap();
System.out.println(map);
return Result.success();
}
}

View File

@ -0,0 +1,12 @@
package com.ruoyi.dataTransform.service;
/**
* service
* @ClassName DataExtractService
* @Author
* @Date 2024/5/9 10:56
*/
public interface DataExtractService {
}

View File

@ -0,0 +1,18 @@
package com.ruoyi.dataTransform.service.impl;
import com.ruoyi.dataTransform.service.DataExtractService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
/**
* Service
*
* @author gtl
* @date 2024-04-20
*/
@Slf4j
@Service
public class DataExtractServiceImpl implements DataExtractService {
}

View File

@ -1,8 +1,10 @@
package com.ruoyi.ruleEngine.context;
import com.ruoyi.ruleEngine.model.process.DataModelProcessModel;
import com.ruoyi.ruleEngine.util.ScopeContextHolderUtil;
import lombok.Data;
import lombok.experimental.SuperBuilder;
import java.sql.Connection;
/**
*
@ -31,6 +33,10 @@ public class DataModelContextHolder{
THREAD_LOCAL.set(dataModelProcessModel);
}
public static void set(Connection connection,String sql){
ScopeContextHolderUtil.setDataSetContextHolder(connection,sql);
}
public static void remove() {
THREAD_LOCAL.remove();
}

View File

@ -1,21 +0,0 @@
package com.ruoyi.ruleEngine.engine.custom;
import com.ruoyi.ruleEngine.engine.action.ActionDiscard;
import com.ruoyi.ruleEngine.engine.scope.DataModelEngine;
/**
*
* @ClassName IsNotNullEngineCustom
* @Author:
* @Date: 2024/5/6 13:48
*/
public class IsNotNullEngineCustom extends DataModelEngine {
@Override
public void execution () {
Object value = getValue();
if (value == null || "".equals(value) || "null".equals(value)) {
throw new ActionDiscard();
}
}
}

View File

@ -18,11 +18,11 @@ import java.util.List;
public class ScopeContextHolderUtil {
public static void setDataSetContextHolder(Connection connection){
public static void setDataSetContextHolder(Connection connection,String sql){
List<RecordModel> recordModels=new ArrayList<>();
try {
PreparedStatement preparedStatement = connection.prepareStatement("select * from table_info limit 0,10");
PreparedStatement preparedStatement = connection.prepareStatement(sql);
ResultSet resultSet = preparedStatement.executeQuery();
List<RecordModel> recordModels=new ArrayList<>();
while (resultSet.next()){
int index=1;
ResultSetMetaData data = resultSet.getMetaData();
@ -43,13 +43,13 @@ public class ScopeContextHolderUtil {
recordModels.add(recordModel);
}
connection.close();
DataSetModel dataSetModel = new DataSetModel(recordModels);
DataSetProcessModel dataSetProcessModel = new DataSetProcessModel(dataSetModel);
//存入本地线程
DataSetContextHolder.set(dataSetProcessModel);
} catch (Exception e) {
} catch (SQLException | ClassNotFoundException e) {
throw new RuntimeException(e);
}
DataSetModel dataSetModel = new DataSetModel(recordModels);
DataSetProcessModel dataSetProcessModel = new DataSetProcessModel(dataSetModel);
//存入本地线程
DataSetContextHolder.set(dataSetProcessModel);
}
}