优化控制台日志展示,新增计时功能
parent
7be9bda298
commit
3ef21e6756
|
@ -9,10 +9,16 @@ package com.muyu.quest.manager;
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
|
import static java.lang.Thread.sleep;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @Author: 胡杨
|
* @Author: 胡杨
|
||||||
* @Name: TaskManager
|
* @Name: TaskManager
|
||||||
|
@ -20,7 +26,7 @@ import java.util.List;
|
||||||
* @CreatedDate: 2024/9/4 下午7:44
|
* @CreatedDate: 2024/9/4 下午7:44
|
||||||
* @FilePath: com.muyu.quest.manager
|
* @FilePath: com.muyu.quest.manager
|
||||||
*/
|
*/
|
||||||
|
@Slf4j
|
||||||
public final class TaskManager {
|
public final class TaskManager {
|
||||||
// 线程池中默认线程的个数为5
|
// 线程池中默认线程的个数为5
|
||||||
private static int workerNum = 5;
|
private static int workerNum = 5;
|
||||||
|
@ -33,6 +39,9 @@ public final class TaskManager {
|
||||||
private final List<Runnable> taskQueue = new LinkedList<Runnable>();
|
private final List<Runnable> taskQueue = new LinkedList<Runnable>();
|
||||||
private static TaskManager taskManager;
|
private static TaskManager taskManager;
|
||||||
|
|
||||||
|
private long startTime;
|
||||||
|
private long endTime;
|
||||||
|
|
||||||
// 创建具有默认线程个数的线程池
|
// 创建具有默认线程个数的线程池
|
||||||
private TaskManager() {
|
private TaskManager() {
|
||||||
this(5);
|
this(5);
|
||||||
|
@ -46,6 +55,7 @@ public final class TaskManager {
|
||||||
workThrads[i] = new WorkThread();
|
workThrads[i] = new WorkThread();
|
||||||
workThrads[i].start();// 开启线程池中的线程
|
workThrads[i].start();// 开启线程池中的线程
|
||||||
}
|
}
|
||||||
|
startTime = System.currentTimeMillis();
|
||||||
}
|
}
|
||||||
|
|
||||||
// 单态模式,获得一个默认线程个数的线程池
|
// 单态模式,获得一个默认线程个数的线程池
|
||||||
|
@ -82,9 +92,9 @@ public final class TaskManager {
|
||||||
|
|
||||||
// 销毁线程池,该方法保证在所有任务都完成的情况下才销毁所有线程,否则等待任务完成才销毁
|
// 销毁线程池,该方法保证在所有任务都完成的情况下才销毁所有线程,否则等待任务完成才销毁
|
||||||
public void destroy() {
|
public void destroy() {
|
||||||
while (!taskQueue.isEmpty()) {// 如果还有任务没执行完成,就先睡会吧
|
while (!getIsRunning()) {// 如果还有任务没执行完成,就先睡会吧
|
||||||
try {
|
try {
|
||||||
Thread.sleep(500);
|
sleep(500);
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
e.printStackTrace();
|
e.printStackTrace();
|
||||||
}
|
}
|
||||||
|
@ -94,6 +104,8 @@ public final class TaskManager {
|
||||||
workThrads[i].stopWorker();
|
workThrads[i].stopWorker();
|
||||||
workThrads[i] = null;
|
workThrads[i] = null;
|
||||||
}
|
}
|
||||||
|
endTime = System.currentTimeMillis();
|
||||||
|
log.info("线程池关闭,工作时长:{}ms", endTime - startTime);
|
||||||
taskManager=null;
|
taskManager=null;
|
||||||
taskQueue.clear();// 清空任务队列
|
taskQueue.clear();// 清空任务队列
|
||||||
}
|
}
|
||||||
|
|
|
@ -155,19 +155,18 @@ public class TaskServiceImpl extends ServiceImpl<TaskMapper, Task>
|
||||||
// 获取查询条数
|
// 获取查询条数
|
||||||
int count = getFindCount(findSql, nodeMap);
|
int count = getFindCount(findSql, nodeMap);
|
||||||
// 划分线程 每次根据条数分配线程每次查询条数
|
// 划分线程 每次根据条数分配线程每次查询条数
|
||||||
// 1000W 5/100000 100W 8/10000 10W 8/1000
|
// 总条数 最大线程数/分配条数 >100W 5/100000 >10W 8/10000 <10W 8/1000
|
||||||
int pageSize = count / 100;
|
int pageSize = 10000;
|
||||||
int taskNum = 8;
|
int taskNum = 8;
|
||||||
if (count > 100000000){
|
if (count > 100000000){
|
||||||
pageSize = 100000;
|
pageSize = 100000;
|
||||||
taskNum = 5;
|
taskNum = 5;
|
||||||
}else if (count > 10000000){
|
|
||||||
pageSize = 10000;
|
|
||||||
}else if (count < 100000){
|
}else if (count < 100000){
|
||||||
pageSize = 1000;
|
pageSize = 1000;
|
||||||
}
|
}
|
||||||
int threadNum = count / pageSize + 1;
|
int threadNum = count / pageSize + 1;
|
||||||
log.info("任务 {} 总共需要 {} 条数据, 划分为线程{}条",taskCode,count,threadNum);
|
log.info("任务 {} 总共需要 {} 条数据, 划分为线程{}条",taskCode,count,threadNum);
|
||||||
|
// 如果线程池未创建 => 创建, 如果已完成内部任务 => 清理后创建新线程池
|
||||||
if (taskManager == null){
|
if (taskManager == null){
|
||||||
taskManager = TaskManager.getTaskManager(taskNum);
|
taskManager = TaskManager.getTaskManager(taskNum);
|
||||||
}else if (taskManager.getIsRunning()){
|
}else if (taskManager.getIsRunning()){
|
||||||
|
@ -187,7 +186,7 @@ public class TaskServiceImpl extends ServiceImpl<TaskMapper, Task>
|
||||||
TaskExport entity = new TaskExport(taskCode,exportCode, addSql.substring(0,addSqlMaxLength), 0, "");
|
TaskExport entity = new TaskExport(taskCode,exportCode, addSql.substring(0,addSqlMaxLength), 0, "");
|
||||||
taskExportService.save(entity);
|
taskExportService.save(entity);
|
||||||
Result addResult = remoteDataSourceService.addTableValue(new DataValueModel(4L, addSql));
|
Result addResult = remoteDataSourceService.addTableValue(new DataValueModel(4L, addSql));
|
||||||
log.info("任务 {} 第 {} 线程执行结果 {}",taskCode,index,addResult);
|
log.info("任务 {} 第 {} 线程执行结果 {}",taskCode,index,addResult.getMsg());
|
||||||
if (addResult.getCode() != 200){
|
if (addResult.getCode() != 200){
|
||||||
int errorMaxLength = Math.min(addResult.getMsg().length(), 30000);
|
int errorMaxLength = Math.min(addResult.getMsg().length(), 30000);
|
||||||
entity.setError(addResult.getMsg().substring(0,errorMaxLength));
|
entity.setError(addResult.getMsg().substring(0,errorMaxLength));
|
||||||
|
@ -198,7 +197,7 @@ public class TaskServiceImpl extends ServiceImpl<TaskMapper, Task>
|
||||||
taskExportService.updateByExportCode(entity);
|
taskExportService.updateByExportCode(entity);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
taskManager.destroy();
|
||||||
return "执行成功";
|
return "执行成功";
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -247,6 +246,7 @@ public class TaskServiceImpl extends ServiceImpl<TaskMapper, Task>
|
||||||
List<NodeDisposition> dispList = getNodeDisp(uniteNode);
|
List<NodeDisposition> dispList = getNodeDisp(uniteNode);
|
||||||
findSql = NodeUtils.nodeDispUnite(dispList);
|
findSql = NodeUtils.nodeDispUnite(dispList);
|
||||||
}
|
}
|
||||||
|
log.info("拼接查询SQL: {}",findSql);
|
||||||
return findSql;
|
return findSql;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -316,7 +316,6 @@ public class NodeUtils {
|
||||||
});
|
});
|
||||||
dataList1.add(dataMap);
|
dataList1.add(dataMap);
|
||||||
}
|
}
|
||||||
System.out.println(dataList1);
|
|
||||||
// 拼接新增语句的值
|
// 拼接新增语句的值
|
||||||
dataList1.forEach(map -> insSql.append("( ").
|
dataList1.forEach(map -> insSql.append("( ").
|
||||||
append(StringUtils.join(
|
append(StringUtils.join(
|
||||||
|
|
Loading…
Reference in New Issue