refactor:同步数据由全量改成增量

master_fei
Yunfei Du 2024-05-08 20:38:55 +08:00
parent 912cdbdd49
commit 4f53431f7b
6 changed files with 64 additions and 31 deletions

View File

@ -12,6 +12,7 @@ import com.etl.data.service.DataSourceService;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*; import org.springframework.web.bind.annotation.*;
import javax.xml.crypto.Data;
import java.sql.SQLException; import java.sql.SQLException;
import java.util.Date; import java.util.Date;
import java.util.List; import java.util.List;
@ -37,6 +38,12 @@ public class DataSourceController extends BaseController {
return getDataAsset (list); return getDataAsset (list);
} }
@GetMapping("/getList")
public List< DataSource > getList () {
List<DataSource> list = dataSourceService.getList();
return list;
}
/** /**
* *
*/ */

View File

@ -17,6 +17,8 @@ import java.util.List;
*/ */
public interface DataSourceService extends IService<DataSource> { public interface DataSourceService extends IService<DataSource> {
List<DataSource> getList();
/** /**
* *
* @param dataSource * @param dataSource

View File

@ -1,6 +1,7 @@
package com.etl.data.service.impl; package com.etl.data.service.impl;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.etl.common.core.domain.Result; import com.etl.common.core.domain.Result;
import com.etl.common.core.utils.StringUtils; import com.etl.common.core.utils.StringUtils;
@ -50,6 +51,11 @@ public class DataSourceServiceImpl extends ServiceImpl<DataSourceMapper, DataSou
private SourceAccreditService sourceAccreditService; private SourceAccreditService sourceAccreditService;
@Override
public List< DataSource > getList() {
List< DataSource > list = this.list ( );
return list;
}
@Override @Override
public List< DataSource > selectDataSourceList(DataSourceQueryReq req) { public List< DataSource > selectDataSourceList(DataSourceQueryReq req) {
@ -198,25 +204,36 @@ public class DataSourceServiceImpl extends ServiceImpl<DataSourceMapper, DataSou
return Result.error ("测试失败!!!"); return Result.error ("测试失败!!!");
} }
// 根据数据源id'获取所有表 // 根据数据源id'获取所有表
List< DataAsset > DataAssetList = dataAssetService.list ( new LambdaQueryWrapper< DataAsset > ( ).eq ( DataAsset::getDataSourceId, dataSource.getId ( ) ) ); // List< DataAsset > DataAssetList = dataAssetService.list ( new LambdaQueryWrapper< DataAsset > ( ).eq ( DataAsset::getDataSourceId, dataSource.getId ( ) ) );
List< Long > DataAssetIds = DataAssetList.stream ( ).map ( DataAsset::getId ).toList ( ); // List< Long > DataAssetIds = DataAssetList.stream ( ).map ( DataAsset::getId ).toList ( );
// 如何id存在删除 //// 如何id存在删除
if (DataAssetIds!=null && DataAssetIds.size ()!=0){ // if (DataAssetIds!=null && DataAssetIds.size ()!=0){
//获取所有的数据模型记录 // //获取所有的数据模型记录
List< AssetModel > assetModelList = assetModelService.list ( new LambdaQueryWrapper< AssetModel > ( ).in ( AssetModel::getDataAssetId, DataAssetIds ) ); // List< AssetModel > assetModelList = assetModelService.list ( new LambdaQueryWrapper< AssetModel > ( ).in ( AssetModel::getDataAssetId, DataAssetIds ) );
List< Long > assetModelIds = assetModelList.stream ( ).map ( AssetModel::getId ).toList ( ); // List< Long > assetModelIds = assetModelList.stream ( ).map ( AssetModel::getId ).toList ( );
//删除所有源表 // //删除所有源表
dataAssetService.removeBatchByIds ( DataAssetIds ); // dataAssetService.removeBatchByIds ( DataAssetIds );
//
if (assetModelIds!=null &&assetModelIds.size ()!=0){ // if (assetModelIds!=null &&assetModelIds.size ()!=0){
//删除所有模型 // //删除所有模型
assetModelService.removeBatchByIds ( assetModelIds ); // assetModelService.removeBatchByIds ( assetModelIds );
} // }
} // }
// 查询结构 // 查询结构
if (dataSource.getType ().equals ( "MySql" )){ if (dataSource.getType ().equals ( "MySql" )){
// 通过数据库名获取表结构(表名,表行,表模型(列)) // 通过数据库名获取表结构(表名,表行,表模型(列))
sql="SELECT TABLE_NAME table_name,TABLE_COMMENT table_comment,TABLE_ROWS table_rows,(SELECT count(*) FROM INFORMATION_SCHEMA.columns WHERE TABLE_SCHEMA = '"+dataSource.getDatabaseName()+"' and TABLE_NAME=table_name) fields FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA='"+dataSource.getDatabaseName()+"'"; sql="SELECT \n" +
" T.table_name,T.table_comment,T.table_rows, \n" +
" (SELECT COUNT(*) \n" +
" FROM INFORMATION_SCHEMA.COLUMNS C \n" +
" WHERE C.TABLE_NAME = T.table_name AND C.TABLE_SCHEMA = '"+dataSource.getDatabaseName()+"') AS fields\n" +
"FROM \n" +
" INFORMATION_SCHEMA.TABLES T\n" +
"WHERE \n" +
" T.TABLE_SCHEMA = '"+dataSource.getDatabaseName()+"'\n" +
"\t\t";
jdbcUrl="jdbc:mysql://"+dataSource.getLinkAddress ()+":"+dataSource.getPort ()+"/"+dataSource.getDatabaseName ()+"?"+dataSource.getConnectionParam (); jdbcUrl="jdbc:mysql://"+dataSource.getLinkAddress ()+":"+dataSource.getPort ()+"/"+dataSource.getDatabaseName ()+"?"+dataSource.getConnectionParam ();
} }
try { try {
@ -224,7 +241,6 @@ public class DataSourceServiceImpl extends ServiceImpl<DataSourceMapper, DataSou
conn=DriverManager.getConnection ( jdbcUrl, dataSource.getUsername ( ), dataSource.getPassword ( ) ); conn=DriverManager.getConnection ( jdbcUrl, dataSource.getUsername ( ), dataSource.getPassword ( ) );
ResultSet resultSet = conn.prepareStatement ( sql ).executeQuery ( ); ResultSet resultSet = conn.prepareStatement ( sql ).executeQuery ( );
ArrayList< DataAsset > dataAssets = new ArrayList<> ( );
while (resultSet.next ()){ while (resultSet.next ()){
DataAsset dataAsset = DataAsset.builder ( ).tableName ( resultSet.getString ( "table_name" ) ) DataAsset dataAsset = DataAsset.builder ( ).tableName ( resultSet.getString ( "table_name" ) )
@ -233,13 +249,16 @@ public class DataSourceServiceImpl extends ServiceImpl<DataSourceMapper, DataSou
.tableCount ( Integer.valueOf ( resultSet.getString ( "table_rows" ) ) ) .tableCount ( Integer.valueOf ( resultSet.getString ( "table_rows" ) ) )
.tableComment ( resultSet.getString ( "table_comment" ) ) .tableComment ( resultSet.getString ( "table_comment" ) )
.build ( ); .build ( );
dataAssets.add ( dataAsset ); dataAssetService.saveOrUpdate ( dataAsset,new LambdaUpdateWrapper<DataAsset> ( ){{
eq(DataAsset::getDataSourceId,dataAsset.getDataSourceId ());
eq(DataAsset::getTableName,dataAsset.getTableName ());
}} );
} }
// 批量添加数据资产 List< DataAsset > dataAssets = dataAssetService.list ( new LambdaQueryWrapper< DataAsset > ( ).eq (
dataAssetService.saveBatch ( dataAssets ); DataAsset::getDataSourceId, dataSource.getId ( )
) );
List< AssetModel > assetModels = new ArrayList<> ( ); // 遍历数据资产列表 client
// 遍历数据资产列表
for (DataAsset dataAsset : dataAssets) { for (DataAsset dataAsset : dataAssets) {
// / 检查数据资产是否有字段(即列) // / 检查数据资产是否有字段(即列)
if (dataAsset.getFields ()!=0){ if (dataAsset.getFields ()!=0){
@ -257,7 +276,6 @@ public class DataSourceServiceImpl extends ServiceImpl<DataSourceMapper, DataSou
} }
primaryKeys.close (); primaryKeys.close ();
// 获取表的相对应的列的名字 // 获取表的相对应的列的名字
int index=1; int index=1;
while (assetsInfo.next ()){ while (assetsInfo.next ()){
// 获取列的小数位数 // 获取列的小数位数
@ -279,20 +297,19 @@ public class DataSourceServiceImpl extends ServiceImpl<DataSourceMapper, DataSou
.defaultValue ( StringUtils.isEmpty ( defaultValue )?"-":defaultValue) .defaultValue ( StringUtils.isEmpty ( defaultValue )?"-":defaultValue)
.dictKey ( "" ) .dictKey ( "" )
.dictionaryId ( null ).build (); .dictionaryId ( null ).build ();
assetModels.add ( assetModel ); log.info ( assetModel );
} assetModelService.saveOrUpdate (assetModel, new LambdaUpdateWrapper<AssetModel> ( ){{
// 保存资产模型集合到数据库 eq ( AssetModel::getDataAssetId ,assetModel.getDataAssetId () );
assetModelService.saveBatch ( assetModels ); eq ( AssetModel::getName ,assetModel.getName () );
}} );
conn.close ();;
assetsInfo.close ();
return Result.success ( );
} }
} }
}
conn.close ();
} catch (ClassNotFoundException | SQLException e) { } catch (ClassNotFoundException | SQLException e) {
throw new RuntimeException ( e ); throw new RuntimeException ( e );
} }
return Result.error ( ); return Result.success ( );
} }

View File

@ -0,0 +1,7 @@
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper
PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.etl.rule.engine.mapper.RuleEngineVersionMapper">
</mapper>