diff --git a/muyu-source-common/src/main/java/com/muyu/source/core/DataType.java b/muyu-source-common/src/main/java/com/muyu/source/core/DataType.java index 1f218e3..2a74e1f 100644 --- a/muyu-source-common/src/main/java/com/muyu/source/core/DataType.java +++ b/muyu-source-common/src/main/java/com/muyu/source/core/DataType.java @@ -2,6 +2,7 @@ package com.muyu.source.core; import java.math.BigDecimal; import java.util.Date; +import java.util.Map; /** * @author Lenovo @@ -11,6 +12,7 @@ import java.util.Date; * @ Version:1.0 * @ Description:数据类型枚举 */ + public enum DataType { VARCHAR("varchar",String.class,"String"), BIGINT("bigint", Long.class,"Long"), @@ -20,15 +22,28 @@ public enum DataType { TEXT("text", String.class,"String"), DOUBLE("double", Double.class,"Double"); + // 数据库源类型 private final String sourceType; - + // 映射到的Java类类型 private final Class targetType; - + // Java类型的字符串表示 private final String javaType; + public String getSourceType() { + return sourceType; + } + + public Class getTargetType() { + return targetType; + } + + public String getJavaType() { + return javaType; + } + + public static Class convertType(String type){ - for (DataType dataType : DataType.values()) { if (dataType.sourceType.equalsIgnoreCase(type)){ return dataType.targetType; @@ -37,8 +52,16 @@ public enum DataType { return String.class; } - public static String convertTypeString(String type){ + public static DataType findBySqlType(String sqlType){ + for (DataType dataType : DataType.values()) { + if (dataType.getSourceType().equalsIgnoreCase(sqlType)){ + return dataType; + } + } + return VARCHAR; + } + public static String convertTypeString(String type){ for (DataType dataType : DataType.values()) { if (dataType.sourceType.equalsIgnoreCase(type)){ return dataType.javaType; diff --git a/muyu-source-common/src/main/java/com/muyu/source/core/DataValue.java b/muyu-source-common/src/main/java/com/muyu/source/core/DataValue.java index 71cd911..218e1eb 100644 --- a/muyu-source-common/src/main/java/com/muyu/source/core/DataValue.java +++ b/muyu-source-common/src/main/java/com/muyu/source/core/DataValue.java @@ -22,5 +22,5 @@ public class DataValue { private String label; private String key; private Object value; - private String type; + private DataType type; } diff --git a/muyu-source-server/src/main/java/com/muyu/source/controller/DataValueController.java b/muyu-source-server/src/main/java/com/muyu/source/controller/DataValueController.java index 38cb001..229dc92 100644 --- a/muyu-source-server/src/main/java/com/muyu/source/controller/DataValueController.java +++ b/muyu-source-server/src/main/java/com/muyu/source/controller/DataValueController.java @@ -37,7 +37,7 @@ public class DataValueController { @PostMapping("/findTableValue") @Operation(summary = "根据基础表ID和SQL语句查询数据", description = "根据基础表ID和SQL语句查询数据") public Result findTableValue(@RequestBody DataValueModel dataValueModel) { - List dataValueList = dataValueService.findTableValue(dataValueModel); + List> dataValueList = dataValueService.findTableValue(dataValueModel); return Result.success(dataValueList); } diff --git a/muyu-source-server/src/main/java/com/muyu/source/service/DataValueService.java b/muyu-source-server/src/main/java/com/muyu/source/service/DataValueService.java index 7a2a4a7..5cbedcb 100644 --- a/muyu-source-server/src/main/java/com/muyu/source/service/DataValueService.java +++ b/muyu-source-server/src/main/java/com/muyu/source/service/DataValueService.java @@ -16,9 +16,9 @@ import java.util.List; * @ Description:资产展示{KLTV}业务层 */ public interface DataValueService extends IService { - List findTableValue(DataValueModel dataValueModel); + List> findTableValue(DataValueModel dataValueModel); - List findTableValueByTableName(Long basicId, String tableName) throws SQLException; + List findTableValueByTableName(Long basicId, String tableName); Integer addTableValue(DataValueModel dataValueModel); } diff --git a/muyu-source-server/src/main/java/com/muyu/source/service/Impl/DataValueServiceImpl.java b/muyu-source-server/src/main/java/com/muyu/source/service/Impl/DataValueServiceImpl.java index 026e3e5..29fd000 100644 --- a/muyu-source-server/src/main/java/com/muyu/source/service/Impl/DataValueServiceImpl.java +++ b/muyu-source-server/src/main/java/com/muyu/source/service/Impl/DataValueServiceImpl.java @@ -10,17 +10,21 @@ import com.muyu.source.mysql.MysqlQuery; import com.muyu.source.pool.MysqlPool; import com.muyu.source.service.DataSourceService; import com.muyu.source.service.DataValueService; +import lombok.extern.log4j.Log4j2; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.sql.*; +import java.time.LocalDateTime; import java.util.ArrayList; -import java.util.Collections; +import java.util.HashMap; import java.util.List; -import java.util.concurrent.ExecutionException; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicBoolean; import static com.muyu.source.pool.config.BaseConfig.SELECTALL; @@ -32,50 +36,278 @@ import static com.muyu.source.pool.config.BaseConfig.SELECTALL; * @ Version:1.0 * @ Description:资产展示{KLTV}业务实现层 */ +@Log4j2 @Service public class DataValueServiceImpl extends ServiceImpl implements DataValueService { + // 分页查询的批次大小 + private static final int BATCH_SIZE = 1000; + // 每次查询的记录数 + private static final int PAGE_SIZE = 100; + // 异步处理线程池 + private ExecutorService executorService; + private List> futures = new ArrayList<>(); + // 标记线程池是否在关闭过程中 + private AtomicBoolean isShuttingDown = new AtomicBoolean(false); + + public DataValueServiceImpl() { + // 初始化一个固定大小的线程池 + int corePoolSize = Runtime.getRuntime().availableProcessors(); + executorService = Executors.newFixedThreadPool(corePoolSize); + } + @Autowired private DataSourceService dataSourceService; + @Autowired + private DataValueService dataValueService; @Override - public List findTableValue(DataValueModel dataValueModel) { - List dataValueList = new ArrayList<>(); - - MysqlQuery mysqlQuery = new MysqlQuery(); - mysqlQuery.setDataSourceId(String.valueOf(dataValueModel.getBasicId())); - DataSource dataSource = dataSourceService.getById(dataValueModel.getBasicId()); - MysqlPool mysqlPool = new MysqlPool(dataSource); + public List> findTableValue(DataValueModel dataValueModel) { + ConcurrentHashMap map = new ConcurrentHashMap<>(); + // 从服务中获取EtlDataScore对象,根据基本ID + DataSource etlDataScore = dataSourceService.getById(dataValueModel.getBasicId()); + // 创建MySQL连接池对象,并初始化 + MysqlPool mysqlPool = new MysqlPool(etlDataScore); mysqlPool.init(); + // 从连接池获取数据库连接 Connection conn = mysqlPool.getConn(); - try { - PreparedStatement preparedStatement = conn.prepareStatement(dataValueModel.getSql()); - ResultSet resultSet = preparedStatement.executeQuery(); - ResultSetMetaData metaData = resultSet.getMetaData(); - int columnCount = metaData.getColumnCount(); - while (resultSet.next()){ - for (int i = 1; i <= columnCount; i++) { - String columnTypeName = metaData.getColumnTypeName(i); - DatabaseMetaData metaDataColumns = conn.getMetaData(); - ResultSet columns = metaDataColumns.getColumns(null, null, metaData.getTableName(i), metaData.getColumnName(i)); - String remarks =null; - while (columns.next()){ - remarks = columns.getString("REMARKS"); + // 初始化一个列表,用于存储数据值对象 + List> list = new ArrayList<>(); + + try { + // 准备SQL查询语句 + PreparedStatement preparedStatement = conn.prepareStatement(dataValueModel.getSql()); + // 执行查询,获取结果集 + ResultSet resultSet = preparedStatement.executeQuery(); + // 获取结果集的元数据,用于获取列的数量和类型等信息 + ResultSetMetaData metaData = resultSet.getMetaData(); + // 获取列的数量 + int columnCount = metaData.getColumnCount(); + + // 遍历结果集中的每一行数据 + while (resultSet.next()){ + ArrayList dataValues = new ArrayList<>(); + // 遍历每一列 + for (int i = 1; i <= columnCount; i++) { + if(resultSet.isFirst()){ + + // 获取当前列的类型名称 + String columnTypeName = metaData.getColumnTypeName(i); + // 获取数据库的元数据对象 + DatabaseMetaData metaDataColumns = conn.getMetaData(); + // 查询数据库元数据,获取当前列的备注信息 + ResultSet columns = metaDataColumns.getColumns(null, null, metaData.getTableName(i), metaData.getColumnName(i)); + String remarks =null; + // 遍历备注信息的结果集 + while (columns.next()){ + // 获取当前字段的备注信息 + remarks = columns.getString("REMARKS"); + + // 记录日志,显示字段的备注信息 + log.info("字段备注:"+remarks); + } + // 构建数据值对象,包含列名、备注、值、类型等信息 + DataValue build = DataValue.builder() + .key(metaData.getColumnName(i))// 当前列的名称 + .label(remarks)// 当前列的备注信息 + .value(resultSet.getObject(i, DataType.convertType(columnTypeName)))// 当前列的值,类型转换 + .type(DataType.findBySqlType(columnTypeName))// 当前列的类型,转换为字符串表示 + .build(); + dataValues.add(build); + map.put(i, build); + }else { + DataValue build = DataValue.builder() + .key(metaData.getColumnName(i))// 当前列的名称 + .label(map.get(i).getLabel())// 当前列的备注信息 + .value(resultSet.getObject(i, map.get(i).getType().getTargetType()))// 当前列的值,类型转换 + .type(map.get(i).getType())// 当前列的类型,转换为字符串表示 + .build(); + dataValues.add(build); } - DataValue build = DataValue.builder() - .key(metaData.getColumnName(i)) - .label(remarks) - .value(resultSet.getObject(i, DataType.convertType(columnTypeName))) - .type(DataType.convertTypeString(columnTypeName)) - .build(); - dataValueList.add(build); + } + list.add(dataValues); } } catch (SQLException e) { + // 如果发生SQL异常,抛出运行时异常 throw new RuntimeException(e); } - return dataValueList; + // 释放数据库连接 + mysqlPool.replease(conn); + // 关闭数据库连接 + mysqlPool.closeConn(); + // 返回包含数据值的列表 + return list; + } + + @Override + public List findTableValueByTableName(Long basicId, String tableName) { + List dataValues = new ArrayList<>(); + DataSource dataSources = dataSourceService.getById(basicId); + MysqlPool mysqlPool = new MysqlPool(dataSources); + mysqlPool.init(); + Connection conn = mysqlPool.getConn(); + + try { + // 准备SQL查询语句 + String sql = SELECTALL + tableName; + PreparedStatement preparedStatement = conn.prepareStatement(sql); + // 获取结果集的元数据,用于获取列的数量和类型等信息 + ResultSetMetaData metaData = preparedStatement.getMetaData(); + // 获取列的数量 + int columnCount = metaData.getColumnCount(); + + // 使用Map存储列的元数据,包括备注信息 + Map columnMetadata = getColumnMetadata(conn, metaData, columnCount); + + // 初始化分页参数 + int offset = 0; + boolean hasMorePages = true; + + while (hasMorePages) { + // 构造带分页的SQL查询 + String pageSql = sql + " LIMIT " + PAGE_SIZE + " OFFSET " + offset; + PreparedStatement pageStatement = conn.prepareStatement(pageSql); + ResultSet pageResultSet = pageStatement.executeQuery(); + + List pageDataValues = new ArrayList<>(); + while (pageResultSet.next()) { + List rowValues = new ArrayList<>(); + for (int i = 1; i <= columnCount; i++) { + String columnName = metaData.getColumnName(i); + ColumnMeta columnData = columnMetadata.get(columnName); + Object value = pageResultSet.getObject(i, columnData.getType().getTargetType()); + + // 构建数据值对象,包含列名、备注、值、类型等信息 + DataValue dataValue = DataValue.builder() + .key(columnName) + .label(columnData.getRemarks()) + .value(value) + .type(columnData.getType()) + .build(); + rowValues.add(dataValue); + } + pageDataValues.addAll(rowValues); + } + + // 每处理完一定数量的数据,提交一次 + if (!pageDataValues.isEmpty()) { + CompletableFuture future = processBatchAsync(pageDataValues); + futures.add(future); + dataValues.addAll(pageDataValues); + } + + // 检查是否有更多页面 + hasMorePages = pageResultSet.getFetchSize() >= PAGE_SIZE; + offset += PAGE_SIZE; + } + + // 确保所有异步任务完成后再关闭线程池 + CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join(); + + } catch (SQLException e) { + // 如果发生SQL异常,抛出运行时异常 + throw new RuntimeException(e); + } finally { + // 在所有异步任务完成后关闭线程池 +// shutdownExecutorService(executorService); + + // 释放数据库连接 + mysqlPool.replease(conn); + // 关闭数据库连接 + mysqlPool.closeConn(); + } + + // 返回包含数据值的列表 + return dataValues; + } + +// private void shutdownExecutorService(ExecutorService executorService) { +// if (!isShuttingDown.compareAndSet(false, true)) { +// // 已经在关闭过程中,直接返回 +// return; +// } +// +// // 记录开始关闭线程池的时间点 +// System.out.println("开始关闭线程池:" + LocalDateTime.now()); +// +// executorService.shutdown(); +// try { +// if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) { +// executorService.shutdownNow(); // 取消正在执行的任务 +// if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) { +// System.err.println("线程池未正常关闭!"); +// } +// } +// } catch (InterruptedException ex) { +// executorService.shutdownNow(); // (Re-)Cancel if current thread also interrupted +// Thread.currentThread().interrupt(); // Preserve interrupt status +// } +// +// // 记录线程池关闭完成的时间点 +// System.out.println("线程池关闭完成:" + LocalDateTime.now()); +// } + + private CompletableFuture processBatchAsync(final List batch) { + // 记录任务提交的时间点 + System.out.println("任务提交:" + LocalDateTime.now()); + + return CompletableFuture.runAsync(() -> { + try { + // 具体的批量处理逻辑,例如写入文件、更新数据库等 + dataValueService.saveBatch(batch); + } catch (Exception e) { + // 异步处理中的异常处理 + e.printStackTrace(); + } + // 记录任务完成的时间点 + System.out.println("任务完成:" + LocalDateTime.now()); + }, executorService); + } + + private Map getColumnMetadata(Connection conn, ResultSetMetaData metaData, int columnCount) throws SQLException { + Map metadataMap = new HashMap<>(); + for (int i = 1; i <= columnCount; i++) { + String columnName = metaData.getColumnName(i); + String columnTypeName = metaData.getColumnTypeName(i); + + // 查询数据库元数据,获取当前列的备注信息 + ResultSet columns = conn.getMetaData().getColumns(null, null, metaData.getTableName(i), columnName); + String remarks = null; + while (columns.next()) { + remarks = columns.getString("REMARKS"); + } + + // 构建列元数据对象 + ColumnMeta columnData = new ColumnMeta(columnName, remarks, DataType.findBySqlType(columnTypeName)); + metadataMap.put(columnName, columnData); + } + return metadataMap; + } + + private static class ColumnMeta { + private final String columnName; + private final String remarks; + private final DataType type; + + public ColumnMeta(String columnName, String remarks, DataType type) { + this.columnName = columnName; + this.remarks = remarks; + this.type = type; + } + + public String getColumnName() { + return columnName; + } + + public String getRemarks() { + return remarks; + } + + public DataType getType() { + return type; + } } // @Override @@ -140,89 +372,7 @@ public class DataValueServiceImpl extends ServiceImpl findTableValueByTableName(Long basicId, String tableName) { - MysqlQuery mySqlQuery = new MysqlQuery(); - mySqlQuery.setDataSourceId(String.valueOf(basicId)); - DataSource etlDataScore = dataSourceService.getById(basicId); - MysqlPool mysqlPool = new MysqlPool(etlDataScore); - mysqlPool.init(); - Connection conn = mysqlPool.getConn(); - - List list = Collections.synchronizedList(new ArrayList<>()); - - try { - // Determine the total number of rows - int totalRows = getTotalRowCount(conn, tableName); - int rowsPerThread = totalRows / THREAD_POOL_SIZE; - - List>> futures = new ArrayList<>(); - for (int i = 0; i < THREAD_POOL_SIZE; i++) { - int start = i * rowsPerThread; - int end = (i == THREAD_POOL_SIZE - 1) ? totalRows : start + rowsPerThread; - futures.add(executorService.submit(() -> fetchData(conn, tableName, start, end))); - } - - // Gather results from all threads - for (Future> future : futures) { - list.addAll(future.get()); - } - } catch (SQLException | InterruptedException | ExecutionException e) { - throw new RuntimeException(e); - } finally { - mysqlPool.replease(conn); - mysqlPool.closeConn(); - executorService.shutdown(); - } - return list; - } - - private List fetchData(Connection conn, String tableName, int start, int end) throws SQLException { - List resultList = new ArrayList<>(); - String query = SELECTALL + tableName + " LIMIT " + start + ", " + (end - start); - - try (PreparedStatement preparedStatement = conn.prepareStatement(query); - ResultSet resultSet = preparedStatement.executeQuery()) { - - ResultSetMetaData metaData = resultSet.getMetaData(); - int columnCount = metaData.getColumnCount(); - - while (resultSet.next()) { - for (int i = 1; i <= columnCount; i++) { - String columnTypeName = metaData.getColumnTypeName(i); - DatabaseMetaData metaDataColumns = conn.getMetaData(); - try (ResultSet columns = metaDataColumns.getColumns(null, null, metaData.getTableName(i), metaData.getColumnName(i))) { - String remarks = null; - while (columns.next()) { - remarks = columns.getString("REMARKS"); - } - DataValue build = DataValue.builder() - .key(metaData.getColumnName(i)) - .label(remarks) - .value(resultSet.getObject(i, DataType.convertType(columnTypeName))) - .type(DataType.convertTypeString(columnTypeName)) - .build(); - resultList.add(build); - } - } - } - } - return resultList; - } - - private int getTotalRowCount(Connection conn, String tableName) throws SQLException { - String countQuery = "SELECT COUNT(*) FROM " + tableName; - try (Statement statement = conn.createStatement(); - ResultSet resultSet = statement.executeQuery(countQuery)) { - if (resultSet.next()) { - return resultSet.getInt(1); - } - } - return 0; - } }