From 1b92668176bb2a9e780830321bacb41c337a485f Mon Sep 17 00:00:00 2001 From: Cui YongXing <2835316714@qq.com> Date: Thu, 5 Sep 2024 22:00:41 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AF=BB=E6=89=BE=E6=8A=A5=E9=94=99?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../server/service/impl/TaskInfoServiceImpl.java | 15 +++++++++++---- .../thread/OptimizedPrioritizedThreadPool.java | 14 ++++++++++---- 2 files changed, 21 insertions(+), 8 deletions(-) diff --git a/cloud-task-server/src/main/java/com/muyu/task/server/service/impl/TaskInfoServiceImpl.java b/cloud-task-server/src/main/java/com/muyu/task/server/service/impl/TaskInfoServiceImpl.java index 1c552c0..d91f6f3 100644 --- a/cloud-task-server/src/main/java/com/muyu/task/server/service/impl/TaskInfoServiceImpl.java +++ b/cloud-task-server/src/main/java/com/muyu/task/server/service/impl/TaskInfoServiceImpl.java @@ -1,5 +1,6 @@ package com.muyu.task.server.service.impl; +import com.muyu.task.server.thread.OptimizedPrioritizedThreadPool; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; @@ -27,7 +28,6 @@ import org.springframework.stereotype.Service; import org.springframework.util.CollectionUtils; import javax.annotation.Resource; -import java.lang.reflect.Constructor; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -277,12 +277,19 @@ public class TaskInfoServiceImpl extends ServiceImpl i if (Weight.urgency.getValue().equals(weight)){ log.info("执行紧急任务"); // 调整线程分配以适应紧急任务 - + OptimizedPrioritizedThreadPool.activeEmergencyTasks.set(0); + OptimizedPrioritizedThreadPool.remainingTasks.set(0); for (long i = 1; i <= count; i++) { long pageNum = (i - 1) * PAGE_SIZE; - submitEmergencyTask(()->{ - getString(pageNum, finalFieName, finalJoint,basicId,newBasicId,tableId,map,num); + try { + getString(pageNum, finalFieName, finalJoint, basicId, newBasicId, tableId, map, num); + } finally { + // 减少剩余任务计数 + if (OptimizedPrioritizedThreadPool.remainingTasks.decrementAndGet() == 0) { + System.out.println("All emergency tasks have completed."); + } + } }); } diff --git a/cloud-task-server/src/main/java/com/muyu/task/server/thread/OptimizedPrioritizedThreadPool.java b/cloud-task-server/src/main/java/com/muyu/task/server/thread/OptimizedPrioritizedThreadPool.java index 7f14f42..95e80d0 100644 --- a/cloud-task-server/src/main/java/com/muyu/task/server/thread/OptimizedPrioritizedThreadPool.java +++ b/cloud-task-server/src/main/java/com/muyu/task/server/thread/OptimizedPrioritizedThreadPool.java @@ -1,5 +1,7 @@ package com.muyu.task.server.thread; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; public class OptimizedPrioritizedThreadPool { @@ -21,7 +23,8 @@ public class OptimizedPrioritizedThreadPool { private static final int emergencyLowThreads = 5; private static volatile boolean inEmergencyMode = false; - private static final AtomicInteger activeEmergencyTasks = new AtomicInteger(0); // 紧急任务计数器 + public static final AtomicInteger activeEmergencyTasks = new AtomicInteger(0); // 紧急任务计数器 + public static final AtomicInteger remainingTasks = new AtomicInteger(0); // 剩余任务计数器 static { if (defaultHighThreads + defaultMediumThreads + defaultLowThreads > totalThreads) { @@ -45,8 +48,11 @@ public class OptimizedPrioritizedThreadPool { public static void submitEmergencyTask(Runnable task) { if (activeEmergencyTasks.incrementAndGet() == 1) { + // 初始化计数器 + remainingTasks.set(0); adjustForEmergency(); } + remainingTasks.incrementAndGet(); // 增加剩余任务计数 try { emergencySemaphore.acquire(); executor.submit(() -> { @@ -54,7 +60,7 @@ public class OptimizedPrioritizedThreadPool { task.run(); } finally { emergencySemaphore.release(); - if (activeEmergencyTasks.decrementAndGet() == 0) { + if (remainingTasks.decrementAndGet() == 0) { restoreDefaults(); } } @@ -127,8 +133,8 @@ public class OptimizedPrioritizedThreadPool { } } - public static synchronized void restoreDefaults() { - if (inEmergencyMode) { + public static synchronized void restoreDefaults() { + if (inEmergencyMode && remainingTasks.get() == 0) { // 清空紧急任务的许可 emergencySemaphore.drainPermits();