From 31c589da467eaf58bd4d30c1c9d49c0e06b0518d Mon Sep 17 00:00:00 2001 From: Cui YongXing <2835316714@qq.com> Date: Wed, 4 Sep 2024 15:48:15 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E5=88=86=E9=A1=B5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../service/impl/TaskInfoServiceImpl.java | 14 +- .../server/thread/PrioritizedThreadPool.java | 142 ++++++++++++++++++ 2 files changed, 150 insertions(+), 6 deletions(-) create mode 100644 cloud-task-server/src/main/java/com/muyu/task/server/thread/PrioritizedThreadPool.java diff --git a/cloud-task-server/src/main/java/com/muyu/task/server/service/impl/TaskInfoServiceImpl.java b/cloud-task-server/src/main/java/com/muyu/task/server/service/impl/TaskInfoServiceImpl.java index af5c739..1ece54a 100644 --- a/cloud-task-server/src/main/java/com/muyu/task/server/service/impl/TaskInfoServiceImpl.java +++ b/cloud-task-server/src/main/java/com/muyu/task/server/service/impl/TaskInfoServiceImpl.java @@ -221,6 +221,9 @@ public class TaskInfoServiceImpl extends ServiceImpl i QueryWrapper queryWrapper = new QueryWrapper<>(); queryWrapper.eq("task_id", taskId); TaskOutput one = taskOutputService.getOne(queryWrapper); + if (one==null){ + return "没有选择输出"; + } Long tableId = one.getTableId(); Long newBasicId = one.getBasicId(); HashMap map = new HashMap<>(); @@ -235,14 +238,11 @@ public class TaskInfoServiceImpl extends ServiceImpl i String sql = " SELECT count(1) FROM " + joint; Result countResult = datasourceFeign.findCount(basicId, sql); Long data = countResult.getData(); - System.out.println("======="+data+"======="); String finalFieName = fieName; String finalJoint = joint; - System.out.println(weight); - System.out.println(Weight.centre.getValue()); long count = data/10000==0?1:data/10000; if (Weight.high.getValue().equals(weight)){ - + log.info("执行高级任务"); for (long i = 1; i <= count; i++) { long pageNum = (i - 1) * 10000; submitHighPriorityTask(()->{ @@ -251,6 +251,7 @@ public class TaskInfoServiceImpl extends ServiceImpl i } } if (Weight.centre.getValue().equals(weight)){ + log.info("执行中级任务"); for (long i = 1; i <= count; i++) { long pageNum = (i - 1) * 10000; System.out.println(pageNum); @@ -260,6 +261,7 @@ public class TaskInfoServiceImpl extends ServiceImpl i } } if (Weight.low.getValue().equals(weight)){ + log.info("执行低级任务"); for (long i = 1; i <= count; i++) { long pageNum = (i - 1) * 10000; submitLowPriorityTask(()->{ @@ -268,6 +270,7 @@ public class TaskInfoServiceImpl extends ServiceImpl i } } if (Weight.urgency.getValue().equals(weight)){ + log.info("执行紧急任务"); for (long i = 1; i <= count; i++) { long pageNum = (i - 1) * 10000; submitEmergencyTask(()->{ @@ -281,8 +284,7 @@ public class TaskInfoServiceImpl extends ServiceImpl i @NotNull private void getString(Long pageNum,String fieName,String joint,Long basicId,Long newBasicId,Long tableId,HashMap map ) { - String sql = " SELECT " + fieName + " FROM " + joint; - System.out.println(sql); + String sql = " SELECT " + fieName + " FROM " + joint +" limit "+pageNum +",10000 "; Result>> tableValueResult = datasourceFeign.findTableValue(basicId, sql); List> tableValue = tableValueResult.getData(); for (List dataValues : tableValue) { diff --git a/cloud-task-server/src/main/java/com/muyu/task/server/thread/PrioritizedThreadPool.java b/cloud-task-server/src/main/java/com/muyu/task/server/thread/PrioritizedThreadPool.java new file mode 100644 index 0000000..7eb5eef --- /dev/null +++ b/cloud-task-server/src/main/java/com/muyu/task/server/thread/PrioritizedThreadPool.java @@ -0,0 +1,142 @@ +package com.muyu.task.server.thread; + +import java.util.concurrent.*; + +public class PrioritizedThreadPool { + private static final ExecutorService executor; + private static final Semaphore highSemaphore; + private static final Semaphore lowSemaphore; + private static final Semaphore centerSemaphore; + private static final Semaphore emergencySemaphore; + + private static final int totalThreads = 30; + private static final int highThreads = 16; + private static final int centerThreads = 9; + private static final int lowThreads = 5; + + private static final int emergencyTaskThreads = 14; + private static final int emergencyHighThreads = 9; + private static final int emergencyCenterThreads = 5; + private static final int emergencyLowThreads = 2; + + + private static volatile boolean inEmergencyMode = false; + + static { + if (highThreads + centerThreads + lowThreads > totalThreads) { + throw new IllegalArgumentException("总和大于线程数"); + } + if (emergencyCenterThreads + emergencyHighThreads + emergencyLowThreads > totalThreads) { + throw new IllegalArgumentException("总和大于线程数"); + } + executor =new ThreadPoolExecutor(totalThreads, totalThreads,60L,TimeUnit.SECONDS,new LinkedBlockingQueue()); + highSemaphore = new Semaphore(highThreads); + centerSemaphore = new Semaphore(centerThreads); + lowSemaphore = new Semaphore(lowThreads); + emergencySemaphore = new Semaphore(0); + + } + public static void submit(Runnable task) { + adjustForEmergency(); + try { + emergencySemaphore.acquire(); + executor.submit(()->{ + try { + task.run(); + }finally { + emergencySemaphore.release(); + restoreDefaults(); + } + }); + }catch (InterruptedException e){ + Thread.currentThread().interrupt(); + } + } + private static void subHighTask(Runnable task){ + try { + highSemaphore.acquire(); + executor.submit(()->{ + try { + task.run(); + }finally { + highSemaphore.release(); + } + }); + }catch (InterruptedException e){ + Thread.currentThread().interrupt(); + } + } + private static void subcenterTask(Runnable task){ + try { + centerSemaphore.acquire(); + executor.submit(()->{ + try { + task.run(); + }finally { + centerSemaphore.release(); + } + }); + }catch (InterruptedException e){ + Thread.currentThread().interrupt(); + } + } + private static void subLowTask(Runnable task){ + try { + lowSemaphore.acquire(); + executor.submit(()->{ + try { + task.run(); + }finally { + lowSemaphore.release(); + } + }); + }catch (InterruptedException e){ + Thread.currentThread().interrupt(); + } + } + + private static synchronized void adjustForEmergency(){ + if (!inEmergencyMode) { + inEmergencyMode = true; + + highSemaphore.release(highThreads-emergencyHighThreads); + centerSemaphore.release(centerThreads-emergencyCenterThreads); + lowSemaphore.release(lowThreads-emergencyLowThreads); + emergencySemaphore.release(emergencyTaskThreads); + } + } + + private static synchronized void restoreDefaults(){ + if (inEmergencyMode){ + emergencySemaphore.drainPermits(); + try { + highSemaphore.acquire(emergencyHighThreads); + centerSemaphore.acquire(emergencyCenterThreads); + lowSemaphore.acquire(emergencyLowThreads); + }catch (InterruptedException e){ + throw new RuntimeException(e); + } + } + highSemaphore.release(highThreads); + centerSemaphore.release(centerThreads); + lowSemaphore.release(lowThreads); + + emergencySemaphore.release(-emergencyTaskThreads); + + inEmergencyMode = false; + } + + public static void shutdown() throws InterruptedException { + executor.shutdown(); + if (!executor.awaitTermination(1,TimeUnit.HOURS)){ + executor.shutdownNow(); + if (!executor.awaitTermination(1,TimeUnit.MINUTES)){ + System.err.println("Executor did not terminate"); + } + } + } + + + + +}