perf: 优化同步资产结构的功能

使用线程池来优化同步资产结构功能,
master
yaoxin 2024-04-26 21:56:21 +08:00
parent 0960b70277
commit cf644531f8
1 changed files with 70 additions and 27 deletions

View File

@ -3,10 +3,12 @@ package com.muyu.etl.service.impl;
import java.sql.*;
import java.util.*;
import java.util.Date;
import java.util.concurrent.*;
import java.util.stream.Stream;
import com.muyu.common.core.domain.Result;
import com.muyu.common.core.utils.DateUtils;
import com.muyu.common.log.annotation.Log;
import com.muyu.common.security.utils.SecurityUtils;
import com.muyu.common.system.domain.LoginUser;
import com.muyu.common.system.domain.SysRole;
@ -15,6 +17,7 @@ import com.muyu.etl.domain.Dictionary;
import com.muyu.etl.domain.custom.*;
import com.muyu.etl.feign.SysUserFeignService;
import com.muyu.etl.mapper.*;
import lombok.extern.log4j.Log4j2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import com.muyu.etl.service.IDataSourceService;
@ -27,6 +30,7 @@ import org.springframework.transaction.annotation.Transactional;
* @date 2024-04-20
*/
@Service
@Log4j2
public class DataSourceServiceImpl implements IDataSourceService
{
@Autowired
@ -53,6 +57,8 @@ public class DataSourceServiceImpl implements IDataSourceService
@Autowired
private SysUserFeignService sysUserFeignService;
private final ExecutorService executorService = Executors.newFixedThreadPool(6);
/**
*
*
@ -263,9 +269,9 @@ public class DataSourceServiceImpl implements IDataSourceService
String sql="";
jdbcUrl = "jdbc:"+dataSource.getType().toLowerCase()+"://"+dataSource.getLinkAddress()+":"+dataSource.getPort()+"/"+dataSource.getDatabaseName();
if (dataSource.getType().equals("MySql")){
sql = "select * from "+tableName;
sql = "select * from "+tableName+" where 2=1";
}else{
sql = "select * from "+dataSource.getModeName()+"."+tableName;
sql = "select * from "+dataSource.getModeName()+"."+tableName+" where 2=1";
}
Connection conn = null;
Map<String, String> map = new HashMap<>();
@ -304,7 +310,10 @@ public class DataSourceServiceImpl implements IDataSourceService
jdbcUrl = "jdbc:"+dataSource.getType().toLowerCase()+"://"+dataSource.getLinkAddress()+":"+dataSource.getPort()+"/"+dataSource.getDatabaseName();
Connection conn = null;
List<AssetModel> assetModels = new ArrayList<>();
long staTime = new Date().getTime();
Map<String, String> typeMap = getTypeMap(dataSource, tableName);
long endTime = new Date().getTime();
log.info("查询表字段类型信息耗时:"+(endTime-staTime)+"ms");
try {
Class.forName(dataSource.getJdbcDriver());
conn = DriverManager.getConnection(jdbcUrl, dataSource.getUsername(), dataSource.getPassword());
@ -412,7 +421,7 @@ public class DataSourceServiceImpl implements IDataSourceService
public Result synchronousData(DataSource dataSource) {
deleteChildLevel(new Long[]{dataSource.getId()},"synchronous");
String jdbcUrl = "";
String sql="";
String sql;
jdbcUrl = "jdbc:"+dataSource.getType().toLowerCase()+"://"+dataSource.getLinkAddress()+":"+dataSource.getPort()+"/"+dataSource.getDatabaseName();
if (dataSource.getType().equals("MySql")){
sql="SELECT TABLE_NAME t_name,TABLE_COMMENT table_comment,TABLE_ROWS table_rows,(SELECT count(*) FROM INFORMATION_SCHEMA.columns WHERE TABLE_SCHEMA = '"+dataSource.getDatabaseName()+"' and TABLE_NAME=t_name) fields FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA='"+dataSource.getDatabaseName()+"'";
@ -437,39 +446,73 @@ public class DataSourceServiceImpl implements IDataSourceService
"GROUP BY \n" +
" c.relname, pgd.description, c.reltuples";
}
Connection conn = null;
Connection conn;
HashMap<String, String> map = new HashMap<>();
ArrayList<DataAsset> dataAssets = new ArrayList<>();
try {
long startTime = new Date().getTime();
Class.forName(dataSource.getJdbcDriver());
conn = DriverManager.getConnection(jdbcUrl, dataSource.getUsername(), dataSource.getPassword());
PreparedStatement ps = conn.prepareStatement(sql);
ResultSet resultSet = ps.executeQuery();
while (resultSet.next()){
DataAsset dataAsset = new DataAsset();
dataAsset.setTableName(resultSet.getString("t_name"));
if (resultSet.getString("table_comment")==null || "".equals(resultSet.getString("table_comment"))){
dataAsset.setTableComment("-");
}else{
dataAsset.setTableComment(resultSet.getString("table_comment"));
Future<List<DataAsset>> dataAssetsFuture = executorService.submit(() -> {
PreparedStatement ps = conn.prepareStatement(sql);
ResultSet resultSet = ps.executeQuery();
while (resultSet.next()) {
DataAsset dataAsset = new DataAsset();
// ... (保留原方法中对dataAsset的设置逻辑)
dataAsset.setTableName(resultSet.getString("t_name"));
if (resultSet.getString("table_comment")==null || "".equals(resultSet.getString("table_comment"))){
dataAsset.setTableComment("-");
}else{
dataAsset.setTableComment(resultSet.getString("table_comment"));
}
dataAsset.setTableCount(Long.valueOf(resultSet.getString("table_rows")));
dataAsset.setFields(Long.valueOf(resultSet.getString("fields")));
dataAsset.setDataSourceId(dataSource.getId());
dataAsset.setCreateBy(SecurityUtils.getUsername());
dataAsset.setCreateTime(new Date());
dataAssets.add(dataAsset);
}
dataAsset.setTableCount(Long.valueOf(resultSet.getString("table_rows")));
dataAsset.setFields(Long.valueOf(resultSet.getString("fields")));
dataAsset.setDataSourceId(dataSource.getId());
dataAsset.setCreateBy(SecurityUtils.getUsername());
dataAsset.setCreateTime(new Date());
dataAssets.add(dataAsset);
}
dataAssetMapper.batchInsert(dataAssets);
for (DataAsset dataAsset : dataAssets) {
List<AssetModel> tableAssets = getTableAssets(dataSource, dataAsset.getTableName());
tableAssets.stream().forEach(assetModel -> assetModel.setDataAssetId(dataAsset.getId()));
assetModelMapper.batchInsert(tableAssets);
}
return dataAssets;
});
CompletableFuture<List<DataAsset>> completableFuture = CompletableFuture.completedFuture(null).thenCompose(unused -> {
try {
return CompletableFuture.supplyAsync(() -> {
try {
return dataAssetsFuture.get(); // 等待Future完成并获取结果
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException("Error getting result from Future", e);
}
}, executorService);
} catch (RuntimeException e) {
return CompletableFuture.failedFuture(e);
}
});
List<AssetModel> assetModels = new ArrayList<>();
completableFuture.thenAccept(dataAssetss -> {
dataAssetMapper.batchInsert(dataAssets);
List<CompletableFuture<Void>> assetModelFutures = new ArrayList<>();
for (DataAsset dataAsset : dataAssets) {
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
List<AssetModel> tableAssets = getTableAssets(dataSource, dataAsset.getTableName());
tableAssets.forEach(assetModel -> assetModel.setDataAssetId(dataAsset.getId()));
assetModels.addAll(tableAssets);
}, executorService);
assetModelFutures.add(future);
}
CompletableFuture.allOf(assetModelFutures.toArray(new CompletableFuture[0])).join();
assetModelMapper.batchInsert(assetModels);
});
long endTime = new Date().getTime();
log.info("处理总耗时: " + (endTime - startTime) + "ms");
return Result.success();
} catch (SQLException | ClassNotFoundException e) {
throw new RuntimeException(e);
}
return Result.success();
}
@Override