diff --git a/cloud-task-server/src/main/java/com/muyu/task/server/service/impl/TaskInfoServiceImpl.java b/cloud-task-server/src/main/java/com/muyu/task/server/service/impl/TaskInfoServiceImpl.java index aba344a..2074b83 100644 --- a/cloud-task-server/src/main/java/com/muyu/task/server/service/impl/TaskInfoServiceImpl.java +++ b/cloud-task-server/src/main/java/com/muyu/task/server/service/impl/TaskInfoServiceImpl.java @@ -167,7 +167,7 @@ public class TaskInfoServiceImpl extends ServiceImpl i HashSet longs = new HashSet<>(); List list = taskInputService.list(wrapper); if (CollectionUtils.isEmpty(list)) { - return "没有选择字段"; + return "没有选择表"; } HashMap tableNameMap = new HashMap<>(); String fieName = ""; @@ -285,11 +285,7 @@ public class TaskInfoServiceImpl extends ServiceImpl i getString(pageNum, finalFieName, finalJoint,basicId,newBasicId,tableId,map,num); }); } - try { - Thread.sleep(2000); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } + } long end = System.currentTimeMillis(); @@ -302,6 +298,7 @@ public class TaskInfoServiceImpl extends ServiceImpl i private void getString(Long pageNum,String fieName,String joint,Long basicId,Long newBasicId,Long tableId,HashMap map ,Integer num) { String sqlSelect = " SELECT " + fieName + " FROM " + joint +" limit "+pageNum +","+PAGE_SIZE; log.info(sqlSelect); + Result tableValueResult = datasourceFeign.findTableValueToArray(basicId, sqlSelect); DataValue[][] data = tableValueResult.getData(); for (DataValue[] datum : data) { diff --git a/cloud-task-server/src/main/java/com/muyu/task/server/thread/OptimizedPrioritizedThreadPool.java b/cloud-task-server/src/main/java/com/muyu/task/server/thread/OptimizedPrioritizedThreadPool.java index 8311021..7f14f42 100644 --- a/cloud-task-server/src/main/java/com/muyu/task/server/thread/OptimizedPrioritizedThreadPool.java +++ b/cloud-task-server/src/main/java/com/muyu/task/server/thread/OptimizedPrioritizedThreadPool.java @@ -2,8 +2,8 @@ package com.muyu.task.server.thread; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; - public class OptimizedPrioritizedThreadPool { + private static final ExecutorService executor; private static final Semaphore highPrioritySemaphore; private static final Semaphore mediumPrioritySemaphore; @@ -21,7 +21,8 @@ public class OptimizedPrioritizedThreadPool { private static final int emergencyLowThreads = 5; private static volatile boolean inEmergencyMode = false; - private static final AtomicInteger activeEmergencyTasks = new AtomicInteger(0);// 紧急任务计数器 + private static final AtomicInteger activeEmergencyTasks = new AtomicInteger(0); // 紧急任务计数器 + static { if (defaultHighThreads + defaultMediumThreads + defaultLowThreads > totalThreads) { throw new IllegalArgumentException("Default priority threads exceed total threads."); @@ -34,7 +35,7 @@ public class OptimizedPrioritizedThreadPool { executor = new ThreadPoolExecutor( totalThreads, totalThreads, 60L, TimeUnit.SECONDS, - new LinkedBlockingQueue() // 使用无界的阻塞队列 + new LinkedBlockingQueue() ); highPrioritySemaphore = new Semaphore(defaultHighThreads); mediumPrioritySemaphore = new Semaphore(defaultMediumThreads); @@ -43,7 +44,7 @@ public class OptimizedPrioritizedThreadPool { } public static void submitEmergencyTask(Runnable task) { - if (activeEmergencyTasks.incrementAndGet()==1){ + if (activeEmergencyTasks.incrementAndGet() == 1) { adjustForEmergency(); } try { @@ -53,14 +54,13 @@ public class OptimizedPrioritizedThreadPool { task.run(); } finally { emergencySemaphore.release(); - if (activeEmergencyTasks.decrementAndGet()==0){ - restoreDefaults(); + if (activeEmergencyTasks.decrementAndGet() == 0) { + restoreDefaults(); } } }); } catch (InterruptedException e) { Thread.currentThread().interrupt(); - // 日志记录中断信息 System.err.println("Interrupted while waiting to execute emergency task."); } } @@ -77,7 +77,6 @@ public class OptimizedPrioritizedThreadPool { }); } catch (InterruptedException e) { Thread.currentThread().interrupt(); - // 日志记录中断信息 System.err.println("Interrupted while waiting to execute high priority task."); } } @@ -94,7 +93,6 @@ public class OptimizedPrioritizedThreadPool { }); } catch (InterruptedException e) { Thread.currentThread().interrupt(); - // 日志记录中断信息 System.err.println("Interrupted while waiting to execute medium priority task."); } } @@ -111,13 +109,11 @@ public class OptimizedPrioritizedThreadPool { }); } catch (InterruptedException e) { Thread.currentThread().interrupt(); - // 日志记录中断信息 System.err.println("Interrupted while waiting to execute low priority task."); } } public static synchronized void adjustForEmergency() { - // 仅在未处于紧急模式下调整线程分配 if (!inEmergencyMode) { inEmergencyMode = true; @@ -131,8 +127,7 @@ public class OptimizedPrioritizedThreadPool { } } - public static synchronized void restoreDefaults() { - // 检查是否处于紧急模式 + public static synchronized void restoreDefaults() { if (inEmergencyMode) { // 清空紧急任务的许可 emergencySemaphore.drainPermits(); @@ -167,5 +162,4 @@ public class OptimizedPrioritizedThreadPool { } } } - } diff --git a/cloud-task-server/src/main/java/com/muyu/task/server/thread/PrioritizedThreadPool.java b/cloud-task-server/src/main/java/com/muyu/task/server/thread/PrioritizedThreadPool.java deleted file mode 100644 index 7eb5eef..0000000 --- a/cloud-task-server/src/main/java/com/muyu/task/server/thread/PrioritizedThreadPool.java +++ /dev/null @@ -1,142 +0,0 @@ -package com.muyu.task.server.thread; - -import java.util.concurrent.*; - -public class PrioritizedThreadPool { - private static final ExecutorService executor; - private static final Semaphore highSemaphore; - private static final Semaphore lowSemaphore; - private static final Semaphore centerSemaphore; - private static final Semaphore emergencySemaphore; - - private static final int totalThreads = 30; - private static final int highThreads = 16; - private static final int centerThreads = 9; - private static final int lowThreads = 5; - - private static final int emergencyTaskThreads = 14; - private static final int emergencyHighThreads = 9; - private static final int emergencyCenterThreads = 5; - private static final int emergencyLowThreads = 2; - - - private static volatile boolean inEmergencyMode = false; - - static { - if (highThreads + centerThreads + lowThreads > totalThreads) { - throw new IllegalArgumentException("总和大于线程数"); - } - if (emergencyCenterThreads + emergencyHighThreads + emergencyLowThreads > totalThreads) { - throw new IllegalArgumentException("总和大于线程数"); - } - executor =new ThreadPoolExecutor(totalThreads, totalThreads,60L,TimeUnit.SECONDS,new LinkedBlockingQueue()); - highSemaphore = new Semaphore(highThreads); - centerSemaphore = new Semaphore(centerThreads); - lowSemaphore = new Semaphore(lowThreads); - emergencySemaphore = new Semaphore(0); - - } - public static void submit(Runnable task) { - adjustForEmergency(); - try { - emergencySemaphore.acquire(); - executor.submit(()->{ - try { - task.run(); - }finally { - emergencySemaphore.release(); - restoreDefaults(); - } - }); - }catch (InterruptedException e){ - Thread.currentThread().interrupt(); - } - } - private static void subHighTask(Runnable task){ - try { - highSemaphore.acquire(); - executor.submit(()->{ - try { - task.run(); - }finally { - highSemaphore.release(); - } - }); - }catch (InterruptedException e){ - Thread.currentThread().interrupt(); - } - } - private static void subcenterTask(Runnable task){ - try { - centerSemaphore.acquire(); - executor.submit(()->{ - try { - task.run(); - }finally { - centerSemaphore.release(); - } - }); - }catch (InterruptedException e){ - Thread.currentThread().interrupt(); - } - } - private static void subLowTask(Runnable task){ - try { - lowSemaphore.acquire(); - executor.submit(()->{ - try { - task.run(); - }finally { - lowSemaphore.release(); - } - }); - }catch (InterruptedException e){ - Thread.currentThread().interrupt(); - } - } - - private static synchronized void adjustForEmergency(){ - if (!inEmergencyMode) { - inEmergencyMode = true; - - highSemaphore.release(highThreads-emergencyHighThreads); - centerSemaphore.release(centerThreads-emergencyCenterThreads); - lowSemaphore.release(lowThreads-emergencyLowThreads); - emergencySemaphore.release(emergencyTaskThreads); - } - } - - private static synchronized void restoreDefaults(){ - if (inEmergencyMode){ - emergencySemaphore.drainPermits(); - try { - highSemaphore.acquire(emergencyHighThreads); - centerSemaphore.acquire(emergencyCenterThreads); - lowSemaphore.acquire(emergencyLowThreads); - }catch (InterruptedException e){ - throw new RuntimeException(e); - } - } - highSemaphore.release(highThreads); - centerSemaphore.release(centerThreads); - lowSemaphore.release(lowThreads); - - emergencySemaphore.release(-emergencyTaskThreads); - - inEmergencyMode = false; - } - - public static void shutdown() throws InterruptedException { - executor.shutdown(); - if (!executor.awaitTermination(1,TimeUnit.HOURS)){ - executor.shutdownNow(); - if (!executor.awaitTermination(1,TimeUnit.MINUTES)){ - System.err.println("Executor did not terminate"); - } - } - } - - - - -}