提升线程效率

master
面包骑士 2024-09-10 14:26:07 +08:00
parent 813880747a
commit 696f4d3a88
2 changed files with 15 additions and 18 deletions

View File

@ -157,16 +157,9 @@ public class TaskServiceImpl extends ServiceImpl<TaskMapper, Task>
String findSql = getFindSql(nodeMap); String findSql = getFindSql(nodeMap);
// 获取查询条数 // 获取查询条数
int count = getFindCount(findSql, nodeMap); int count = getFindCount(findSql, nodeMap);
// 划分线程 每次根据条数分配线程每次查询条数 // 划分线程
// 总条数 最大线程数/分配条数 >100W 5/100000 >10W 8/10000 <10W 8/1000 int pageSize = 30000;
int pageSize = 10000;
int taskNum = 8; int taskNum = 8;
// if (count > 8000000){
// pageSize = 100000;
// taskNum = 5;
// }else if (count < 100000){
// pageSize = 1000;
// }
int threadNum = count / pageSize + 1; int threadNum = count / pageSize + 1;
log.info("任务 {} 总共需要 {} 条数据, 划分为线程{}条",taskCode,count,threadNum); log.info("任务 {} 总共需要 {} 条数据, 划分为线程{}条",taskCode,count,threadNum);
// 如果线程池未创建 => 创建, 如果已完成内部任务 => 清理后创建新线程池 // 如果线程池未创建 => 创建, 如果已完成内部任务 => 清理后创建新线程池
@ -178,21 +171,20 @@ public class TaskServiceImpl extends ServiceImpl<TaskMapper, Task>
} }
for (int i = 0; i < threadNum; i++) { for (int i = 0; i < threadNum; i++) {
int index = i+1; int index = i+1;
int limitNum = pageSize;
// 添加进入任务队列 // 添加进入任务队列
taskManager.execute(() -> { taskManager.execute(() -> {
long startTime = System.currentTimeMillis();
String exportCode = UUID.randomUUID().toString().replace("-",""); String exportCode = UUID.randomUUID().toString().replace("-","");
// 获取新SQL 并执行 // 获取新SQL 并执行
StringBuilder newFindSql = new StringBuilder(findSql); StringBuilder newFindSql = new StringBuilder(findSql);
newFindSql.append(" LIMIT ") newFindSql.append(" LIMIT ")
.append(limitNum) .append(pageSize)
.append(" OFFSET ") .append(" OFFSET ")
.append((index-1)*limitNum); .append((index-1) * pageSize);
String addSql = getAddSql(nodeMap, newFindSql.toString()); String addSql = getAddSql(nodeMap, newFindSql.toString());
TaskExport entity = new TaskExport(taskCode,exportCode, newFindSql.toString(), 0, ""); TaskExport entity = new TaskExport(taskCode,exportCode, newFindSql.toString(), 0, "");
taskExportService.save(entity); taskExportService.save(entity);
Result addResult = remoteDataSourceService.addTableValue(new DataValueModel(4L, addSql)); Result addResult = remoteDataSourceService.addTableValue(new DataValueModel(4L, addSql));
log.info("任务 {} 第 {} 线程执行结果 {}",taskCode,index,addResult.getMsg());
if (addResult.getCode() != 200){ if (addResult.getCode() != 200){
int errorMaxLength = Math.min(addResult.getMsg().length(), 30000); int errorMaxLength = Math.min(addResult.getMsg().length(), 30000);
entity.setError(addResult.getMsg().substring(0,errorMaxLength)); entity.setError(addResult.getMsg().substring(0,errorMaxLength));
@ -201,12 +193,16 @@ public class TaskServiceImpl extends ServiceImpl<TaskMapper, Task>
entity.setStart(1); entity.setStart(1);
} }
taskExportService.updateByExportCode(entity); taskExportService.updateByExportCode(entity);
log.info("第 {} 线程执行结果 {},耗时: {}ms",index,addResult.getMsg(),System.currentTimeMillis()-startTime);
if (index == threadNum){
new Thread(() -> {
taskManager.closed();
taskManager = null;
}).start();
}
}); });
} }
new Thread(() -> {
taskManager.closed();
taskManager = null;
}).start();
return "任务启动成功!"; return "任务启动成功!";
} }

View File

@ -345,7 +345,8 @@ public class NodeUtils {
.replace("'","") .replace("'","")
.replace("`","") .replace("`","")
.replace(" ","") .replace(" ","")
.replace("\\","")); .replace("\\","")
);
} }
return data; return data;
} }