From 230606de1d6aa99fb539cb198d7c68fa78784261 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=86=B7=E8=B0=83?= <3084898776@qq.com> Date: Thu, 5 Sep 2024 20:17:06 +0800 Subject: [PATCH] =?UTF-8?q?09052016:=E5=AF=B9=E8=BF=99=E4=B8=AA=E8=B5=84?= =?UTF-8?q?=E4=BA=A7=E5=B1=95=E7=A4=BA=E4=BD=BF=E7=94=A8=E7=BA=BF=E7=A8=8B?= =?UTF-8?q?=E6=B1=A0=E7=9A=84=E6=96=B9=E6=B3=95=E5=8E=BB=E5=81=9A,?= =?UTF-8?q?=E7=AC=AC=E5=9B=9B=E6=AC=A1=E4=BC=98=E5=8C=96=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../service/Impl/DataValueServiceImpl.java | 186 +++++++----------- 1 file changed, 76 insertions(+), 110 deletions(-) diff --git a/muyu-source-server/src/main/java/com/muyu/source/service/Impl/DataValueServiceImpl.java b/muyu-source-server/src/main/java/com/muyu/source/service/Impl/DataValueServiceImpl.java index 4c822c4..b0a2924 100644 --- a/muyu-source-server/src/main/java/com/muyu/source/service/Impl/DataValueServiceImpl.java +++ b/muyu-source-server/src/main/java/com/muyu/source/service/Impl/DataValueServiceImpl.java @@ -11,21 +11,16 @@ import com.muyu.source.pool.MysqlPool; import com.muyu.source.service.DataSourceService; import com.muyu.source.service.DataValueService; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.actuate.autoconfigure.metrics.MetricsProperties; import org.springframework.stereotype.Service; import java.sql.*; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; -import static com.muyu.source.pool.config.BaseConfig.SELECTALL; - /** * @author Lenovo * @ Tool:IntelliJ IDEA @@ -37,11 +32,12 @@ import static com.muyu.source.pool.config.BaseConfig.SELECTALL; @Service public class DataValueServiceImpl extends ServiceImpl implements DataValueService { - private static final int THREAD_POOL_SIZE = 10; - private static final ExecutorService executorService = Executors.newFixedThreadPool(THREAD_POOL_SIZE); - @Autowired private DataSourceService dataSourceService; + // 每批次查询的记录数 + private static final int BATCH_SIZE = 10000; + // 线程池大小 + private static final int NUM_THREADS = 10; @Override public List findTableValue(DataValueModel dataValueModel) { List dataValueList = new ArrayList<>(); @@ -128,53 +124,6 @@ public class DataValueServiceImpl extends ServiceImpl findTableValueByTableName(Long basicId, String tableName) { -// MysqlQuery mySqlQuery = new MysqlQuery(); -// mySqlQuery.setDataSourceId(String.valueOf(basicId)); -// DataSource etlDataScore = dataSourceService.getById(basicId); -// MysqlPool mysqlPool = new MysqlPool(etlDataScore); -// mysqlPool.init(); -// Connection conn = mysqlPool.getConn(); -// -// List list = new ArrayList<>(); -// -// try { -// PreparedStatement preparedStatement = conn.prepareStatement(SELECTALL + tableName); -// ResultSet resultSet = preparedStatement.executeQuery(); -// ResultSetMetaData metaData = resultSet.getMetaData(); -// int columnCount = metaData.getColumnCount(); -// -// while (resultSet.next()) { -// for (int i = 1; i <= columnCount; i++) { -// final int index = i; -// Future future = executorService.submit(() -> { -// String columnTypeName = metaData.getColumnTypeName(index); -// DatabaseMetaData metaDataColumns = conn.getMetaData(); -// ResultSet columns = metaDataColumns.getColumns(null, null, metaData.getTableName(index), metaData.getColumnName(index)); -// String remarks = null; -// while (columns.next()) { -// remarks = columns.getString("REMARKS"); -// } -// return DataValue.builder() -// .key(metaData.getColumnName(index)) -// .label(remarks) -// .value(resultSet.getObject(index, DataType.convertType(columnTypeName))) -// .type(DataType.convertTypeString(columnTypeName)) -// .build(); -// }); -// list.add(future.get()); -// } -// } -// } catch (SQLException | InterruptedException | ExecutionException e) { -// throw new RuntimeException(e); -// } finally { -// mysqlPool.replease(conn); -// mysqlPool.closeConn(); -// } -// return list; -//} @Override public Integer addTableValue(DataValueModel dataValueModel) { @@ -192,77 +141,94 @@ public class DataValueServiceImpl extends ServiceImpl findTableValueByTableName(Long basicId, String tableName) { - MysqlQuery mySqlQuery = new MysqlQuery(); - mySqlQuery.setDataSourceId(String.valueOf(basicId)); + public List findTableValueByTableName(Long basicId, String tableName) { DataSource etlDataScore = dataSourceService.getById(basicId); MysqlPool mysqlPool = new MysqlPool(etlDataScore); mysqlPool.init(); - Connection conn = mysqlPool.getConn(); - - List list = new ArrayList<>(); - ExecutorService threadPool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); + int totalRecords = 0; try { - PreparedStatement preparedStatement = conn.prepareStatement(SELECTALL + tableName+" limit 100"); - ResultSet resultSet = preparedStatement.executeQuery(); - ResultSetMetaData metaData = resultSet.getMetaData(); - int columnCount = metaData.getColumnCount(); + totalRecords = getTotalRecords(mysqlPool, tableName); + } catch (SQLException e) { + throw new RuntimeException(e); + } + int totalBatches = (totalRecords + BATCH_SIZE - 1) / BATCH_SIZE; + ExecutorService executorService = Executors.newFixedThreadPool(NUM_THREADS); + List>> futures = new ArrayList<>(); - // Pre-fetch column details - Map columnRemarks = new HashMap<>(); - DatabaseMetaData metaDataColumns = conn.getMetaData(); - for (int i = 1; i <= columnCount; i++) { - ResultSet columns = metaDataColumns.getColumns(null, null, metaData.getTableName(i), metaData.getColumnName(i)); - if (columns.next()) { - columnRemarks.put(i, columns.getString("REMARKS")); - } + // 创建任务 + for (int batch = 0; batch < totalBatches; batch++) { + final int offset = batch * BATCH_SIZE; + futures.add(executorService.submit(() -> fetchBatchData(mysqlPool, tableName, offset))); + } + + List allResults = new ArrayList<>(); + for (Future> future : futures) { + try { + allResults.addAll(future.get()); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); } + } - // Collect all rows first - List> rows = new ArrayList<>(); - while (resultSet.next()) { - Map row = new HashMap<>(); - for (int i = 1; i <= columnCount; i++) { - row.put(i, resultSet.getObject(i)); - } - rows.add(row); + executorService.shutdown(); + mysqlPool.closeConn(); + return allResults; + } + + private int getTotalRecords(MysqlPool mysqlPool, String tableName) throws SQLException { + try (Connection conn = mysqlPool.getConn(); + Statement statement = conn.createStatement(); + ResultSet resultSet = statement.executeQuery("SELECT COUNT(*) FROM " + tableName)) { + if (resultSet.next()) { + return resultSet.getInt(1); + } else { + throw new SQLException("Failed to get total record count"); } + } + } - List> futures = new ArrayList<>(); + private List fetchBatchData(MysqlPool mysqlPool, String tableName, int offset) { + List batchResults = new ArrayList<>(); + String query = "SELECT * FROM " + tableName + " LIMIT ? OFFSET ?"; - // Process each row in separate threads - for (Map row : rows) { - for (int i = 1; i <= columnCount; i++) { - final int index = i; - final Object value = row.get(index); - futures.add(executorService.submit(() -> { - String columnTypeName = metaData.getColumnTypeName(index); - return DataValue.builder() - .key(metaData.getColumnName(index)) - .label(columnRemarks.getOrDefault(index, "")) - .value(value) + try (Connection conn = mysqlPool.getConn(); + PreparedStatement preparedStatement = conn.prepareStatement(query)) { + preparedStatement.setInt(1, BATCH_SIZE); + preparedStatement.setInt(2, offset); + + try (ResultSet resultSet = preparedStatement.executeQuery()) { + ResultSetMetaData metaData = resultSet.getMetaData(); + int columnCount = metaData.getColumnCount(); + + while (resultSet.next()) { + for (int i = 1; i <= columnCount; i++) { + String columnTypeName = metaData.getColumnTypeName(i); + + DatabaseMetaData metaDataColumns = conn.getMetaData(); + ResultSet columns = metaDataColumns.getColumns(null, null, metaData.getTableName(i), metaData.getColumnName(i)); + String remarks = null; + while (columns.next()) { + remarks = columns.getString("REMARKS"); + } + + DataValue build = DataValue.builder() + .key(metaData.getColumnName(i)) + .label(remarks) + .value(resultSet.getObject(i, DataType.convertType(columnTypeName))) .type(DataType.convertTypeString(columnTypeName)) .build(); - })); - } - } - - // Collect results - for (Future future : futures) { - try { - list.add(future.get()); - } catch (InterruptedException | ExecutionException e) { - throw new RuntimeException(e); + batchResults.add(build); + } } } } catch (SQLException e) { throw new RuntimeException(e); - } finally { - threadPool.shutdown(); - mysqlPool.replease(conn); - mysqlPool.closeConn(); } - return list; + return batchResults; } -} + + + } + +