refactor:数据接入(优化)

master_fei
Yunfei Du 2024-05-09 21:04:04 +08:00
parent 4f53431f7b
commit 006ecfdc92
11 changed files with 218 additions and 280 deletions

View File

@ -0,0 +1,19 @@
package com.etl.data.client.config;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Import;
import org.springframework.http.converter.json.GsonBuilderUtils;
import org.springframework.stereotype.Component;
/**
*
* @author YunFei.Du
* @date 21:34 2024/5/8
*/
@ComponentScan
@Import ( value = {DataAccessClientRunner.class})
public class DataAccessClientConfig {
public static void main(String[] args) {
System.out.println ("连接池 编码" );
}
}

View File

@ -0,0 +1,22 @@
package com.etl.data.client.config;
import feign.Param;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
/**
* @ClassName DataAccessClientRunner
* @Description
* @Author YunFei.Du
* @Date 2024/5/8 21:35
*/
public class DataAccessClientRunner implements ApplicationRunner {
@Override
public void run(ApplicationArguments args) throws Exception {
}
}

View File

@ -33,13 +33,13 @@ public class DataAsset extends BaseEntity {
private Long dataSourceId;
/** 表名称 */
private String tableName;
private String name;
/** 表备注 */
private String tableComment;
private String nameAs;
/** 数据量 */
private Integer tableCount;
private Integer dataTotal;
/** 资产模型 */
private Integer fields;

View File

@ -25,81 +25,74 @@ import lombok.experimental.SuperBuilder;
@TableName("data_source")
public class DataSource extends BaseEntity {
private static final long serialVersionUID = 1L;
/**
* id
*
*/
@TableId(value = "id", type = IdType.AUTO)
private Long id;
/** 数据来源名称 */
/**
*
*/
private String name;
/**
*
*/
private String systemName;
/**
*
*
*/
private String dataSourceName;
/** 连接驱动名称 */
private String jdbcDriver;
private String type;
/**
*
*
*/
private String databaseName;
/** 模式名称 (postgres)*/
private String modeName;
private String host;
/**
* ip
*/
private String linkAddress;
/**
*
* ip
*/
private String port;
/**
*
*
*/
private String databaseName;
/**
*
*/
private String username;
/**
*
*/
private String password;
/** 数据连接参数 */
/**
*
*/
private String connectionParam;
/** 初始连接数量 */
private Integer initNum;
/** 最大连接数量 */
private Integer maxNum;
/** 最大等待时间 */
private Integer maxWaitTime;
/** 最大等待次数 */
private Integer maxWaitSize;
/**
*
*
*/
private String type;
private String initNum;
/**
*
*/
private String maxNum;
/**
*
*/
private String maxWaitTime;
/**
*
*/
private String maxWaitSize;
/**
*
*
*/
@TableField(exist = false)
private Integer count;
private String jdbcDriver;
/**
*
*/
private String modeName;
}

View File

@ -0,0 +1,56 @@
package com.etl.data.domain.decoration;
import com.etl.data.domain.DataSource;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
/**
* @ClassName DataSourceDecoration
* @Description
* @Author YunFei.Du
* @Date 2024/5/9 8:53
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
@SuperBuilder
public class DataSourceDecoration {
/**
* id
*/
private Long id;
/**
*
*/
private String systemName;
/**
*
*/
private String name;
/** 连接驱动名称 */
private String jdbcDriver;
/**
*
*/
private String databaseName;
private String type;
public static DataSourceDecoration dataSourceBuild(DataSource dataSource) {
return DataSourceDecoration.builder ()
.id(dataSource.getId ())
.name(dataSource.getName ())
.systemName(dataSource.getSystemName ())
.databaseName(dataSource.getDatabaseName ())
.type("dataSource")
.build ();
}
}

View File

@ -15,14 +15,14 @@ import lombok.experimental.SuperBuilder;
@NoArgsConstructor
@SuperBuilder
public class DataSourceQueryReq {
private Integer pageNum;
private Integer pageNum=1;
private Integer pageSize;
private Integer pageSize=10;
/**
*
*/
private String dataSourceName;
private String name;
/**
*
@ -34,5 +34,4 @@ public class DataSourceQueryReq {
*/
private String databaseName;
private String linkAddress;
}

View File

@ -1,127 +0,0 @@
package com.etl.data.domain.resp;
import com.etl.common.core.web.domain.BaseEntity;
import com.etl.data.domain.DataSource;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import java.util.ArrayList;
import java.util.List;
/**
*
* @author YunFei.Du
* @date 14:56 2024/5/1
*/
@Data
@SuperBuilder
@NoArgsConstructor
@AllArgsConstructor
public class DataSourceResp extends BaseEntity {
/**
* id
*/
private Long id;
/** 数据来源名称 */
private String systemName;
/**
*
*/
private String dataSourceName;
/** 连接驱动名称 */
private String jdbcDriver;
/**
*
*/
private String databaseName;
/** 模式名称 (postgres)*/
private String modeName;
/**
* ip
*/
private String linkAddress;
/**
*
*/
private String port;
/**
*
*/
private String username;
/**
*
*/
private String password;
/** 数据连接参数 */
private String connectionParam;
/** 初始连接数量 */
private Integer initNum;
/** 最大连接数量 */
private Integer maxNum;
/** 最大等待时间 */
private Integer maxWaitTime;
/** 最大等待次数 */
private Integer maxWaitSize;
/**
*
*/
private String type;
/**
*
*/
private Integer count;
/**
*
*/
private List<DataSourceSpliceParam> dataSourceSpliceParam;
/**
* (+)
* @param dataSource
* @param dataSourceSpliceParams
* @return
*/
public static DataSourceResp builderDataSource(DataSource dataSource, ArrayList< DataSourceSpliceParam > dataSourceSpliceParams) {
return DataSourceResp.builder ()
.id(dataSource.getId())
.systemName(dataSource.getSystemName())
.dataSourceName(dataSource.getDataSourceName())
.jdbcDriver(dataSource.getJdbcDriver())
.databaseName(dataSource.getDatabaseName())
.linkAddress(dataSource.getLinkAddress())
.port(dataSource.getPort())
.username(dataSource.getUsername())
.password(dataSource.getPassword ())
.connectionParam(dataSource.getConnectionParam ())
.initNum(dataSource.getInitNum ())
.maxNum(dataSource.getMaxNum ())
.count(dataSource.getCount ())
.maxWaitTime(dataSource.getMaxWaitTime ())
.maxWaitSize(dataSource.getMaxWaitSize ())
.type(dataSource.getType ())
.modeName(dataSource.getModeName ())
.dataSourceSpliceParam(dataSourceSpliceParams)
.build ();
}
}

View File

@ -1,7 +1,10 @@
package com.etl.data.controller;
import com.etl.common.core.domain.Result;
import com.etl.data.service.DataAssetService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

View File

@ -3,22 +3,18 @@ package com.etl.data.controller;
import com.etl.common.core.domain.Result;
import com.etl.common.core.web.controller.BaseController;
import com.etl.common.core.web.page.TableDataInfo;
import com.etl.common.security.utils.SecurityUtils;
import com.etl.data.domain.DataAsset;
import com.etl.data.domain.DataSource;
import com.etl.data.domain.decoration.DataSourceDecoration;
import com.etl.data.domain.req.DataSourceQueryReq;
import com.etl.data.domain.resp.DataSourceResp;
import com.etl.data.service.DataSourceService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import javax.xml.crypto.Data;
import java.sql.SQLException;
import java.util.Date;
import java.util.List;
/**
*
* dataAccess
* @author YunFei.Du
* @date 21:47 2024/4/25
*/
@ -34,24 +30,18 @@ public class DataSourceController extends BaseController {
*/
@PostMapping("/list")
public Result< TableDataInfo<DataSource> > getDataSourceList (@RequestBody DataSourceQueryReq req) {
startPage();
List<DataSource> list = dataSourceService.selectDataSourceList(req);
return getDataAsset (list);
}
@GetMapping("/getList")
public List< DataSource > getList () {
List<DataSource> list = dataSourceService.getList();
public List< DataSourceDecoration > getList () {
List<DataSourceDecoration> list = dataSourceService.getList();
return list;
}
/**
*
*/
@GetMapping("getDataSourceById")
public Result getDataSourceById(@RequestParam("id") Long id) {
DataSource dataSource = dataSourceService.getById(id);
return Result.success(dataSource);
}
/**
* (/)
@ -62,7 +52,14 @@ public class DataSourceController extends BaseController {
public Result editDataSource(@RequestBody DataSource dataSource){
return dataSourceService.editDataSource(dataSource);
}
/**
*
*/
@GetMapping("getDataSourceById")
public Result getDataSourceById(@RequestParam("id") Long id) {
DataSource dataSource = dataSourceService.getById(id);
return Result.success(dataSource);
}
/**
*
* @param id

View File

@ -3,9 +3,9 @@ package com.etl.data.service;
import com.baomidou.mybatisplus.extension.service.IService;
import com.etl.common.core.domain.Result;
import com.etl.data.domain.DataSource;
import com.etl.data.domain.decoration.DataSourceDecoration;
import com.etl.data.domain.req.DataSourceQueryReq;
import com.etl.data.domain.resp.DataSourceResp;
import org.springframework.web.bind.annotation.RequestBody;
import java.sql.Connection;
import java.util.List;
@ -17,7 +17,7 @@ import java.util.List;
*/
public interface DataSourceService extends IService<DataSource> {
List<DataSource> getList();
List< DataSourceDecoration > getList();
/**
*

View File

@ -11,21 +11,20 @@ import com.etl.common.system.domain.SysRole;
import com.etl.data.domain.*;
import com.etl.data.domain.Dictionary;
import com.etl.data.domain.custom.Statistics;
import com.etl.data.domain.decoration.DataSourceDecoration;
import com.etl.data.domain.req.DataSourceQueryReq;
import com.etl.data.domain.resp.DataSourceResp;
import com.etl.data.domain.resp.DataSourceSpliceParam;
import com.etl.data.mapper.DataSourceMapper;
import com.etl.data.service.*;
import lombok.extern.log4j.Log4j2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestBody;
import java.sql.*;
import java.util.*;
import java.util.Date;
import java.util.stream.Collectors;
/**
@ -52,57 +51,50 @@ public class DataSourceServiceImpl extends ServiceImpl<DataSourceMapper, DataSou
@Override
public List< DataSource > getList() {
public List< DataSourceDecoration > getList() {
List< DataSource > list = this.list ( );
return list;
List< DataSourceDecoration > collect = list.stream ( ).map (DataSourceDecoration::dataSourceBuild ).toList ( );
// list.stream ( ).map ( DataSourceDecoration::dataSourceBuild).collect ( Collectors.toList ( ) );
return collect;
}
@Override
public List< DataSource > selectDataSourceList(DataSourceQueryReq req) {
// 创建查询包装器,用于构建数据库查询条件
LambdaQueryWrapper< DataSource > queryWrapper = new LambdaQueryWrapper<> ( );
List< DataSource > dataSources = this.list ( );
// 根据请求对象中的数据源名称、系统名称和数据库名称添加查询条件
if (req.getDataSourceName ()!=null && req.getDatabaseName ()!=""){
queryWrapper.eq ( DataSource::getDataSourceName, req.getDataSourceName () );
}
if (req.getSystemName ()!=null && req.getSystemName ()!=""){
queryWrapper.eq ( DataSource::getSystemName, req.getSystemName () );
}
if (req.getDatabaseName ()!=null && req.getDatabaseName ()!=""){
queryWrapper.eq ( DataSource::getDataSourceName, req.getDatabaseName () );
}
List< DataSource > dataSources = this.list ( queryWrapper );
List<DataSource> dataSourceList = new ArrayList<DataSource>();
List<SysRole> roles = SecurityUtils.getLoginUser().getSysUser().getRoles();
//判断登录人是否为管理员,不是则需要过滤掉未授权的信息
if (roles.get(0).getRoleId()==1){
dataSourceList = dataSources;
}else{
//数据接入过滤完的主键id集合
ArrayList<Long> longs1 = new ArrayList<>();
//已授权的数据接入id集合
List< Long > sourceIds = sourceAccreditService.list ( new LambdaQueryWrapper< SourceAccredit > ( ).eq ( SourceAccredit::getUserId, SecurityUtils.getUserId ( ) ) ).stream ( ).map ( SourceAccredit::getDataSourceId ).filter ( Objects::nonNull ).toList ( );
//已授权数据模型表的主键id集合
List< Long > assetIds = assetAccreditService.list ( new LambdaQueryWrapper< AssetAccredit > ( ).eq ( AssetAccredit::getUserId, SecurityUtils.getUserId ( ) ) ).stream ( ).map ( AssetAccredit::getDataAssetId ).filter ( Objects::nonNull ).toList ( );
//判断已授权数据模型表的主键id集合是否为空
if (!assetIds.isEmpty()){
//获取已授权的数据模型表的数据源id集合
List< Long > longs = dataAssetService.list ( new LambdaQueryWrapper< DataAsset > ( ).eq ( DataAsset::getDataSourceId, assetIds ) ).stream ( ).map ( DataAsset::getDataSourceId ).toList ( );
//添加到已授权的数据源id集合中
longs1.addAll(longs);
}
longs1.addAll(sourceIds);
//从所有的数据源信息集合中过滤掉未授权的数据源信息
dataSourceList = dataSources.stream().filter(dataSourceInfo -> longs1.contains(dataSourceInfo.getId())).toList();
}
return dataSourceList;
// List<DataSource> dataSourceList = new ArrayList<DataSource>();
// List<SysRole> roles = SecurityUtils.getLoginUser().getSysUser().getRoles();
// //判断登录人是否为管理员,不是则需要过滤掉未授权的信息
// if (roles.get(0).getRoleId()==1){
// dataSourceList = dataSources;
// }else{
// //数据接入过滤完的主键id集合
// ArrayList<Long> longs1 = new ArrayList<>();
// //已授权的数据接入id集合
//
// List< Long > sourceIds = sourceAccreditService.list ( new LambdaQueryWrapper< SourceAccredit > ( ).eq ( SourceAccredit::getUserId, SecurityUtils.getUserId ( ) ) ).stream ( ).map ( SourceAccredit::getDataSourceId ).filter ( Objects::nonNull ).toList ( );
//
//
// //已授权数据模型表的主键id集合
// List< Long > assetIds = assetAccreditService.list ( new LambdaQueryWrapper< AssetAccredit > ( ).eq ( AssetAccredit::getUserId, SecurityUtils.getUserId ( ) ) ).stream ( ).map ( AssetAccredit::getDataAssetId ).filter ( Objects::nonNull ).toList ( );
//
// //判断已授权数据模型表的主键id集合是否为空
// if (!assetIds.isEmpty()){
// //获取已授权的数据模型表的数据源id集合
// List< Long > longs = dataAssetService.list ( new LambdaQueryWrapper< DataAsset > ( ).eq ( DataAsset::getDataSourceId, assetIds ) ).stream ( ).map ( DataAsset::getDataSourceId ).toList ( );
//
// //添加到已授权的数据源id集合中
// longs1.addAll(longs);
// }
// longs1.addAll(sourceIds);
// //从所有的数据源信息集合中过滤掉未授权的数据源信息
// dataSourceList = dataSources.stream().filter(dataSourceInfo -> longs1.contains(dataSourceInfo.getId())).toList();
// }
return dataSources;
// 初始化数据源响应列表
// List< DataSourceResp > dataSourceRespArrayList = new ArrayList<> ( );
//
@ -174,13 +166,13 @@ public class DataSourceServiceImpl extends ServiceImpl<DataSourceMapper, DataSou
Class.forName ( dataSource.getJdbcDriver () );
switch (dataSource.getType ()){
case "oracle":
jdbcUrl="jdbc:oracle:thin:@"+dataSource.getLinkAddress ()+":"+dataSource.getPort ()+":"+dataSource.getDatabaseName ();
jdbcUrl="jdbc:oracle:thin:@"+dataSource.getHost ()+":"+dataSource.getPort ()+":"+dataSource.getDatabaseName ();
break;
case "sqlserver":
jdbcUrl="jdbc:sqlserver://"+dataSource.getLinkAddress ()+":"+dataSource.getPort ()+";databaseName="+dataSource.getDatabaseName ();
jdbcUrl="jdbc:sqlserver://"+dataSource.getHost ()+":"+dataSource.getPort ()+";databaseName="+dataSource.getDatabaseName ();
break;
default:
jdbcUrl="jdbc:mysql://"+dataSource.getLinkAddress ()+":"+dataSource.getPort ()+"/"+dataSource.getDatabaseName ()+"?"+dataSource.getConnectionParam ();
jdbcUrl="jdbc:mysql://"+dataSource.getHost ()+":"+dataSource.getPort ()+"/"+dataSource.getDatabaseName ()+"?"+dataSource.getConnectionParam ();
}
Connection conn = DriverManager.getConnection ( jdbcUrl, dataSource.getUsername ( ), dataSource.getPassword ( ) );
// 释放资源
@ -203,22 +195,6 @@ public class DataSourceServiceImpl extends ServiceImpl<DataSourceMapper, DataSou
if (aBoolean.equals ( null )){
return Result.error ("测试失败!!!");
}
// 根据数据源id'获取所有表
// List< DataAsset > DataAssetList = dataAssetService.list ( new LambdaQueryWrapper< DataAsset > ( ).eq ( DataAsset::getDataSourceId, dataSource.getId ( ) ) );
// List< Long > DataAssetIds = DataAssetList.stream ( ).map ( DataAsset::getId ).toList ( );
//// 如何id存在删除
// if (DataAssetIds!=null && DataAssetIds.size ()!=0){
// //获取所有的数据模型记录
// List< AssetModel > assetModelList = assetModelService.list ( new LambdaQueryWrapper< AssetModel > ( ).in ( AssetModel::getDataAssetId, DataAssetIds ) );
// List< Long > assetModelIds = assetModelList.stream ( ).map ( AssetModel::getId ).toList ( );
// //删除所有源表
// dataAssetService.removeBatchByIds ( DataAssetIds );
//
// if (assetModelIds!=null &&assetModelIds.size ()!=0){
// //删除所有模型
// assetModelService.removeBatchByIds ( assetModelIds );
// }
// }
// 查询结构
if (dataSource.getType ().equals ( "MySql" )){
// 通过数据库名获取表结构(表名,表行,表模型(列))
@ -234,7 +210,7 @@ public class DataSourceServiceImpl extends ServiceImpl<DataSourceMapper, DataSou
"\t\t";
jdbcUrl="jdbc:mysql://"+dataSource.getLinkAddress ()+":"+dataSource.getPort ()+"/"+dataSource.getDatabaseName ()+"?"+dataSource.getConnectionParam ();
jdbcUrl="jdbc:mysql://"+dataSource.getHost ()+":"+dataSource.getPort ()+"/"+dataSource.getDatabaseName ()+"?"+dataSource.getConnectionParam ();
}
try {
Class.forName ( dataSource.getJdbcDriver () );
@ -243,15 +219,15 @@ public class DataSourceServiceImpl extends ServiceImpl<DataSourceMapper, DataSou
ResultSet resultSet = conn.prepareStatement ( sql ).executeQuery ( );
while (resultSet.next ()){
DataAsset dataAsset = DataAsset.builder ( ).tableName ( resultSet.getString ( "table_name" ) )
DataAsset dataAsset = DataAsset.builder ( ).name ( resultSet.getString ( "table_name" ) )
.dataSourceId ( dataSource.getId ( ) )
.fields ( Integer.valueOf ( resultSet.getString ( "fields" ) ) )
.tableCount ( Integer.valueOf ( resultSet.getString ( "table_rows" ) ) )
.tableComment ( resultSet.getString ( "table_comment" ) )
.dataTotal ( Integer.valueOf ( resultSet.getString ( "table_rows" ) ) )
.remark ( resultSet.getString ( "table_comment" ) )
.build ( );
dataAssetService.saveOrUpdate ( dataAsset,new LambdaUpdateWrapper<DataAsset> ( ){{
eq(DataAsset::getDataSourceId,dataAsset.getDataSourceId ());
eq(DataAsset::getTableName,dataAsset.getTableName ());
eq(DataAsset::getName,dataAsset.getName ());
}} );
}
List< DataAsset > dataAssets = dataAssetService.list ( new LambdaQueryWrapper< DataAsset > ( ).eq (
@ -263,11 +239,11 @@ public class DataSourceServiceImpl extends ServiceImpl<DataSourceMapper, DataSou
// / 检查数据资产是否有字段(即列)
if (dataAsset.getFields ()!=0){
// 获取表的所有列的元数据
ResultSetMetaData data = conn.prepareStatement ( "SELECT * FROM " + dataAsset.getTableName ( ) ).getMetaData ( );
ResultSetMetaData data = conn.prepareStatement ( "SELECT * FROM " + dataAsset.getName () ).getMetaData ( );
// 获取表的所有列的信息
ResultSet assetsInfo = conn.getMetaData ( ).getColumns ( dataSource.getDatabaseName (), "%", dataAsset.getTableName (), "%" );
ResultSet assetsInfo = conn.getMetaData ( ).getColumns ( dataSource.getDatabaseName (), "%", dataAsset.getName (), "%" );
// 获取表的主键信息
ResultSet primaryKeys =conn.getMetaData ( ).getPrimaryKeys ( dataSource.getDatabaseName ( ), "%", dataAsset.getTableName ( ) );
ResultSet primaryKeys =conn.getMetaData ( ).getPrimaryKeys ( dataSource.getDatabaseName ( ), "%", dataAsset.getName () );
// 获取主键列名
String primaryKeyName="";
@ -405,7 +381,7 @@ public class DataSourceServiceImpl extends ServiceImpl<DataSourceMapper, DataSou
long sum1;
long sum2=0;
sum1 = dataAssetList.stream().mapToLong(dataAsset -> Long.valueOf(dataAsset.getFields())).sum();
sum2 = dataAssetList.stream().mapToLong(dataAsset -> Long.valueOf(dataAsset.getTableCount())).sum();
sum2 = dataAssetList.stream().mapToLong(dataAsset -> Long.valueOf(dataAsset.getDataTotal ())).sum();
statistics.setAssetModel(sum1);
statistics.setDataModel(sum2);
}else{