09052016:对这个资产展示使用线程池的方法去做,第四次优化代码
parent
f6f7bbc3f9
commit
230606de1d
|
@ -11,21 +11,16 @@ import com.muyu.source.pool.MysqlPool;
|
|||
import com.muyu.source.service.DataSourceService;
|
||||
import com.muyu.source.service.DataValueService;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.actuate.autoconfigure.metrics.MetricsProperties;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.sql.*;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Future;
|
||||
|
||||
import static com.muyu.source.pool.config.BaseConfig.SELECTALL;
|
||||
|
||||
/**
|
||||
* @author Lenovo
|
||||
* @ Tool:IntelliJ IDEA
|
||||
|
@ -37,11 +32,12 @@ import static com.muyu.source.pool.config.BaseConfig.SELECTALL;
|
|||
@Service
|
||||
public class DataValueServiceImpl extends ServiceImpl<DataValueMapper, DataValue> implements DataValueService {
|
||||
|
||||
private static final int THREAD_POOL_SIZE = 10;
|
||||
private static final ExecutorService executorService = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
|
||||
|
||||
@Autowired
|
||||
private DataSourceService dataSourceService;
|
||||
// 每批次查询的记录数
|
||||
private static final int BATCH_SIZE = 10000;
|
||||
// 线程池大小
|
||||
private static final int NUM_THREADS = 10;
|
||||
@Override
|
||||
public List<DataValue> findTableValue(DataValueModel dataValueModel) {
|
||||
List<DataValue> dataValueList = new ArrayList<>();
|
||||
|
@ -128,53 +124,6 @@ public class DataValueServiceImpl extends ServiceImpl<DataValueMapper, DataValue
|
|||
// mysqlPool.closeConn();
|
||||
// return list;
|
||||
// }
|
||||
//
|
||||
//@Override
|
||||
//public List<DataValue> findTableValueByTableName(Long basicId, String tableName) {
|
||||
// MysqlQuery mySqlQuery = new MysqlQuery();
|
||||
// mySqlQuery.setDataSourceId(String.valueOf(basicId));
|
||||
// DataSource etlDataScore = dataSourceService.getById(basicId);
|
||||
// MysqlPool mysqlPool = new MysqlPool(etlDataScore);
|
||||
// mysqlPool.init();
|
||||
// Connection conn = mysqlPool.getConn();
|
||||
//
|
||||
// List<DataValue> list = new ArrayList<>();
|
||||
//
|
||||
// try {
|
||||
// PreparedStatement preparedStatement = conn.prepareStatement(SELECTALL + tableName);
|
||||
// ResultSet resultSet = preparedStatement.executeQuery();
|
||||
// ResultSetMetaData metaData = resultSet.getMetaData();
|
||||
// int columnCount = metaData.getColumnCount();
|
||||
//
|
||||
// while (resultSet.next()) {
|
||||
// for (int i = 1; i <= columnCount; i++) {
|
||||
// final int index = i;
|
||||
// Future<DataValue> future = executorService.submit(() -> {
|
||||
// String columnTypeName = metaData.getColumnTypeName(index);
|
||||
// DatabaseMetaData metaDataColumns = conn.getMetaData();
|
||||
// ResultSet columns = metaDataColumns.getColumns(null, null, metaData.getTableName(index), metaData.getColumnName(index));
|
||||
// String remarks = null;
|
||||
// while (columns.next()) {
|
||||
// remarks = columns.getString("REMARKS");
|
||||
// }
|
||||
// return DataValue.builder()
|
||||
// .key(metaData.getColumnName(index))
|
||||
// .label(remarks)
|
||||
// .value(resultSet.getObject(index, DataType.convertType(columnTypeName)))
|
||||
// .type(DataType.convertTypeString(columnTypeName))
|
||||
// .build();
|
||||
// });
|
||||
// list.add(future.get());
|
||||
// }
|
||||
// }
|
||||
// } catch (SQLException | InterruptedException | ExecutionException e) {
|
||||
// throw new RuntimeException(e);
|
||||
// } finally {
|
||||
// mysqlPool.replease(conn);
|
||||
// mysqlPool.closeConn();
|
||||
// }
|
||||
// return list;
|
||||
//}
|
||||
|
||||
@Override
|
||||
public Integer addTableValue(DataValueModel dataValueModel) {
|
||||
|
@ -192,77 +141,94 @@ public class DataValueServiceImpl extends ServiceImpl<DataValueMapper, DataValue
|
|||
}
|
||||
}
|
||||
@Override
|
||||
public List<DataValue> findTableValueByTableName(Long basicId, String tableName) {
|
||||
MysqlQuery mySqlQuery = new MysqlQuery();
|
||||
mySqlQuery.setDataSourceId(String.valueOf(basicId));
|
||||
public List<DataValue> findTableValueByTableName(Long basicId, String tableName) {
|
||||
DataSource etlDataScore = dataSourceService.getById(basicId);
|
||||
MysqlPool mysqlPool = new MysqlPool(etlDataScore);
|
||||
mysqlPool.init();
|
||||
Connection conn = mysqlPool.getConn();
|
||||
|
||||
List<DataValue> list = new ArrayList<>();
|
||||
ExecutorService threadPool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
|
||||
|
||||
int totalRecords = 0;
|
||||
try {
|
||||
PreparedStatement preparedStatement = conn.prepareStatement(SELECTALL + tableName+" limit 100");
|
||||
ResultSet resultSet = preparedStatement.executeQuery();
|
||||
ResultSetMetaData metaData = resultSet.getMetaData();
|
||||
int columnCount = metaData.getColumnCount();
|
||||
totalRecords = getTotalRecords(mysqlPool, tableName);
|
||||
} catch (SQLException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
int totalBatches = (totalRecords + BATCH_SIZE - 1) / BATCH_SIZE;
|
||||
ExecutorService executorService = Executors.newFixedThreadPool(NUM_THREADS);
|
||||
List<Future<List<DataValue>>> futures = new ArrayList<>();
|
||||
|
||||
// Pre-fetch column details
|
||||
Map<Integer, String> columnRemarks = new HashMap<>();
|
||||
DatabaseMetaData metaDataColumns = conn.getMetaData();
|
||||
for (int i = 1; i <= columnCount; i++) {
|
||||
ResultSet columns = metaDataColumns.getColumns(null, null, metaData.getTableName(i), metaData.getColumnName(i));
|
||||
if (columns.next()) {
|
||||
columnRemarks.put(i, columns.getString("REMARKS"));
|
||||
}
|
||||
// 创建任务
|
||||
for (int batch = 0; batch < totalBatches; batch++) {
|
||||
final int offset = batch * BATCH_SIZE;
|
||||
futures.add(executorService.submit(() -> fetchBatchData(mysqlPool, tableName, offset)));
|
||||
}
|
||||
|
||||
List<DataValue> allResults = new ArrayList<>();
|
||||
for (Future<List<DataValue>> future : futures) {
|
||||
try {
|
||||
allResults.addAll(future.get());
|
||||
} catch (InterruptedException | ExecutionException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
// Collect all rows first
|
||||
List<Map<Integer, Object>> rows = new ArrayList<>();
|
||||
while (resultSet.next()) {
|
||||
Map<Integer, Object> row = new HashMap<>();
|
||||
for (int i = 1; i <= columnCount; i++) {
|
||||
row.put(i, resultSet.getObject(i));
|
||||
}
|
||||
rows.add(row);
|
||||
executorService.shutdown();
|
||||
mysqlPool.closeConn();
|
||||
return allResults;
|
||||
}
|
||||
|
||||
private int getTotalRecords(MysqlPool mysqlPool, String tableName) throws SQLException {
|
||||
try (Connection conn = mysqlPool.getConn();
|
||||
Statement statement = conn.createStatement();
|
||||
ResultSet resultSet = statement.executeQuery("SELECT COUNT(*) FROM " + tableName)) {
|
||||
if (resultSet.next()) {
|
||||
return resultSet.getInt(1);
|
||||
} else {
|
||||
throw new SQLException("Failed to get total record count");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
List<Future<DataValue>> futures = new ArrayList<>();
|
||||
private List<DataValue> fetchBatchData(MysqlPool mysqlPool, String tableName, int offset) {
|
||||
List<DataValue> batchResults = new ArrayList<>();
|
||||
String query = "SELECT * FROM " + tableName + " LIMIT ? OFFSET ?";
|
||||
|
||||
// Process each row in separate threads
|
||||
for (Map<Integer, Object> row : rows) {
|
||||
for (int i = 1; i <= columnCount; i++) {
|
||||
final int index = i;
|
||||
final Object value = row.get(index);
|
||||
futures.add(executorService.submit(() -> {
|
||||
String columnTypeName = metaData.getColumnTypeName(index);
|
||||
return DataValue.builder()
|
||||
.key(metaData.getColumnName(index))
|
||||
.label(columnRemarks.getOrDefault(index, ""))
|
||||
.value(value)
|
||||
try (Connection conn = mysqlPool.getConn();
|
||||
PreparedStatement preparedStatement = conn.prepareStatement(query)) {
|
||||
preparedStatement.setInt(1, BATCH_SIZE);
|
||||
preparedStatement.setInt(2, offset);
|
||||
|
||||
try (ResultSet resultSet = preparedStatement.executeQuery()) {
|
||||
ResultSetMetaData metaData = resultSet.getMetaData();
|
||||
int columnCount = metaData.getColumnCount();
|
||||
|
||||
while (resultSet.next()) {
|
||||
for (int i = 1; i <= columnCount; i++) {
|
||||
String columnTypeName = metaData.getColumnTypeName(i);
|
||||
|
||||
DatabaseMetaData metaDataColumns = conn.getMetaData();
|
||||
ResultSet columns = metaDataColumns.getColumns(null, null, metaData.getTableName(i), metaData.getColumnName(i));
|
||||
String remarks = null;
|
||||
while (columns.next()) {
|
||||
remarks = columns.getString("REMARKS");
|
||||
}
|
||||
|
||||
DataValue build = DataValue.builder()
|
||||
.key(metaData.getColumnName(i))
|
||||
.label(remarks)
|
||||
.value(resultSet.getObject(i, DataType.convertType(columnTypeName)))
|
||||
.type(DataType.convertTypeString(columnTypeName))
|
||||
.build();
|
||||
}));
|
||||
}
|
||||
}
|
||||
|
||||
// Collect results
|
||||
for (Future<DataValue> future : futures) {
|
||||
try {
|
||||
list.add(future.get());
|
||||
} catch (InterruptedException | ExecutionException e) {
|
||||
throw new RuntimeException(e);
|
||||
batchResults.add(build);
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (SQLException e) {
|
||||
throw new RuntimeException(e);
|
||||
} finally {
|
||||
threadPool.shutdown();
|
||||
mysqlPool.replease(conn);
|
||||
mysqlPool.closeConn();
|
||||
}
|
||||
return list;
|
||||
return batchResults;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
|
Loading…
Reference in New Issue