调整,新增动态线程池大小,动态线程分配条数划分和线程池重建功能
parent
8d1b33f903
commit
7be9bda298
|
@ -23,7 +23,7 @@ import java.util.List;
|
||||||
|
|
||||||
public final class TaskManager {
|
public final class TaskManager {
|
||||||
// 线程池中默认线程的个数为5
|
// 线程池中默认线程的个数为5
|
||||||
private static int workerNum = 8;
|
private static int workerNum = 5;
|
||||||
// 工作线程
|
// 工作线程
|
||||||
private final WorkThread[] workThrads;
|
private final WorkThread[] workThrads;
|
||||||
// 未处理的任务
|
// 未处理的任务
|
||||||
|
@ -113,6 +113,14 @@ public final class TaskManager {
|
||||||
return taskQueue.size();
|
return taskQueue.size();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 返回线程池当前状态
|
||||||
|
* @return 如果还在工作返回false,否则返回true
|
||||||
|
*/
|
||||||
|
public Boolean getIsRunning() {
|
||||||
|
return taskQueue.isEmpty() || (taskManager.getWaitTasknumber() == 0 && taskManager.getWorkThreadNumber() == 0);
|
||||||
|
}
|
||||||
|
|
||||||
// 覆盖toString方法,返回线程池信息:工作线程个数和已完成任务个数
|
// 覆盖toString方法,返回线程池信息:工作线程个数和已完成任务个数
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
|
|
|
@ -50,7 +50,7 @@ public class TaskServiceImpl extends ServiceImpl<TaskMapper, Task>
|
||||||
@Resource
|
@Resource
|
||||||
private ITaskExportService taskExportService;
|
private ITaskExportService taskExportService;
|
||||||
|
|
||||||
private final static TaskManager taskManager = TaskManager.getTaskManager();
|
private static TaskManager taskManager = null;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 查询任务
|
* 查询任务
|
||||||
|
@ -154,17 +154,34 @@ public class TaskServiceImpl extends ServiceImpl<TaskMapper, Task>
|
||||||
String findSql = getFindSql(nodeMap);
|
String findSql = getFindSql(nodeMap);
|
||||||
// 获取查询条数
|
// 获取查询条数
|
||||||
int count = getFindCount(findSql, nodeMap);
|
int count = getFindCount(findSql, nodeMap);
|
||||||
// 划分线程 每次查询100000条
|
// 划分线程 每次根据条数分配线程每次查询条数
|
||||||
int pageSize = 100000;
|
// 1000W 5/100000 100W 8/10000 10W 8/1000
|
||||||
|
int pageSize = count / 100;
|
||||||
|
int taskNum = 8;
|
||||||
|
if (count > 100000000){
|
||||||
|
pageSize = 100000;
|
||||||
|
taskNum = 5;
|
||||||
|
}else if (count > 10000000){
|
||||||
|
pageSize = 10000;
|
||||||
|
}else if (count < 100000){
|
||||||
|
pageSize = 1000;
|
||||||
|
}
|
||||||
int threadNum = count / pageSize + 1;
|
int threadNum = count / pageSize + 1;
|
||||||
log.info("任务 {} 总共需要 {} 条数据, 划分为线程{}条",taskCode,count,threadNum);
|
log.info("任务 {} 总共需要 {} 条数据, 划分为线程{}条",taskCode,count,threadNum);
|
||||||
|
if (taskManager == null){
|
||||||
|
taskManager = TaskManager.getTaskManager(taskNum);
|
||||||
|
}else if (taskManager.getIsRunning()){
|
||||||
|
taskManager.destroy();
|
||||||
|
taskManager = TaskManager.getTaskManager(taskNum);
|
||||||
|
}
|
||||||
for (int i = 0; i < threadNum; i++) {
|
for (int i = 0; i < threadNum; i++) {
|
||||||
int index = i+1;
|
int index = i+1;
|
||||||
|
int limitNum = pageSize;
|
||||||
// 添加进入任务队列
|
// 添加进入任务队列
|
||||||
taskManager.execute(() -> {
|
taskManager.execute(() -> {
|
||||||
String exportCode = UUID.randomUUID().toString().replace("-","");
|
String exportCode = UUID.randomUUID().toString().replace("-","");
|
||||||
// 获取新SQL 并执行
|
// 获取新SQL 并执行
|
||||||
String sql = findSql + " LIMIT "+pageSize+" OFFSET "+(index-1)*pageSize;
|
String sql = findSql + " LIMIT "+limitNum+" OFFSET "+(index-1)*limitNum;
|
||||||
String addSql = getAddSql(nodeMap, sql);
|
String addSql = getAddSql(nodeMap, sql);
|
||||||
int addSqlMaxLength = Math.min(addSql.length(), 30000);
|
int addSqlMaxLength = Math.min(addSql.length(), 30000);
|
||||||
TaskExport entity = new TaskExport(taskCode,exportCode, addSql.substring(0,addSqlMaxLength), 0, "");
|
TaskExport entity = new TaskExport(taskCode,exportCode, addSql.substring(0,addSqlMaxLength), 0, "");
|
||||||
|
|
Loading…
Reference in New Issue