diff --git a/muyu-modules/muyu-etl/src/main/java/com/muyu/etl/service/impl/DataSourceServiceImpl.java b/muyu-modules/muyu-etl/src/main/java/com/muyu/etl/service/impl/DataSourceServiceImpl.java index 86c44a7..7191118 100644 --- a/muyu-modules/muyu-etl/src/main/java/com/muyu/etl/service/impl/DataSourceServiceImpl.java +++ b/muyu-modules/muyu-etl/src/main/java/com/muyu/etl/service/impl/DataSourceServiceImpl.java @@ -3,10 +3,12 @@ package com.muyu.etl.service.impl; import java.sql.*; import java.util.*; import java.util.Date; +import java.util.concurrent.*; import java.util.stream.Stream; import com.muyu.common.core.domain.Result; import com.muyu.common.core.utils.DateUtils; +import com.muyu.common.log.annotation.Log; import com.muyu.common.security.utils.SecurityUtils; import com.muyu.common.system.domain.LoginUser; import com.muyu.common.system.domain.SysRole; @@ -15,6 +17,7 @@ import com.muyu.etl.domain.Dictionary; import com.muyu.etl.domain.custom.*; import com.muyu.etl.feign.SysUserFeignService; import com.muyu.etl.mapper.*; +import lombok.extern.log4j.Log4j2; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import com.muyu.etl.service.IDataSourceService; @@ -27,6 +30,7 @@ import org.springframework.transaction.annotation.Transactional; * @date 2024-04-20 */ @Service +@Log4j2 public class DataSourceServiceImpl implements IDataSourceService { @Autowired @@ -53,6 +57,8 @@ public class DataSourceServiceImpl implements IDataSourceService @Autowired private SysUserFeignService sysUserFeignService; + private final ExecutorService executorService = Executors.newFixedThreadPool(6); + /** * 查询【请填写功能名称】 * @@ -263,9 +269,9 @@ public class DataSourceServiceImpl implements IDataSourceService String sql=""; jdbcUrl = "jdbc:"+dataSource.getType().toLowerCase()+"://"+dataSource.getLinkAddress()+":"+dataSource.getPort()+"/"+dataSource.getDatabaseName(); if (dataSource.getType().equals("MySql")){ - sql = "select * from "+tableName; + sql = "select * from "+tableName+" where 2=1"; }else{ - sql = "select * from "+dataSource.getModeName()+"."+tableName; + sql = "select * from "+dataSource.getModeName()+"."+tableName+" where 2=1"; } Connection conn = null; Map map = new HashMap<>(); @@ -304,7 +310,10 @@ public class DataSourceServiceImpl implements IDataSourceService jdbcUrl = "jdbc:"+dataSource.getType().toLowerCase()+"://"+dataSource.getLinkAddress()+":"+dataSource.getPort()+"/"+dataSource.getDatabaseName(); Connection conn = null; List assetModels = new ArrayList<>(); + long staTime = new Date().getTime(); Map typeMap = getTypeMap(dataSource, tableName); + long endTime = new Date().getTime(); + log.info("查询表字段类型信息耗时:"+(endTime-staTime)+"ms"); try { Class.forName(dataSource.getJdbcDriver()); conn = DriverManager.getConnection(jdbcUrl, dataSource.getUsername(), dataSource.getPassword()); @@ -412,7 +421,7 @@ public class DataSourceServiceImpl implements IDataSourceService public Result synchronousData(DataSource dataSource) { deleteChildLevel(new Long[]{dataSource.getId()},"synchronous"); String jdbcUrl = ""; - String sql=""; + String sql; jdbcUrl = "jdbc:"+dataSource.getType().toLowerCase()+"://"+dataSource.getLinkAddress()+":"+dataSource.getPort()+"/"+dataSource.getDatabaseName(); if (dataSource.getType().equals("MySql")){ sql="SELECT TABLE_NAME t_name,TABLE_COMMENT table_comment,TABLE_ROWS table_rows,(SELECT count(*) FROM INFORMATION_SCHEMA.columns WHERE TABLE_SCHEMA = '"+dataSource.getDatabaseName()+"' and TABLE_NAME=t_name) fields FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA='"+dataSource.getDatabaseName()+"'"; @@ -437,39 +446,73 @@ public class DataSourceServiceImpl implements IDataSourceService "GROUP BY \n" + " c.relname, pgd.description, c.reltuples"; } - Connection conn = null; + Connection conn; HashMap map = new HashMap<>(); ArrayList dataAssets = new ArrayList<>(); try { + long startTime = new Date().getTime(); Class.forName(dataSource.getJdbcDriver()); conn = DriverManager.getConnection(jdbcUrl, dataSource.getUsername(), dataSource.getPassword()); - PreparedStatement ps = conn.prepareStatement(sql); - ResultSet resultSet = ps.executeQuery(); - while (resultSet.next()){ - DataAsset dataAsset = new DataAsset(); - dataAsset.setTableName(resultSet.getString("t_name")); - if (resultSet.getString("table_comment")==null || "".equals(resultSet.getString("table_comment"))){ - dataAsset.setTableComment("-"); - }else{ - dataAsset.setTableComment(resultSet.getString("table_comment")); + + Future> dataAssetsFuture = executorService.submit(() -> { + PreparedStatement ps = conn.prepareStatement(sql); + ResultSet resultSet = ps.executeQuery(); + + while (resultSet.next()) { + DataAsset dataAsset = new DataAsset(); + // ... (保留原方法中对dataAsset的设置逻辑) + dataAsset.setTableName(resultSet.getString("t_name")); + if (resultSet.getString("table_comment")==null || "".equals(resultSet.getString("table_comment"))){ + dataAsset.setTableComment("-"); + }else{ + dataAsset.setTableComment(resultSet.getString("table_comment")); + } + dataAsset.setTableCount(Long.valueOf(resultSet.getString("table_rows"))); + dataAsset.setFields(Long.valueOf(resultSet.getString("fields"))); + dataAsset.setDataSourceId(dataSource.getId()); + dataAsset.setCreateBy(SecurityUtils.getUsername()); + dataAsset.setCreateTime(new Date()); + dataAssets.add(dataAsset); } - dataAsset.setTableCount(Long.valueOf(resultSet.getString("table_rows"))); - dataAsset.setFields(Long.valueOf(resultSet.getString("fields"))); - dataAsset.setDataSourceId(dataSource.getId()); - dataAsset.setCreateBy(SecurityUtils.getUsername()); - dataAsset.setCreateTime(new Date()); - dataAssets.add(dataAsset); - } - dataAssetMapper.batchInsert(dataAssets); - for (DataAsset dataAsset : dataAssets) { - List tableAssets = getTableAssets(dataSource, dataAsset.getTableName()); - tableAssets.stream().forEach(assetModel -> assetModel.setDataAssetId(dataAsset.getId())); - assetModelMapper.batchInsert(tableAssets); - } + return dataAssets; + }); + CompletableFuture> completableFuture = CompletableFuture.completedFuture(null).thenCompose(unused -> { + try { + return CompletableFuture.supplyAsync(() -> { + try { + return dataAssetsFuture.get(); // 等待Future完成并获取结果 + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException("Error getting result from Future", e); + } + }, executorService); + } catch (RuntimeException e) { + return CompletableFuture.failedFuture(e); + } + }); + List assetModels = new ArrayList<>(); + completableFuture.thenAccept(dataAssetss -> { + dataAssetMapper.batchInsert(dataAssets); + + List> assetModelFutures = new ArrayList<>(); + for (DataAsset dataAsset : dataAssets) { + CompletableFuture future = CompletableFuture.runAsync(() -> { + List tableAssets = getTableAssets(dataSource, dataAsset.getTableName()); + tableAssets.forEach(assetModel -> assetModel.setDataAssetId(dataAsset.getId())); + assetModels.addAll(tableAssets); + }, executorService); + assetModelFutures.add(future); + } + CompletableFuture.allOf(assetModelFutures.toArray(new CompletableFuture[0])).join(); + assetModelMapper.batchInsert(assetModels); + }); + + long endTime = new Date().getTime(); + log.info("处理总耗时: " + (endTime - startTime) + "ms"); + + return Result.success(); } catch (SQLException | ClassNotFoundException e) { throw new RuntimeException(e); } - return Result.success(); } @Override