From b384edceba60ca16ab541bbf0c527f14b97cac14 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=99=88=E6=80=9D=E8=B1=AA?= <1437200870@qq.com> Date: Mon, 9 Sep 2024 02:15:40 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B5=8B=E8=AF=95=E7=BA=BF=E7=A8=8B=E6=B1=A0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../muyu/service/impl/TaskServiceImpl.java | 5 - .../com/muyu/task/PriorityThreadPool.java | 455 ++++++++++++------ 2 files changed, 310 insertions(+), 150 deletions(-) diff --git a/cloud-task-server/src/main/java/com/muyu/service/impl/TaskServiceImpl.java b/cloud-task-server/src/main/java/com/muyu/service/impl/TaskServiceImpl.java index 947f9f6..a1ac602 100644 --- a/cloud-task-server/src/main/java/com/muyu/service/impl/TaskServiceImpl.java +++ b/cloud-task-server/src/main/java/com/muyu/service/impl/TaskServiceImpl.java @@ -193,11 +193,9 @@ public class TaskServiceImpl extends ServiceImpl implement String limit = " limit " + pageNum + "," + pageSize; String limitSelect = sql + limit; Long finalDatabaseId = databaseId; - String finalSql = sql; log.info("执行查询语句为{}", limitSelect); if(taskInfo.getWeigh() == 4){ log.info("执行紧急任务"); - log.info("sql为{}",finalSql); executeUrgently(() -> { selectAndAdd(finalDatabaseId, basicId, limitSelect, tableId,newAndOldMap, finalFirstArray,two); }); @@ -205,7 +203,6 @@ public class TaskServiceImpl extends ServiceImpl implement if(taskInfo.getWeigh() == 3){ log.info("执行高级任务"); - log.info("sql为{}",finalSql); executeHigh(() -> { selectAndAdd(finalDatabaseId, basicId, limitSelect, tableId,newAndOldMap, finalFirstArray,two); }); @@ -213,7 +210,6 @@ public class TaskServiceImpl extends ServiceImpl implement if(taskInfo.getWeigh() == 2){ log.info("执行中级任务"); - log.info("sql为{}",finalSql); executeMedium(() -> { selectAndAdd(finalDatabaseId, basicId, limitSelect, tableId,newAndOldMap, finalFirstArray,two); }); @@ -221,7 +217,6 @@ public class TaskServiceImpl extends ServiceImpl implement if(taskInfo.getWeigh() == 1){ log.info("执行低级任务"); - log.info("sql为{}",finalSql); executeLow(() -> { selectAndAdd(finalDatabaseId, basicId, limitSelect, tableId,newAndOldMap, finalFirstArray,two); }); diff --git a/cloud-task-server/src/main/java/com/muyu/task/PriorityThreadPool.java b/cloud-task-server/src/main/java/com/muyu/task/PriorityThreadPool.java index cdedbd7..48a2ae4 100644 --- a/cloud-task-server/src/main/java/com/muyu/task/PriorityThreadPool.java +++ b/cloud-task-server/src/main/java/com/muyu/task/PriorityThreadPool.java @@ -15,178 +15,343 @@ import java.util.concurrent.atomic.AtomicInteger; @Log4j2 public class PriorityThreadPool { - //默认线程数 - private static final int TOTAL_THREADS = 20; //总 - private static final int HIGH_THREADS = 10; //高 - private static final int MEDIUM_THREADS = 6; //中 - private static final int LOW_THREADS = 4; //低 - - //紧急任务分重新配的线程数 - private static final int EMERGENCY_THREADS = 9; //紧急 - private static final int EMERGENCY_HIGH_THREADS = 5; //高 - private static final int EMERGENCY_MEDIUM_THREADS = 4; //中 - private static final int EMERGENCY_LOW_THREADS = 2; //低 +// //默认线程数 +// private static final int TOTAL_THREADS = 20; //总 +// private static final int HIGH_THREADS = 10; //高 +// private static final int MEDIUM_THREADS = 6; //中 +// private static final int LOW_THREADS = 4; //低 +// +// //紧急任务分重新配的线程数 +// private static final int EMERGENCY_THREADS = 9; //紧急 +// private static final int EMERGENCY_HIGH_THREADS = 5; //高 +// private static final int EMERGENCY_MEDIUM_THREADS = 4; //中 +// private static final int EMERGENCY_LOW_THREADS = 2; //低 +// +// +// +// private static final ExecutorService executor = +// new ThreadPoolExecutor(TOTAL_THREADS, TOTAL_THREADS, 80L, TimeUnit.MILLISECONDS, +// new LinkedBlockingQueue()); +// +// private static final Semaphore HIGH_PRIORITY_SEMAPHORE = new Semaphore(HIGH_THREADS); +// private static final Semaphore MEDIUM_PRIORITY_SEMAPHORE = new Semaphore(MEDIUM_THREADS); +// private static final Semaphore LOW_PRIORITY_SEMAPHORE = new Semaphore(LOW_THREADS); +// private static final Semaphore EMERGENCY_SEMAPHORE = new Semaphore(0); +// +// //执行中的紧急任务数量 +// private static final AtomicInteger ACTIVE_EMERGENCY_TASKS = new AtomicInteger(0); +// //剩余未完成的 +// private static final AtomicInteger REMAINING_TASKS = new AtomicInteger(0); +// //可有的紧急线程 +// private static final CountDownLatch EMERGENCY_LATCH = new CountDownLatch(LOW_THREADS); +// //标识是否紧急 +// private static boolean EMERGENCY = false; +// +// +// public static void executeUrgently(Runnable runnable) { +// if(ACTIVE_EMERGENCY_TASKS.incrementAndGet() == 1){ +// REMAINING_TASKS.set(0); +// adjustForEmergency(); +// } +// REMAINING_TASKS.incrementAndGet(); +// EMERGENCY_LATCH.countDown(); +// try { +// EMERGENCY_SEMAPHORE.acquire(); +// Runnable task = myWorker(runnable); +// executor.submit(task); +// } catch (InterruptedException e) { +// throw new RuntimeException(e); +// } +// } +// +// +// public static void executeHigh(Runnable runnable) { +// try { +// // 获取一个高优先级任务许可 +// log.info("使用高优先级进行查询"); +// HIGH_PRIORITY_SEMAPHORE.acquire(); +// // 提交任务 +// Runnable task = myWorker(runnable); +// log.info("提交任务"); +// executor.submit(task); +// } catch (InterruptedException e) { +// Thread.currentThread().interrupt(); +// } +// } +// +// +// public static void executeMedium(Runnable runnable) { +// try { +// // 获取一个高优先级任务许可 +// MEDIUM_PRIORITY_SEMAPHORE.acquire(); +// // 提交任务 +// Runnable task = myWorker(runnable); +// executor.submit(task); +// } catch (InterruptedException e) { +// Thread.currentThread().interrupt(); +// } +// } +// +// +// public static void executeLow(Runnable runnable) { +// try { +// // 获取一个高优先级任务许可 +// LOW_PRIORITY_SEMAPHORE.acquire(); +// // 提交任务 +// Runnable task = myWorker(runnable); +// executor.submit(task); +// } catch (InterruptedException e) { +// Thread.currentThread().interrupt(); +// } +// } +// +// private static Runnable myWorker(Runnable task) { +// return () -> { +// try { +// log.info("执行任务"); +// task.run(); +// } finally { +// // 释放紧急任务许可 +// EMERGENCY_SEMAPHORE.release(); +// // 如果所有紧急任务已完成,则恢复默认设置 +// if (REMAINING_TASKS.decrementAndGet() == 0) { +// try { +// Thread.sleep(800000); +// } catch (InterruptedException e) { +// Thread.currentThread().interrupt(); +// } +// restoreDefaults(); +// } +// } +// }; +// } +// +// public static synchronized void restoreDefaults() { +// if (EMERGENCY && REMAINING_TASKS.get() == 0) { +// // 清空紧急任务的许可 +// EMERGENCY_SEMAPHORE.drainPermits(); +// try { +// // 重新获取之前释放的许可 +// HIGH_PRIORITY_SEMAPHORE.acquire(EMERGENCY_HIGH_THREADS); +// MEDIUM_PRIORITY_SEMAPHORE.acquire(EMERGENCY_MEDIUM_THREADS); +// LOW_PRIORITY_SEMAPHORE.acquire(EMERGENCY_LOW_THREADS); +// } catch (InterruptedException e) { +// Thread.currentThread().interrupt(); +// } +// // 重新分配高、中、低优先级的线程 +// HIGH_PRIORITY_SEMAPHORE.release(HIGH_THREADS); +// MEDIUM_PRIORITY_SEMAPHORE.release(MEDIUM_THREADS); +// LOW_PRIORITY_SEMAPHORE.release(LOW_THREADS); +// // 清空紧急任务的许可 +// EMERGENCY_SEMAPHORE.release(0); +// // 退出紧急模式 +// EMERGENCY = false; +// // 确保所有紧急任务都完成了 +// try { +// EMERGENCY_LATCH.await(); // 等待所有紧急任务完成 +// } catch (InterruptedException e) { +// Thread.currentThread().interrupt(); +// System.err.println("Interrupted while waiting for emergency tasks to complete."); +// } +// } +// } +//// private final PriorityBlockingQueue queue; +//// +//// public Worker(PriorityBlockingQueue queue) { +//// this.queue = queue; +//// } +//// +//// public void run() { +//// while (!Thread.currentThread().isInterrupted()) { +//// try { +//// // 从队列中取出一个任务 +//// TaskServiceImpl.SegmentTask task = queue.take(); +//// // 执行任务 +//// task.run(); +//// } catch (InterruptedException e) { +//// Thread.currentThread().interrupt(); // 设置中断状态 +//// break; +//// } +//// } +//// } +// +// // 调整线程分配到紧急模式 +// static synchronized void adjustForEmergency() { +// if (!EMERGENCY) { +// EMERGENCY = true; +// // 释放一些高、中、低优先级的线程,以便为紧急任务腾出空间 +// HIGH_PRIORITY_SEMAPHORE.release(HIGH_THREADS - EMERGENCY_HIGH_THREADS); +// MEDIUM_PRIORITY_SEMAPHORE.release(MEDIUM_THREADS - EMERGENCY_MEDIUM_THREADS); +// LOW_PRIORITY_SEMAPHORE.release(LOW_THREADS - EMERGENCY_LOW_THREADS); +// // 为紧急任务分配线程 +// EMERGENCY_SEMAPHORE.release(EMERGENCY_THREADS); +// } +// } + private static final ExecutorService executor; + private static final Semaphore highPrioritySemaphore; + private static final Semaphore mediumPrioritySemaphore; + private static final Semaphore lowPrioritySemaphore; + private static final Semaphore emergencySemaphore; - private static final ExecutorService executor = - new ThreadPoolExecutor(TOTAL_THREADS, TOTAL_THREADS, 80L, TimeUnit.MILLISECONDS, - new LinkedBlockingQueue()); + private static final int totalThreads =20; // 固定线程池大小 + private static final int defaultHighThreads = 10; + private static final int defaultMediumThreads = 6; + private static final int defaultLowThreads = 4; - private static final Semaphore HIGH_PRIORITY_SEMAPHORE = new Semaphore(HIGH_THREADS); - private static final Semaphore MEDIUM_PRIORITY_SEMAPHORE = new Semaphore(MEDIUM_THREADS); - private static final Semaphore LOW_PRIORITY_SEMAPHORE = new Semaphore(LOW_THREADS); - private static final Semaphore EMERGENCY_SEMAPHORE = new Semaphore(0); + private static final int emergencyThreads = 9; + private static final int emergencyHighThreads = 3; + private static final int emergencyMediumThreads = 3; + private static final int emergencyLowThreads = 3; - //执行中的紧急任务数量 - private static final AtomicInteger ACTIVE_EMERGENCY_TASKS = new AtomicInteger(0); - //剩余未完成的 - private static final AtomicInteger REMAINING_TASKS = new AtomicInteger(0); - //可有的紧急线程 - private static final CountDownLatch EMERGENCY_LATCH = new CountDownLatch(LOW_THREADS); - //标识是否紧急 - private static boolean EMERGENCY = false; + private static volatile boolean inEmergencyMode = false; + public static final AtomicInteger activeEmergencyTasks = new AtomicInteger(0); // 紧急任务计数器 + public static final AtomicInteger remainingTasks = new AtomicInteger(0); // 剩余任务计数器 + private static final CountDownLatch emergencyLatch = new CountDownLatch(emergencyThreads); // 用于等待所有紧急任务完成 + static { + if (defaultHighThreads + defaultMediumThreads + defaultLowThreads > totalThreads) { + throw new IllegalArgumentException("Default priority threads exceed total threads."); + } + if (emergencyThreads + emergencyHighThreads + emergencyMediumThreads + emergencyLowThreads > totalThreads) { + throw new IllegalArgumentException("Emergency priority threads exceed total threads."); + } - public static void executeUrgently(Runnable runnable) { - if(ACTIVE_EMERGENCY_TASKS.incrementAndGet() == 1){ - REMAINING_TASKS.set(0); + // 创建固定大小的线程池 + executor = new ThreadPoolExecutor( + totalThreads, totalThreads, + 80L, TimeUnit.SECONDS, + new LinkedBlockingQueue() + ); + highPrioritySemaphore = new Semaphore(defaultHighThreads); + mediumPrioritySemaphore = new Semaphore(defaultMediumThreads); + lowPrioritySemaphore = new Semaphore(defaultLowThreads); + emergencySemaphore = new Semaphore(0); + } + + public static void executeUrgently(Runnable task) { + if (activeEmergencyTasks.incrementAndGet() == 1) { + // 初始化计数器 + remainingTasks.set(0); adjustForEmergency(); } - REMAINING_TASKS.incrementAndGet(); - EMERGENCY_LATCH.countDown(); + remainingTasks.incrementAndGet(); // 增加剩余任务计数 + emergencyLatch.countDown(); // 减少计数器 try { - EMERGENCY_SEMAPHORE.acquire(); - Runnable task = myWorker(runnable); - executor.submit(task); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - } - - - public static void executeHigh(Runnable runnable) { - try { - // 获取一个高优先级任务许可 - log.info("使用高优先级进行查询"); - HIGH_PRIORITY_SEMAPHORE.acquire(); - // 提交任务 - Runnable task = myWorker(runnable); - log.info("提交任务"); - executor.submit(task); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - - - public static void executeMedium(Runnable runnable) { - try { - // 获取一个高优先级任务许可 - MEDIUM_PRIORITY_SEMAPHORE.acquire(); - // 提交任务 - Runnable task = myWorker(runnable); - executor.submit(task); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - - - public static void executeLow(Runnable runnable) { - try { - // 获取一个高优先级任务许可 - LOW_PRIORITY_SEMAPHORE.acquire(); - // 提交任务 - Runnable task = myWorker(runnable); - executor.submit(task); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - - private static Runnable myWorker(Runnable task) { - return () -> { - try { - log.info("执行任务"); - task.run(); - } finally { - // 释放紧急任务许可 - EMERGENCY_SEMAPHORE.release(); - // 如果所有紧急任务已完成,则恢复默认设置 - if (REMAINING_TASKS.decrementAndGet() == 0) { - try { - Thread.sleep(800000); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); + emergencySemaphore.acquire(); + executor.submit(() -> { + try { + task.run(); + } finally { + emergencySemaphore.release(); + if (remainingTasks.decrementAndGet() == 0) { + try { + Thread.sleep(800000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + restoreDefaults(); } - restoreDefaults(); } - } - }; + }); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + + public static void executeHigh(Runnable task) { + try { + highPrioritySemaphore.acquire(); + executor.submit(() -> { + try { + task.run(); + } finally { + highPrioritySemaphore.release(); + } + }); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + + public static void executeMedium(Runnable task) { + try { + mediumPrioritySemaphore.acquire(); + executor.submit(() -> { + try { + task.run(); + } finally { + mediumPrioritySemaphore.release(); + } + }); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + + public static void executeLow(Runnable task) { + try { + lowPrioritySemaphore.acquire(); + executor.submit(() -> { + try { + task.run(); + } finally { + lowPrioritySemaphore.release(); + } + }); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + + public static synchronized void adjustForEmergency() { + if (!inEmergencyMode) { + inEmergencyMode = true; + + // 释放一些高、中、低优先级的线程,以便为紧急任务腾出空间 + highPrioritySemaphore.release(defaultHighThreads - emergencyHighThreads); + mediumPrioritySemaphore.release(defaultMediumThreads - emergencyMediumThreads); + lowPrioritySemaphore.release(defaultLowThreads - emergencyLowThreads); + + // 为紧急任务分配线程 + emergencySemaphore.release(emergencyThreads); + } } public static synchronized void restoreDefaults() { - if (EMERGENCY && REMAINING_TASKS.get() == 0) { + if (inEmergencyMode && remainingTasks.get() == 0) { // 清空紧急任务的许可 - EMERGENCY_SEMAPHORE.drainPermits(); - try { - // 重新获取之前释放的许可 - HIGH_PRIORITY_SEMAPHORE.acquire(EMERGENCY_HIGH_THREADS); - MEDIUM_PRIORITY_SEMAPHORE.acquire(EMERGENCY_MEDIUM_THREADS); - LOW_PRIORITY_SEMAPHORE.acquire(EMERGENCY_LOW_THREADS); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } + emergencySemaphore.drainPermits(); + // 重新分配高、中、低优先级的线程 - HIGH_PRIORITY_SEMAPHORE.release(HIGH_THREADS); - MEDIUM_PRIORITY_SEMAPHORE.release(MEDIUM_THREADS); - LOW_PRIORITY_SEMAPHORE.release(LOW_THREADS); + try { + highPrioritySemaphore.acquire(emergencyHighThreads); + mediumPrioritySemaphore.acquire(emergencyMediumThreads); + lowPrioritySemaphore.acquire(emergencyLowThreads); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + + highPrioritySemaphore.release(defaultHighThreads); + mediumPrioritySemaphore.release(defaultMediumThreads); + lowPrioritySemaphore.release(defaultLowThreads); + // 清空紧急任务的许可 - EMERGENCY_SEMAPHORE.release(0); + emergencySemaphore.release(0); + // 退出紧急模式 - EMERGENCY = false; + inEmergencyMode = false; + // 确保所有紧急任务都完成了 try { - EMERGENCY_LATCH.await(); // 等待所有紧急任务完成 + emergencyLatch.await(); // 等待所有紧急任务完成 } catch (InterruptedException e) { Thread.currentThread().interrupt(); System.err.println("Interrupted while waiting for emergency tasks to complete."); } } } -// private final PriorityBlockingQueue queue; -// -// public Worker(PriorityBlockingQueue queue) { -// this.queue = queue; -// } -// -// public void run() { -// while (!Thread.currentThread().isInterrupted()) { -// try { -// // 从队列中取出一个任务 -// TaskServiceImpl.SegmentTask task = queue.take(); -// // 执行任务 -// task.run(); -// } catch (InterruptedException e) { -// Thread.currentThread().interrupt(); // 设置中断状态 -// break; -// } -// } -// } - - // 调整线程分配到紧急模式 - static synchronized void adjustForEmergency() { - if (!EMERGENCY) { - EMERGENCY = true; - // 释放一些高、中、低优先级的线程,以便为紧急任务腾出空间 - HIGH_PRIORITY_SEMAPHORE.release(HIGH_THREADS - EMERGENCY_HIGH_THREADS); - MEDIUM_PRIORITY_SEMAPHORE.release(MEDIUM_THREADS - EMERGENCY_MEDIUM_THREADS); - LOW_PRIORITY_SEMAPHORE.release(LOW_THREADS - EMERGENCY_LOW_THREADS); - // 为紧急任务分配线程 - EMERGENCY_SEMAPHORE.release(EMERGENCY_THREADS); - } - }