09081022:给任务模块提供查询和添加的接口

master
冷调 2024-09-08 10:22:59 +08:00
parent ae6729c434
commit 38cb2d157a
1 changed files with 18 additions and 4 deletions

View File

@ -351,21 +351,30 @@ public class DataValueServiceImpl extends ServiceImpl<DataValueMapper, DataValue
MysqlQuery mysqlQuery = new MysqlQuery(); MysqlQuery mysqlQuery = new MysqlQuery();
mysqlQuery.setDataSourceId(dataValueModel.getBasicId()); mysqlQuery.setDataSourceId(dataValueModel.getBasicId());
DataSource dataSource = dataSourceService.getById(dataValueModel.getBasicId()); DataSource dataSource = dataSourceService.getById(dataValueModel.getBasicId());
MysqlPool mysqlPool = new MysqlPool(dataSource); MysqlPool mysqlPool = new MysqlPool(dataSource);
mysqlPool.init(); mysqlPool.init();
// 创建线程池
ExecutorService executorService1 = Executors.newFixedThreadPool(8); ExecutorService executorService1 = Executors.newFixedThreadPool(8);
// 用于记录批量插入的数量
AtomicInteger atomicInteger = new AtomicInteger(); AtomicInteger atomicInteger = new AtomicInteger();
// 将数据拆分成小批量
List<DataValue[][]> batches =splitData(dataValueModel.getDataValues(),2000); List<DataValue[][]> batches =splitData(dataValueModel.getDataValues(),2000);
// 获取连接
Connection conn = mysqlPool.getConn(); Connection conn = mysqlPool.getConn();
try { try {
// 关闭自动提交,改为手动提交
conn.setAutoCommit(false); conn.setAutoCommit(false);
// 提交数据
for (DataValue[][] batch : batches) { for (DataValue[][] batch : batches) {
// 异步提交数据
executorService1.submit(()->{ executorService1.submit(()->{
try { try {
// 创建语句
Statement statement = conn.createStatement(); Statement statement = conn.createStatement();
statement.executeQuery(dataValueModel.getSql()); // 执行语句
statement.executeUpdate(dataValueModel.getSql());
atomicInteger.addAndGet(batch.length); atomicInteger.addAndGet(batch.length);
} catch (SQLException e) { } catch (SQLException e) {
try { try {
@ -378,9 +387,11 @@ public class DataValueServiceImpl extends ServiceImpl<DataValueMapper, DataValue
} }
}); });
} }
// 等待所有任务完成
executorService1.shutdown(); executorService1.shutdown();
// 等待所有任务完成
executorService1.awaitTermination(1, TimeUnit.MINUTES); executorService1.awaitTermination(1, TimeUnit.MINUTES);
// 提交事务
conn.commit(); conn.commit();
} catch (InterruptedException e) { } catch (InterruptedException e) {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
@ -388,10 +399,13 @@ public class DataValueServiceImpl extends ServiceImpl<DataValueMapper, DataValue
} catch (SQLException e) { } catch (SQLException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} finally { } finally {
// 关闭连接
mysqlPool.closeConn(); mysqlPool.closeConn();
// 关闭连接池
mysqlPool.closeBaseConn(); mysqlPool.closeBaseConn();
// 关闭线程池
executorService1.shutdownNow();
} }
return atomicInteger.get(); return atomicInteger.get();
} }