09061023:对这个资产展示使用线程池的方法去做,第六次优化代码

master
冷调 2024-09-06 10:23:34 +08:00
parent fd57183af4
commit 0059f6038f
5 changed files with 294 additions and 121 deletions

View File

@ -2,6 +2,7 @@ package com.muyu.source.core;
import java.math.BigDecimal;
import java.util.Date;
import java.util.Map;
/**
* @author Lenovo
@ -11,6 +12,7 @@ import java.util.Date;
* @ Version1.0
* @ Description
*/
public enum DataType {
VARCHAR("varchar",String.class,"String"),
BIGINT("bigint", Long.class,"Long"),
@ -20,15 +22,28 @@ public enum DataType {
TEXT("text", String.class,"String"),
DOUBLE("double", Double.class,"Double");
// 数据库源类型
private final String sourceType;
// 映射到的Java类类型
private final Class<?> targetType;
// Java类型的字符串表示
private final String javaType;
public String getSourceType() {
return sourceType;
}
public Class<?> getTargetType() {
return targetType;
}
public String getJavaType() {
return javaType;
}
public static Class convertType(String type){
for (DataType dataType : DataType.values()) {
if (dataType.sourceType.equalsIgnoreCase(type)){
return dataType.targetType;
@ -37,8 +52,16 @@ public enum DataType {
return String.class;
}
public static String convertTypeString(String type){
public static DataType findBySqlType(String sqlType){
for (DataType dataType : DataType.values()) {
if (dataType.getSourceType().equalsIgnoreCase(sqlType)){
return dataType;
}
}
return VARCHAR;
}
public static String convertTypeString(String type){
for (DataType dataType : DataType.values()) {
if (dataType.sourceType.equalsIgnoreCase(type)){
return dataType.javaType;

View File

@ -22,5 +22,5 @@ public class DataValue {
private String label;
private String key;
private Object value;
private String type;
private DataType type;
}

View File

@ -37,7 +37,7 @@ public class DataValueController {
@PostMapping("/findTableValue")
@Operation(summary = "根据基础表ID和SQL语句查询数据", description = "根据基础表ID和SQL语句查询数据")
public Result findTableValue(@RequestBody DataValueModel dataValueModel) {
List<DataValue> dataValueList = dataValueService.findTableValue(dataValueModel);
List<List<DataValue>> dataValueList = dataValueService.findTableValue(dataValueModel);
return Result.success(dataValueList);
}

View File

@ -16,9 +16,9 @@ import java.util.List;
* @ Description{KLTV}
*/
public interface DataValueService extends IService<DataValue> {
List<DataValue> findTableValue(DataValueModel dataValueModel);
List<List<DataValue>> findTableValue(DataValueModel dataValueModel);
List<DataValue> findTableValueByTableName(Long basicId, String tableName) throws SQLException;
List<DataValue> findTableValueByTableName(Long basicId, String tableName);
Integer addTableValue(DataValueModel dataValueModel);
}

View File

@ -10,17 +10,21 @@ import com.muyu.source.mysql.MysqlQuery;
import com.muyu.source.pool.MysqlPool;
import com.muyu.source.service.DataSourceService;
import com.muyu.source.service.DataValueService;
import lombok.extern.log4j.Log4j2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.sql.*;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import static com.muyu.source.pool.config.BaseConfig.SELECTALL;
@ -32,50 +36,278 @@ import static com.muyu.source.pool.config.BaseConfig.SELECTALL;
* @ Version1.0
* @ Description{KLTV}
*/
@Log4j2
@Service
public class DataValueServiceImpl extends ServiceImpl<DataValueMapper, DataValue> implements DataValueService {
// 分页查询的批次大小
private static final int BATCH_SIZE = 1000;
// 每次查询的记录数
private static final int PAGE_SIZE = 100;
// 异步处理线程池
private ExecutorService executorService;
private List<CompletableFuture<Void>> futures = new ArrayList<>();
// 标记线程池是否在关闭过程中
private AtomicBoolean isShuttingDown = new AtomicBoolean(false);
public DataValueServiceImpl() {
// 初始化一个固定大小的线程池
int corePoolSize = Runtime.getRuntime().availableProcessors();
executorService = Executors.newFixedThreadPool(corePoolSize);
}
@Autowired
private DataSourceService dataSourceService;
@Autowired
private DataValueService dataValueService;
@Override
public List<DataValue> findTableValue(DataValueModel dataValueModel) {
List<DataValue> dataValueList = new ArrayList<>();
MysqlQuery mysqlQuery = new MysqlQuery();
mysqlQuery.setDataSourceId(String.valueOf(dataValueModel.getBasicId()));
DataSource dataSource = dataSourceService.getById(dataValueModel.getBasicId());
MysqlPool mysqlPool = new MysqlPool(dataSource);
public List<List<DataValue>> findTableValue(DataValueModel dataValueModel) {
ConcurrentHashMap<Integer, DataValue> map = new ConcurrentHashMap<>();
// 从服务中获取EtlDataScore对象根据基本ID
DataSource etlDataScore = dataSourceService.getById(dataValueModel.getBasicId());
// 创建MySQL连接池对象并初始化
MysqlPool mysqlPool = new MysqlPool(etlDataScore);
mysqlPool.init();
// 从连接池获取数据库连接
Connection conn = mysqlPool.getConn();
try {
PreparedStatement preparedStatement = conn.prepareStatement(dataValueModel.getSql());
ResultSet resultSet = preparedStatement.executeQuery();
ResultSetMetaData metaData = resultSet.getMetaData();
int columnCount = metaData.getColumnCount();
while (resultSet.next()){
for (int i = 1; i <= columnCount; i++) {
String columnTypeName = metaData.getColumnTypeName(i);
DatabaseMetaData metaDataColumns = conn.getMetaData();
ResultSet columns = metaDataColumns.getColumns(null, null, metaData.getTableName(i), metaData.getColumnName(i));
String remarks =null;
while (columns.next()){
remarks = columns.getString("REMARKS");
// 初始化一个列表,用于存储数据值对象
List<List<DataValue>> list = new ArrayList<>();
try {
// 准备SQL查询语句
PreparedStatement preparedStatement = conn.prepareStatement(dataValueModel.getSql());
// 执行查询,获取结果集
ResultSet resultSet = preparedStatement.executeQuery();
// 获取结果集的元数据,用于获取列的数量和类型等信息
ResultSetMetaData metaData = resultSet.getMetaData();
// 获取列的数量
int columnCount = metaData.getColumnCount();
// 遍历结果集中的每一行数据
while (resultSet.next()){
ArrayList<DataValue> dataValues = new ArrayList<>();
// 遍历每一列
for (int i = 1; i <= columnCount; i++) {
if(resultSet.isFirst()){
// 获取当前列的类型名称
String columnTypeName = metaData.getColumnTypeName(i);
// 获取数据库的元数据对象
DatabaseMetaData metaDataColumns = conn.getMetaData();
// 查询数据库元数据,获取当前列的备注信息
ResultSet columns = metaDataColumns.getColumns(null, null, metaData.getTableName(i), metaData.getColumnName(i));
String remarks =null;
// 遍历备注信息的结果集
while (columns.next()){
// 获取当前字段的备注信息
remarks = columns.getString("REMARKS");
// 记录日志,显示字段的备注信息
log.info("字段备注:"+remarks);
}
// 构建数据值对象,包含列名、备注、值、类型等信息
DataValue build = DataValue.builder()
.key(metaData.getColumnName(i))// 当前列的名称
.label(remarks)// 当前列的备注信息
.value(resultSet.getObject(i, DataType.convertType(columnTypeName)))// 当前列的值,类型转换
.type(DataType.findBySqlType(columnTypeName))// 当前列的类型,转换为字符串表示
.build();
dataValues.add(build);
map.put(i, build);
}else {
DataValue build = DataValue.builder()
.key(metaData.getColumnName(i))// 当前列的名称
.label(map.get(i).getLabel())// 当前列的备注信息
.value(resultSet.getObject(i, map.get(i).getType().getTargetType()))// 当前列的值,类型转换
.type(map.get(i).getType())// 当前列的类型,转换为字符串表示
.build();
dataValues.add(build);
}
DataValue build = DataValue.builder()
.key(metaData.getColumnName(i))
.label(remarks)
.value(resultSet.getObject(i, DataType.convertType(columnTypeName)))
.type(DataType.convertTypeString(columnTypeName))
.build();
dataValueList.add(build);
}
list.add(dataValues);
}
} catch (SQLException e) {
// 如果发生SQL异常抛出运行时异常
throw new RuntimeException(e);
}
return dataValueList;
// 释放数据库连接
mysqlPool.replease(conn);
// 关闭数据库连接
mysqlPool.closeConn();
// 返回包含数据值的列表
return list;
}
@Override
public List<DataValue> findTableValueByTableName(Long basicId, String tableName) {
List<DataValue> dataValues = new ArrayList<>();
DataSource dataSources = dataSourceService.getById(basicId);
MysqlPool mysqlPool = new MysqlPool(dataSources);
mysqlPool.init();
Connection conn = mysqlPool.getConn();
try {
// 准备SQL查询语句
String sql = SELECTALL + tableName;
PreparedStatement preparedStatement = conn.prepareStatement(sql);
// 获取结果集的元数据,用于获取列的数量和类型等信息
ResultSetMetaData metaData = preparedStatement.getMetaData();
// 获取列的数量
int columnCount = metaData.getColumnCount();
// 使用Map存储列的元数据包括备注信息
Map<String, ColumnMeta> columnMetadata = getColumnMetadata(conn, metaData, columnCount);
// 初始化分页参数
int offset = 0;
boolean hasMorePages = true;
while (hasMorePages) {
// 构造带分页的SQL查询
String pageSql = sql + " LIMIT " + PAGE_SIZE + " OFFSET " + offset;
PreparedStatement pageStatement = conn.prepareStatement(pageSql);
ResultSet pageResultSet = pageStatement.executeQuery();
List<DataValue> pageDataValues = new ArrayList<>();
while (pageResultSet.next()) {
List<DataValue> rowValues = new ArrayList<>();
for (int i = 1; i <= columnCount; i++) {
String columnName = metaData.getColumnName(i);
ColumnMeta columnData = columnMetadata.get(columnName);
Object value = pageResultSet.getObject(i, columnData.getType().getTargetType());
// 构建数据值对象,包含列名、备注、值、类型等信息
DataValue dataValue = DataValue.builder()
.key(columnName)
.label(columnData.getRemarks())
.value(value)
.type(columnData.getType())
.build();
rowValues.add(dataValue);
}
pageDataValues.addAll(rowValues);
}
// 每处理完一定数量的数据,提交一次
if (!pageDataValues.isEmpty()) {
CompletableFuture<Void> future = processBatchAsync(pageDataValues);
futures.add(future);
dataValues.addAll(pageDataValues);
}
// 检查是否有更多页面
hasMorePages = pageResultSet.getFetchSize() >= PAGE_SIZE;
offset += PAGE_SIZE;
}
// 确保所有异步任务完成后再关闭线程池
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
} catch (SQLException e) {
// 如果发生SQL异常抛出运行时异常
throw new RuntimeException(e);
} finally {
// 在所有异步任务完成后关闭线程池
// shutdownExecutorService(executorService);
// 释放数据库连接
mysqlPool.replease(conn);
// 关闭数据库连接
mysqlPool.closeConn();
}
// 返回包含数据值的列表
return dataValues;
}
// private void shutdownExecutorService(ExecutorService executorService) {
// if (!isShuttingDown.compareAndSet(false, true)) {
// // 已经在关闭过程中,直接返回
// return;
// }
//
// // 记录开始关闭线程池的时间点
// System.out.println("开始关闭线程池:" + LocalDateTime.now());
//
// executorService.shutdown();
// try {
// if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
// executorService.shutdownNow(); // 取消正在执行的任务
// if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
// System.err.println("线程池未正常关闭!");
// }
// }
// } catch (InterruptedException ex) {
// executorService.shutdownNow(); // (Re-)Cancel if current thread also interrupted
// Thread.currentThread().interrupt(); // Preserve interrupt status
// }
//
// // 记录线程池关闭完成的时间点
// System.out.println("线程池关闭完成:" + LocalDateTime.now());
// }
private CompletableFuture<Void> processBatchAsync(final List<DataValue> batch) {
// 记录任务提交的时间点
System.out.println("任务提交:" + LocalDateTime.now());
return CompletableFuture.runAsync(() -> {
try {
// 具体的批量处理逻辑,例如写入文件、更新数据库等
dataValueService.saveBatch(batch);
} catch (Exception e) {
// 异步处理中的异常处理
e.printStackTrace();
}
// 记录任务完成的时间点
System.out.println("任务完成:" + LocalDateTime.now());
}, executorService);
}
private Map<String, ColumnMeta> getColumnMetadata(Connection conn, ResultSetMetaData metaData, int columnCount) throws SQLException {
Map<String, ColumnMeta> metadataMap = new HashMap<>();
for (int i = 1; i <= columnCount; i++) {
String columnName = metaData.getColumnName(i);
String columnTypeName = metaData.getColumnTypeName(i);
// 查询数据库元数据,获取当前列的备注信息
ResultSet columns = conn.getMetaData().getColumns(null, null, metaData.getTableName(i), columnName);
String remarks = null;
while (columns.next()) {
remarks = columns.getString("REMARKS");
}
// 构建列元数据对象
ColumnMeta columnData = new ColumnMeta(columnName, remarks, DataType.findBySqlType(columnTypeName));
metadataMap.put(columnName, columnData);
}
return metadataMap;
}
private static class ColumnMeta {
private final String columnName;
private final String remarks;
private final DataType type;
public ColumnMeta(String columnName, String remarks, DataType type) {
this.columnName = columnName;
this.remarks = remarks;
this.type = type;
}
public String getColumnName() {
return columnName;
}
public String getRemarks() {
return remarks;
}
public DataType getType() {
return type;
}
}
// @Override
@ -140,89 +372,7 @@ public class DataValueServiceImpl extends ServiceImpl<DataValueMapper, DataValue
throw new RuntimeException(e);
}
}
private static final int THREAD_POOL_SIZE = 10;
private final ExecutorService executorService = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
@Override
public List<DataValue> findTableValueByTableName(Long basicId, String tableName) {
MysqlQuery mySqlQuery = new MysqlQuery();
mySqlQuery.setDataSourceId(String.valueOf(basicId));
DataSource etlDataScore = dataSourceService.getById(basicId);
MysqlPool mysqlPool = new MysqlPool(etlDataScore);
mysqlPool.init();
Connection conn = mysqlPool.getConn();
List<DataValue> list = Collections.synchronizedList(new ArrayList<>());
try {
// Determine the total number of rows
int totalRows = getTotalRowCount(conn, tableName);
int rowsPerThread = totalRows / THREAD_POOL_SIZE;
List<Future<List<DataValue>>> futures = new ArrayList<>();
for (int i = 0; i < THREAD_POOL_SIZE; i++) {
int start = i * rowsPerThread;
int end = (i == THREAD_POOL_SIZE - 1) ? totalRows : start + rowsPerThread;
futures.add(executorService.submit(() -> fetchData(conn, tableName, start, end)));
}
// Gather results from all threads
for (Future<List<DataValue>> future : futures) {
list.addAll(future.get());
}
} catch (SQLException | InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
} finally {
mysqlPool.replease(conn);
mysqlPool.closeConn();
executorService.shutdown();
}
return list;
}
private List<DataValue> fetchData(Connection conn, String tableName, int start, int end) throws SQLException {
List<DataValue> resultList = new ArrayList<>();
String query = SELECTALL + tableName + " LIMIT " + start + ", " + (end - start);
try (PreparedStatement preparedStatement = conn.prepareStatement(query);
ResultSet resultSet = preparedStatement.executeQuery()) {
ResultSetMetaData metaData = resultSet.getMetaData();
int columnCount = metaData.getColumnCount();
while (resultSet.next()) {
for (int i = 1; i <= columnCount; i++) {
String columnTypeName = metaData.getColumnTypeName(i);
DatabaseMetaData metaDataColumns = conn.getMetaData();
try (ResultSet columns = metaDataColumns.getColumns(null, null, metaData.getTableName(i), metaData.getColumnName(i))) {
String remarks = null;
while (columns.next()) {
remarks = columns.getString("REMARKS");
}
DataValue build = DataValue.builder()
.key(metaData.getColumnName(i))
.label(remarks)
.value(resultSet.getObject(i, DataType.convertType(columnTypeName)))
.type(DataType.convertTypeString(columnTypeName))
.build();
resultList.add(build);
}
}
}
}
return resultList;
}
private int getTotalRowCount(Connection conn, String tableName) throws SQLException {
String countQuery = "SELECT COUNT(*) FROM " + tableName;
try (Statement statement = conn.createStatement();
ResultSet resultSet = statement.executeQuery(countQuery)) {
if (resultSet.next()) {
return resultSet.getInt(1);
}
}
return 0;
}
}