测试线程池

master
陈思豪 2024-09-09 02:15:40 +08:00
parent 565ead0b6f
commit b384edceba
2 changed files with 310 additions and 150 deletions

View File

@ -193,11 +193,9 @@ public class TaskServiceImpl extends ServiceImpl<TaskMapper, TaskInfo> implement
String limit = " limit " + pageNum + "," + pageSize; String limit = " limit " + pageNum + "," + pageSize;
String limitSelect = sql + limit; String limitSelect = sql + limit;
Long finalDatabaseId = databaseId; Long finalDatabaseId = databaseId;
String finalSql = sql;
log.info("执行查询语句为{}", limitSelect); log.info("执行查询语句为{}", limitSelect);
if(taskInfo.getWeigh() == 4){ if(taskInfo.getWeigh() == 4){
log.info("执行紧急任务"); log.info("执行紧急任务");
log.info("sql为{}",finalSql);
executeUrgently(() -> { executeUrgently(() -> {
selectAndAdd(finalDatabaseId, basicId, limitSelect, tableId,newAndOldMap, finalFirstArray,two); selectAndAdd(finalDatabaseId, basicId, limitSelect, tableId,newAndOldMap, finalFirstArray,two);
}); });
@ -205,7 +203,6 @@ public class TaskServiceImpl extends ServiceImpl<TaskMapper, TaskInfo> implement
if(taskInfo.getWeigh() == 3){ if(taskInfo.getWeigh() == 3){
log.info("执行高级任务"); log.info("执行高级任务");
log.info("sql为{}",finalSql);
executeHigh(() -> { executeHigh(() -> {
selectAndAdd(finalDatabaseId, basicId, limitSelect, tableId,newAndOldMap, finalFirstArray,two); selectAndAdd(finalDatabaseId, basicId, limitSelect, tableId,newAndOldMap, finalFirstArray,two);
}); });
@ -213,7 +210,6 @@ public class TaskServiceImpl extends ServiceImpl<TaskMapper, TaskInfo> implement
if(taskInfo.getWeigh() == 2){ if(taskInfo.getWeigh() == 2){
log.info("执行中级任务"); log.info("执行中级任务");
log.info("sql为{}",finalSql);
executeMedium(() -> { executeMedium(() -> {
selectAndAdd(finalDatabaseId, basicId, limitSelect, tableId,newAndOldMap, finalFirstArray,two); selectAndAdd(finalDatabaseId, basicId, limitSelect, tableId,newAndOldMap, finalFirstArray,two);
}); });
@ -221,7 +217,6 @@ public class TaskServiceImpl extends ServiceImpl<TaskMapper, TaskInfo> implement
if(taskInfo.getWeigh() == 1){ if(taskInfo.getWeigh() == 1){
log.info("执行低级任务"); log.info("执行低级任务");
log.info("sql为{}",finalSql);
executeLow(() -> { executeLow(() -> {
selectAndAdd(finalDatabaseId, basicId, limitSelect, tableId,newAndOldMap, finalFirstArray,two); selectAndAdd(finalDatabaseId, basicId, limitSelect, tableId,newAndOldMap, finalFirstArray,two);
}); });

View File

@ -15,178 +15,343 @@ import java.util.concurrent.atomic.AtomicInteger;
@Log4j2 @Log4j2
public class PriorityThreadPool { public class PriorityThreadPool {
//默认线程数 // //默认线程数
private static final int TOTAL_THREADS = 20; //总 // private static final int TOTAL_THREADS = 20; //总
private static final int HIGH_THREADS = 10; //高 // private static final int HIGH_THREADS = 10; //高
private static final int MEDIUM_THREADS = 6; //中 // private static final int MEDIUM_THREADS = 6; //中
private static final int LOW_THREADS = 4; //低 // private static final int LOW_THREADS = 4; //低
//
//紧急任务分重新配的线程数 // //紧急任务分重新配的线程数
private static final int EMERGENCY_THREADS = 9; //紧急 // private static final int EMERGENCY_THREADS = 9; //紧急
private static final int EMERGENCY_HIGH_THREADS = 5; //高 // private static final int EMERGENCY_HIGH_THREADS = 5; //高
private static final int EMERGENCY_MEDIUM_THREADS = 4; //中 // private static final int EMERGENCY_MEDIUM_THREADS = 4; //中
private static final int EMERGENCY_LOW_THREADS = 2; //低 // private static final int EMERGENCY_LOW_THREADS = 2; //低
//
//
//
// private static final ExecutorService executor =
// new ThreadPoolExecutor(TOTAL_THREADS, TOTAL_THREADS, 80L, TimeUnit.MILLISECONDS,
// new LinkedBlockingQueue<Runnable>());
//
// private static final Semaphore HIGH_PRIORITY_SEMAPHORE = new Semaphore(HIGH_THREADS);
// private static final Semaphore MEDIUM_PRIORITY_SEMAPHORE = new Semaphore(MEDIUM_THREADS);
// private static final Semaphore LOW_PRIORITY_SEMAPHORE = new Semaphore(LOW_THREADS);
// private static final Semaphore EMERGENCY_SEMAPHORE = new Semaphore(0);
//
// //执行中的紧急任务数量
// private static final AtomicInteger ACTIVE_EMERGENCY_TASKS = new AtomicInteger(0);
// //剩余未完成的
// private static final AtomicInteger REMAINING_TASKS = new AtomicInteger(0);
// //可有的紧急线程
// private static final CountDownLatch EMERGENCY_LATCH = new CountDownLatch(LOW_THREADS);
// //标识是否紧急
// private static boolean EMERGENCY = false;
//
//
// public static void executeUrgently(Runnable runnable) {
// if(ACTIVE_EMERGENCY_TASKS.incrementAndGet() == 1){
// REMAINING_TASKS.set(0);
// adjustForEmergency();
// }
// REMAINING_TASKS.incrementAndGet();
// EMERGENCY_LATCH.countDown();
// try {
// EMERGENCY_SEMAPHORE.acquire();
// Runnable task = myWorker(runnable);
// executor.submit(task);
// } catch (InterruptedException e) {
// throw new RuntimeException(e);
// }
// }
//
//
// public static void executeHigh(Runnable runnable) {
// try {
// // 获取一个高优先级任务许可
// log.info("使用高优先级进行查询");
// HIGH_PRIORITY_SEMAPHORE.acquire();
// // 提交任务
// Runnable task = myWorker(runnable);
// log.info("提交任务");
// executor.submit(task);
// } catch (InterruptedException e) {
// Thread.currentThread().interrupt();
// }
// }
//
//
// public static void executeMedium(Runnable runnable) {
// try {
// // 获取一个高优先级任务许可
// MEDIUM_PRIORITY_SEMAPHORE.acquire();
// // 提交任务
// Runnable task = myWorker(runnable);
// executor.submit(task);
// } catch (InterruptedException e) {
// Thread.currentThread().interrupt();
// }
// }
//
//
// public static void executeLow(Runnable runnable) {
// try {
// // 获取一个高优先级任务许可
// LOW_PRIORITY_SEMAPHORE.acquire();
// // 提交任务
// Runnable task = myWorker(runnable);
// executor.submit(task);
// } catch (InterruptedException e) {
// Thread.currentThread().interrupt();
// }
// }
//
// private static Runnable myWorker(Runnable task) {
// return () -> {
// try {
// log.info("执行任务");
// task.run();
// } finally {
// // 释放紧急任务许可
// EMERGENCY_SEMAPHORE.release();
// // 如果所有紧急任务已完成,则恢复默认设置
// if (REMAINING_TASKS.decrementAndGet() == 0) {
// try {
// Thread.sleep(800000);
// } catch (InterruptedException e) {
// Thread.currentThread().interrupt();
// }
// restoreDefaults();
// }
// }
// };
// }
//
// public static synchronized void restoreDefaults() {
// if (EMERGENCY && REMAINING_TASKS.get() == 0) {
// // 清空紧急任务的许可
// EMERGENCY_SEMAPHORE.drainPermits();
// try {
// // 重新获取之前释放的许可
// HIGH_PRIORITY_SEMAPHORE.acquire(EMERGENCY_HIGH_THREADS);
// MEDIUM_PRIORITY_SEMAPHORE.acquire(EMERGENCY_MEDIUM_THREADS);
// LOW_PRIORITY_SEMAPHORE.acquire(EMERGENCY_LOW_THREADS);
// } catch (InterruptedException e) {
// Thread.currentThread().interrupt();
// }
// // 重新分配高、中、低优先级的线程
// HIGH_PRIORITY_SEMAPHORE.release(HIGH_THREADS);
// MEDIUM_PRIORITY_SEMAPHORE.release(MEDIUM_THREADS);
// LOW_PRIORITY_SEMAPHORE.release(LOW_THREADS);
// // 清空紧急任务的许可
// EMERGENCY_SEMAPHORE.release(0);
// // 退出紧急模式
// EMERGENCY = false;
// // 确保所有紧急任务都完成了
// try {
// EMERGENCY_LATCH.await(); // 等待所有紧急任务完成
// } catch (InterruptedException e) {
// Thread.currentThread().interrupt();
// System.err.println("Interrupted while waiting for emergency tasks to complete.");
// }
// }
// }
//// private final PriorityBlockingQueue<TaskServiceImpl.SegmentTask> queue;
////
//// public Worker(PriorityBlockingQueue<TaskServiceImpl.SegmentTask> queue) {
//// this.queue = queue;
//// }
////
//// public void run() {
//// while (!Thread.currentThread().isInterrupted()) {
//// try {
//// // 从队列中取出一个任务
//// TaskServiceImpl.SegmentTask task = queue.take();
//// // 执行任务
//// task.run();
//// } catch (InterruptedException e) {
//// Thread.currentThread().interrupt(); // 设置中断状态
//// break;
//// }
//// }
//// }
//
// // 调整线程分配到紧急模式
// static synchronized void adjustForEmergency() {
// if (!EMERGENCY) {
// EMERGENCY = true;
// // 释放一些高、中、低优先级的线程,以便为紧急任务腾出空间
// HIGH_PRIORITY_SEMAPHORE.release(HIGH_THREADS - EMERGENCY_HIGH_THREADS);
// MEDIUM_PRIORITY_SEMAPHORE.release(MEDIUM_THREADS - EMERGENCY_MEDIUM_THREADS);
// LOW_PRIORITY_SEMAPHORE.release(LOW_THREADS - EMERGENCY_LOW_THREADS);
// // 为紧急任务分配线程
// EMERGENCY_SEMAPHORE.release(EMERGENCY_THREADS);
// }
// }
private static final ExecutorService executor;
private static final Semaphore highPrioritySemaphore;
private static final Semaphore mediumPrioritySemaphore;
private static final Semaphore lowPrioritySemaphore;
private static final Semaphore emergencySemaphore;
private static final ExecutorService executor = private static final int totalThreads =20; // 固定线程池大小
new ThreadPoolExecutor(TOTAL_THREADS, TOTAL_THREADS, 80L, TimeUnit.MILLISECONDS, private static final int defaultHighThreads = 10;
new LinkedBlockingQueue<Runnable>()); private static final int defaultMediumThreads = 6;
private static final int defaultLowThreads = 4;
private static final Semaphore HIGH_PRIORITY_SEMAPHORE = new Semaphore(HIGH_THREADS); private static final int emergencyThreads = 9;
private static final Semaphore MEDIUM_PRIORITY_SEMAPHORE = new Semaphore(MEDIUM_THREADS); private static final int emergencyHighThreads = 3;
private static final Semaphore LOW_PRIORITY_SEMAPHORE = new Semaphore(LOW_THREADS); private static final int emergencyMediumThreads = 3;
private static final Semaphore EMERGENCY_SEMAPHORE = new Semaphore(0); private static final int emergencyLowThreads = 3;
//执行中的紧急任务数量 private static volatile boolean inEmergencyMode = false;
private static final AtomicInteger ACTIVE_EMERGENCY_TASKS = new AtomicInteger(0); public static final AtomicInteger activeEmergencyTasks = new AtomicInteger(0); // 紧急任务计数器
//剩余未完成的 public static final AtomicInteger remainingTasks = new AtomicInteger(0); // 剩余任务计数器
private static final AtomicInteger REMAINING_TASKS = new AtomicInteger(0); private static final CountDownLatch emergencyLatch = new CountDownLatch(emergencyThreads); // 用于等待所有紧急任务完成
//可有的紧急线程
private static final CountDownLatch EMERGENCY_LATCH = new CountDownLatch(LOW_THREADS);
//标识是否紧急
private static boolean EMERGENCY = false;
static {
if (defaultHighThreads + defaultMediumThreads + defaultLowThreads > totalThreads) {
throw new IllegalArgumentException("Default priority threads exceed total threads.");
}
if (emergencyThreads + emergencyHighThreads + emergencyMediumThreads + emergencyLowThreads > totalThreads) {
throw new IllegalArgumentException("Emergency priority threads exceed total threads.");
}
public static void executeUrgently(Runnable runnable) { // 创建固定大小的线程池
if(ACTIVE_EMERGENCY_TASKS.incrementAndGet() == 1){ executor = new ThreadPoolExecutor(
REMAINING_TASKS.set(0); totalThreads, totalThreads,
80L, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>()
);
highPrioritySemaphore = new Semaphore(defaultHighThreads);
mediumPrioritySemaphore = new Semaphore(defaultMediumThreads);
lowPrioritySemaphore = new Semaphore(defaultLowThreads);
emergencySemaphore = new Semaphore(0);
}
public static void executeUrgently(Runnable task) {
if (activeEmergencyTasks.incrementAndGet() == 1) {
// 初始化计数器
remainingTasks.set(0);
adjustForEmergency(); adjustForEmergency();
} }
REMAINING_TASKS.incrementAndGet(); remainingTasks.incrementAndGet(); // 增加剩余任务计数
EMERGENCY_LATCH.countDown(); emergencyLatch.countDown(); // 减少计数器
try { try {
EMERGENCY_SEMAPHORE.acquire(); emergencySemaphore.acquire();
Runnable task = myWorker(runnable); executor.submit(() -> {
executor.submit(task); try {
} catch (InterruptedException e) { task.run();
throw new RuntimeException(e); } finally {
} emergencySemaphore.release();
} if (remainingTasks.decrementAndGet() == 0) {
try {
Thread.sleep(800000);
public static void executeHigh(Runnable runnable) { } catch (InterruptedException e) {
try { throw new RuntimeException(e);
// 获取一个高优先级任务许可 }
log.info("使用高优先级进行查询"); restoreDefaults();
HIGH_PRIORITY_SEMAPHORE.acquire();
// 提交任务
Runnable task = myWorker(runnable);
log.info("提交任务");
executor.submit(task);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
public static void executeMedium(Runnable runnable) {
try {
// 获取一个高优先级任务许可
MEDIUM_PRIORITY_SEMAPHORE.acquire();
// 提交任务
Runnable task = myWorker(runnable);
executor.submit(task);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
public static void executeLow(Runnable runnable) {
try {
// 获取一个高优先级任务许可
LOW_PRIORITY_SEMAPHORE.acquire();
// 提交任务
Runnable task = myWorker(runnable);
executor.submit(task);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
private static Runnable myWorker(Runnable task) {
return () -> {
try {
log.info("执行任务");
task.run();
} finally {
// 释放紧急任务许可
EMERGENCY_SEMAPHORE.release();
// 如果所有紧急任务已完成,则恢复默认设置
if (REMAINING_TASKS.decrementAndGet() == 0) {
try {
Thread.sleep(800000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} }
restoreDefaults();
} }
} });
}; } catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
public static void executeHigh(Runnable task) {
try {
highPrioritySemaphore.acquire();
executor.submit(() -> {
try {
task.run();
} finally {
highPrioritySemaphore.release();
}
});
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
public static void executeMedium(Runnable task) {
try {
mediumPrioritySemaphore.acquire();
executor.submit(() -> {
try {
task.run();
} finally {
mediumPrioritySemaphore.release();
}
});
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
public static void executeLow(Runnable task) {
try {
lowPrioritySemaphore.acquire();
executor.submit(() -> {
try {
task.run();
} finally {
lowPrioritySemaphore.release();
}
});
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
public static synchronized void adjustForEmergency() {
if (!inEmergencyMode) {
inEmergencyMode = true;
// 释放一些高、中、低优先级的线程,以便为紧急任务腾出空间
highPrioritySemaphore.release(defaultHighThreads - emergencyHighThreads);
mediumPrioritySemaphore.release(defaultMediumThreads - emergencyMediumThreads);
lowPrioritySemaphore.release(defaultLowThreads - emergencyLowThreads);
// 为紧急任务分配线程
emergencySemaphore.release(emergencyThreads);
}
} }
public static synchronized void restoreDefaults() { public static synchronized void restoreDefaults() {
if (EMERGENCY && REMAINING_TASKS.get() == 0) { if (inEmergencyMode && remainingTasks.get() == 0) {
// 清空紧急任务的许可 // 清空紧急任务的许可
EMERGENCY_SEMAPHORE.drainPermits(); emergencySemaphore.drainPermits();
try {
// 重新获取之前释放的许可
HIGH_PRIORITY_SEMAPHORE.acquire(EMERGENCY_HIGH_THREADS);
MEDIUM_PRIORITY_SEMAPHORE.acquire(EMERGENCY_MEDIUM_THREADS);
LOW_PRIORITY_SEMAPHORE.acquire(EMERGENCY_LOW_THREADS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
// 重新分配高、中、低优先级的线程 // 重新分配高、中、低优先级的线程
HIGH_PRIORITY_SEMAPHORE.release(HIGH_THREADS); try {
MEDIUM_PRIORITY_SEMAPHORE.release(MEDIUM_THREADS); highPrioritySemaphore.acquire(emergencyHighThreads);
LOW_PRIORITY_SEMAPHORE.release(LOW_THREADS); mediumPrioritySemaphore.acquire(emergencyMediumThreads);
lowPrioritySemaphore.acquire(emergencyLowThreads);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
highPrioritySemaphore.release(defaultHighThreads);
mediumPrioritySemaphore.release(defaultMediumThreads);
lowPrioritySemaphore.release(defaultLowThreads);
// 清空紧急任务的许可 // 清空紧急任务的许可
EMERGENCY_SEMAPHORE.release(0); emergencySemaphore.release(0);
// 退出紧急模式 // 退出紧急模式
EMERGENCY = false; inEmergencyMode = false;
// 确保所有紧急任务都完成了 // 确保所有紧急任务都完成了
try { try {
EMERGENCY_LATCH.await(); // 等待所有紧急任务完成 emergencyLatch.await(); // 等待所有紧急任务完成
} catch (InterruptedException e) { } catch (InterruptedException e) {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
System.err.println("Interrupted while waiting for emergency tasks to complete."); System.err.println("Interrupted while waiting for emergency tasks to complete.");
} }
} }
} }
// private final PriorityBlockingQueue<TaskServiceImpl.SegmentTask> queue;
//
// public Worker(PriorityBlockingQueue<TaskServiceImpl.SegmentTask> queue) {
// this.queue = queue;
// }
//
// public void run() {
// while (!Thread.currentThread().isInterrupted()) {
// try {
// // 从队列中取出一个任务
// TaskServiceImpl.SegmentTask task = queue.take();
// // 执行任务
// task.run();
// } catch (InterruptedException e) {
// Thread.currentThread().interrupt(); // 设置中断状态
// break;
// }
// }
// }
// 调整线程分配到紧急模式
static synchronized void adjustForEmergency() {
if (!EMERGENCY) {
EMERGENCY = true;
// 释放一些高、中、低优先级的线程,以便为紧急任务腾出空间
HIGH_PRIORITY_SEMAPHORE.release(HIGH_THREADS - EMERGENCY_HIGH_THREADS);
MEDIUM_PRIORITY_SEMAPHORE.release(MEDIUM_THREADS - EMERGENCY_MEDIUM_THREADS);
LOW_PRIORITY_SEMAPHORE.release(LOW_THREADS - EMERGENCY_LOW_THREADS);
// 为紧急任务分配线程
EMERGENCY_SEMAPHORE.release(EMERGENCY_THREADS);
}
}