cloud-etl-task/cloud-task-server/src/main/java/com/muyu/task/PriorityThreadPool.java

365 lines
14 KiB
Java

package com.muyu.task;
import lombok.extern.log4j.Log4j2;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Shared fixed-size thread pool whose capacity is divided between priority classes.
 *
 * <p>Every submission first takes a permit from the semaphore of its priority class
 * (high, medium, low or emergency) and returns it when the task has finished, so the
 * semaphores bound how many tasks of each class are in flight at once. In normal mode the
 * quotas are 10 / 6 / 4 and there are no emergency permits. An urgent submission switches
 * the pool into emergency mode: the regular quotas shrink to 3 / 3 / 3 and 9 emergency
 * permits are granted. When no urgent task is outstanding, the regular quotas are restored
 * after a delay of 800 000 ms.
 *
 * <p>The submitting methods block the calling thread until a permit of the requested class
 * is available. If the caller is interrupted while waiting, the interrupt flag is restored
 * and the task is not submitted.
 *
 * <p>Created 2024/9/6 10:31.
 *
 * @author 陈思豪
 */
@Log4j2
public class PriorityThreadPool {

    /** Size of the backing pool; core size and maximum size are both set to this value. */
    private static final int TOTAL_THREADS = 20;

    // Quotas in normal mode.
    private static final int DEFAULT_HIGH_THREADS = 10;
    private static final int DEFAULT_MEDIUM_THREADS = 6;
    private static final int DEFAULT_LOW_THREADS = 4;

    // Quotas while emergency mode is active.
    private static final int EMERGENCY_THREADS = 9;
    private static final int EMERGENCY_HIGH_THREADS = 3;
    private static final int EMERGENCY_MEDIUM_THREADS = 3;
    private static final int EMERGENCY_LOW_THREADS = 3;

    /** Delay between the last urgent task finishing and the attempt to leave emergency mode. */
    private static final long RESTORE_DELAY_MILLIS = 800_000L;

    /**
     * Number of urgent submissions seen so far. This class only ever increments it; it is
     * not used to decide when emergency mode is entered or left.
     */
    public static final AtomicInteger activeEmergencyTasks = new AtomicInteger(0);

    /** Urgent tasks that have been submitted but have not finished yet. */
    public static final AtomicInteger remainingTasks = new AtomicInteger(0);

    private static final ExecutorService executor;
    private static final ScheduledExecutorService restoreScheduler;
    private static final ResizableSemaphore highPrioritySemaphore;
    private static final ResizableSemaphore mediumPrioritySemaphore;
    private static final ResizableSemaphore lowPrioritySemaphore;
    private static final Semaphore emergencySemaphore;

    /** Guarded by the class monitor (see the two synchronized mode-switching methods). */
    private static volatile boolean inEmergencyMode = false;

    static {
        if (DEFAULT_HIGH_THREADS + DEFAULT_MEDIUM_THREADS + DEFAULT_LOW_THREADS > TOTAL_THREADS) {
            throw new IllegalArgumentException("Default priority threads exceed total threads.");
        }
        if (EMERGENCY_THREADS + EMERGENCY_HIGH_THREADS + EMERGENCY_MEDIUM_THREADS
                + EMERGENCY_LOW_THREADS > TOTAL_THREADS) {
            throw new IllegalArgumentException("Emergency priority threads exceed total threads.");
        }
        // Entering emergency mode shrinks each quota by (default - emergency), which must not
        // be negative.
        if (EMERGENCY_HIGH_THREADS > DEFAULT_HIGH_THREADS
                || EMERGENCY_MEDIUM_THREADS > DEFAULT_MEDIUM_THREADS
                || EMERGENCY_LOW_THREADS > DEFAULT_LOW_THREADS) {
            throw new IllegalArgumentException("Emergency quotas must not exceed default quotas.");
        }

        // Fixed-size pool with an unbounded queue. Core size equals maximum size, so the
        // keep-alive time only matters if core-thread timeout is ever enabled.
        executor = new ThreadPoolExecutor(
                TOTAL_THREADS, TOTAL_THREADS,
                80L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>());

        // A single daemon thread runs the delayed return to normal mode, so that no pool
        // worker has to sleep through the delay.
        restoreScheduler = Executors.newSingleThreadScheduledExecutor(runnable -> {
            Thread thread = new Thread(runnable, "priority-pool-restore");
            thread.setDaemon(true);
            return thread;
        });

        highPrioritySemaphore = new ResizableSemaphore(DEFAULT_HIGH_THREADS);
        mediumPrioritySemaphore = new ResizableSemaphore(DEFAULT_MEDIUM_THREADS);
        lowPrioritySemaphore = new ResizableSemaphore(DEFAULT_LOW_THREADS);
        emergencySemaphore = new Semaphore(0);
    }

    /**
     * Submits an urgent task, switching the pool into emergency mode if necessary.
     *
     * <p>Blocks until an emergency permit is available. When the last outstanding urgent
     * task finishes, a delayed call to {@link #restoreDefaults()} is scheduled.
     *
     * @param task the work to run on the pool
     */
    public static void executeUrgently(Runnable task) {
        activeEmergencyTasks.incrementAndGet();
        // Count the task as outstanding BEFORE touching the mode: restoreDefaults() checks
        // this counter under the class monitor, so it either sees this task and backs off,
        // or finishes first and adjustForEmergency() below re-enters emergency mode.
        remainingTasks.incrementAndGet();
        adjustForEmergency();

        if (!acquirePermit(emergencySemaphore)) {
            // Never submitted: undo the outstanding count so a restore can still happen.
            urgentTaskFinished();
            return;
        }
        try {
            executor.submit(() -> {
                try {
                    task.run();
                } finally {
                    emergencySemaphore.release();
                    urgentTaskFinished();
                }
            });
        } catch (RuntimeException e) {
            // The pool refused the task; give back what was taken on its behalf.
            emergencySemaphore.release();
            urgentTaskFinished();
            throw e;
        }
    }

    /**
     * Submits a task under the high-priority quota, blocking until a permit is available.
     *
     * @param task the work to run on the pool
     */
    public static void executeHigh(Runnable task) {
        if (acquirePermit(highPrioritySemaphore)) {
            submitHoldingPermit(highPrioritySemaphore, task);
        }
    }

    /**
     * Submits a task under the medium-priority quota, blocking until a permit is available.
     *
     * @param task the work to run on the pool
     */
    public static void executeMedium(Runnable task) {
        if (acquirePermit(mediumPrioritySemaphore)) {
            submitHoldingPermit(mediumPrioritySemaphore, task);
        }
    }

    /**
     * Submits a task under the low-priority quota, blocking until a permit is available.
     * Logs when the permit is obtained and when the task starts and ends.
     *
     * @param task the work to run on the pool
     */
    public static void executeLow(Runnable task) {
        if (!acquirePermit(lowPrioritySemaphore)) {
            return;
        }
        log.info("获取许可");
        submitHoldingPermit(lowPrioritySemaphore, () -> {
            log.info("开始执行任务");
            try {
                task.run();
            } finally {
                log.info("执行任务完毕??");
            }
        });
    }

    /**
     * Enters emergency mode: shrinks the high/medium/low quotas to their emergency values
     * and grants the emergency permits. Does nothing if emergency mode is already active.
     *
     * <p>The quotas are shrunk without blocking. Tasks already in flight keep running, and
     * the number of available permits settles at the emergency quota as they finish.
     */
    public static synchronized void adjustForEmergency() {
        if (inEmergencyMode) {
            return;
        }
        inEmergencyMode = true;
        highPrioritySemaphore.shrink(DEFAULT_HIGH_THREADS - EMERGENCY_HIGH_THREADS);
        mediumPrioritySemaphore.shrink(DEFAULT_MEDIUM_THREADS - EMERGENCY_MEDIUM_THREADS);
        lowPrioritySemaphore.shrink(DEFAULT_LOW_THREADS - EMERGENCY_LOW_THREADS);
        emergencySemaphore.release(EMERGENCY_THREADS);
    }

    /**
     * Leaves emergency mode: withdraws the emergency permits and gives the high/medium/low
     * quotas back the capacity taken by {@link #adjustForEmergency()}. Does nothing unless
     * emergency mode is active and no urgent task is outstanding.
     */
    public static synchronized void restoreDefaults() {
        if (!inEmergencyMode || remainingTasks.get() != 0) {
            return;
        }
        // No urgent task is outstanding, so every emergency permit is idle and can be removed.
        emergencySemaphore.drainPermits();
        // Exact inverse of adjustForEmergency(), so repeated cycles never drift.
        highPrioritySemaphore.release(DEFAULT_HIGH_THREADS - EMERGENCY_HIGH_THREADS);
        mediumPrioritySemaphore.release(DEFAULT_MEDIUM_THREADS - EMERGENCY_MEDIUM_THREADS);
        lowPrioritySemaphore.release(DEFAULT_LOW_THREADS - EMERGENCY_LOW_THREADS);
        inEmergencyMode = false;
    }

    /**
     * Takes one permit, waiting if necessary.
     *
     * @return {@code true} if the permit was obtained; {@code false} if the calling thread
     *         was interrupted, in which case its interrupt flag is set again
     */
    private static boolean acquirePermit(Semaphore permits) {
        try {
            permits.acquire();
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.warn("Interrupted while waiting for a permit; the task was not submitted.");
            return false;
        }
    }

    /**
     * Hands the task to the pool and returns the already-held permit when the task ends,
     * or immediately if the pool refuses the task.
     */
    private static void submitHoldingPermit(Semaphore permits, Runnable task) {
        try {
            executor.submit(() -> {
                try {
                    task.run();
                } finally {
                    permits.release();
                }
            });
        } catch (RuntimeException e) {
            permits.release();
            throw e;
        }
    }

    /**
     * Marks one urgent task as no longer outstanding and, if it was the last one, schedules
     * the delayed attempt to leave emergency mode.
     */
    private static void urgentTaskFinished() {
        if (remainingTasks.decrementAndGet() == 0) {
            restoreScheduler.schedule(
                    PriorityThreadPool::restoreDefaults, RESTORE_DELAY_MILLIS, TimeUnit.MILLISECONDS);
        }
    }

    /** Semaphore whose permit count can be lowered without waiting for permits to be free. */
    private static final class ResizableSemaphore extends Semaphore {
        private static final long serialVersionUID = 1L;

        ResizableSemaphore(int permits) {
            super(permits);
        }

        /** Lowers the number of available permits by {@code reduction} without blocking. */
        void shrink(int reduction) {
            reducePermits(reduction);
        }
    }
}