紧急任务线程修改

master
Cui YongXing 2024-09-05 20:55:05 +08:00
parent aea5c583b2
commit de82ad0c00
3 changed files with 11 additions and 162 deletions

View File

@ -167,7 +167,7 @@ public class TaskInfoServiceImpl extends ServiceImpl<TaskInfoMapper, TaskInfo> i
HashSet<Long> longs = new HashSet<>();
List<TaskInput> list = taskInputService.list(wrapper);
if (CollectionUtils.isEmpty(list)) {
return "没有选择字段";
return "没有选择";
}
HashMap<String, String> tableNameMap = new HashMap<>();
String fieName = "";
@ -285,11 +285,7 @@ public class TaskInfoServiceImpl extends ServiceImpl<TaskInfoMapper, TaskInfo> i
getString(pageNum, finalFieName, finalJoint,basicId,newBasicId,tableId,map,num);
});
}
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
long end = System.currentTimeMillis();
@ -302,6 +298,7 @@ public class TaskInfoServiceImpl extends ServiceImpl<TaskInfoMapper, TaskInfo> i
private void getString(Long pageNum,String fieName,String joint,Long basicId,Long newBasicId,Long tableId,HashMap<String, String> map ,Integer num) {
String sqlSelect = " SELECT " + fieName + " FROM " + joint +" limit "+pageNum +","+PAGE_SIZE;
log.info(sqlSelect);
Result<DataValue[][]> tableValueResult = datasourceFeign.findTableValueToArray(basicId, sqlSelect);
DataValue[][] data = tableValueResult.getData();
for (DataValue[] datum : data) {

View File

@ -2,8 +2,8 @@ package com.muyu.task.server.thread;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
public class OptimizedPrioritizedThreadPool {
private static final ExecutorService executor;
private static final Semaphore highPrioritySemaphore;
private static final Semaphore mediumPrioritySemaphore;
@ -22,6 +22,7 @@ public class OptimizedPrioritizedThreadPool {
private static volatile boolean inEmergencyMode = false;
private static final AtomicInteger activeEmergencyTasks = new AtomicInteger(0); // 紧急任务计数器
static {
if (defaultHighThreads + defaultMediumThreads + defaultLowThreads > totalThreads) {
throw new IllegalArgumentException("Default priority threads exceed total threads.");
@ -34,7 +35,7 @@ public class OptimizedPrioritizedThreadPool {
executor = new ThreadPoolExecutor(
totalThreads, totalThreads,
60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>() // 使用无界的阻塞队列
new LinkedBlockingQueue<Runnable>()
);
highPrioritySemaphore = new Semaphore(defaultHighThreads);
mediumPrioritySemaphore = new Semaphore(defaultMediumThreads);
@ -60,7 +61,6 @@ public class OptimizedPrioritizedThreadPool {
});
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
// 日志记录中断信息
System.err.println("Interrupted while waiting to execute emergency task.");
}
}
@ -77,7 +77,6 @@ public class OptimizedPrioritizedThreadPool {
});
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
// 日志记录中断信息
System.err.println("Interrupted while waiting to execute high priority task.");
}
}
@ -94,7 +93,6 @@ public class OptimizedPrioritizedThreadPool {
});
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
// 日志记录中断信息
System.err.println("Interrupted while waiting to execute medium priority task.");
}
}
@ -111,13 +109,11 @@ public class OptimizedPrioritizedThreadPool {
});
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
// 日志记录中断信息
System.err.println("Interrupted while waiting to execute low priority task.");
}
}
public static synchronized void adjustForEmergency() {
// 仅在未处于紧急模式下调整线程分配
if (!inEmergencyMode) {
inEmergencyMode = true;
@ -132,7 +128,6 @@ public class OptimizedPrioritizedThreadPool {
}
public static synchronized void restoreDefaults() {
// 检查是否处于紧急模式
if (inEmergencyMode) {
// 清空紧急任务的许可
emergencySemaphore.drainPermits();
@ -167,5 +162,4 @@ public class OptimizedPrioritizedThreadPool {
}
}
}
}

View File

@ -1,142 +0,0 @@
package com.muyu.task.server.thread;
import java.util.concurrent.*;
public class PrioritizedThreadPool {
private static final ExecutorService executor;
private static final Semaphore highSemaphore;
private static final Semaphore lowSemaphore;
private static final Semaphore centerSemaphore;
private static final Semaphore emergencySemaphore;
private static final int totalThreads = 30;
private static final int highThreads = 16;
private static final int centerThreads = 9;
private static final int lowThreads = 5;
private static final int emergencyTaskThreads = 14;
private static final int emergencyHighThreads = 9;
private static final int emergencyCenterThreads = 5;
private static final int emergencyLowThreads = 2;
private static volatile boolean inEmergencyMode = false;
static {
if (highThreads + centerThreads + lowThreads > totalThreads) {
throw new IllegalArgumentException("总和大于线程数");
}
if (emergencyCenterThreads + emergencyHighThreads + emergencyLowThreads > totalThreads) {
throw new IllegalArgumentException("总和大于线程数");
}
executor =new ThreadPoolExecutor(totalThreads, totalThreads,60L,TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>());
highSemaphore = new Semaphore(highThreads);
centerSemaphore = new Semaphore(centerThreads);
lowSemaphore = new Semaphore(lowThreads);
emergencySemaphore = new Semaphore(0);
}
public static void submit(Runnable task) {
adjustForEmergency();
try {
emergencySemaphore.acquire();
executor.submit(()->{
try {
task.run();
}finally {
emergencySemaphore.release();
restoreDefaults();
}
});
}catch (InterruptedException e){
Thread.currentThread().interrupt();
}
}
private static void subHighTask(Runnable task){
try {
highSemaphore.acquire();
executor.submit(()->{
try {
task.run();
}finally {
highSemaphore.release();
}
});
}catch (InterruptedException e){
Thread.currentThread().interrupt();
}
}
private static void subcenterTask(Runnable task){
try {
centerSemaphore.acquire();
executor.submit(()->{
try {
task.run();
}finally {
centerSemaphore.release();
}
});
}catch (InterruptedException e){
Thread.currentThread().interrupt();
}
}
private static void subLowTask(Runnable task){
try {
lowSemaphore.acquire();
executor.submit(()->{
try {
task.run();
}finally {
lowSemaphore.release();
}
});
}catch (InterruptedException e){
Thread.currentThread().interrupt();
}
}
private static synchronized void adjustForEmergency(){
if (!inEmergencyMode) {
inEmergencyMode = true;
highSemaphore.release(highThreads-emergencyHighThreads);
centerSemaphore.release(centerThreads-emergencyCenterThreads);
lowSemaphore.release(lowThreads-emergencyLowThreads);
emergencySemaphore.release(emergencyTaskThreads);
}
}
private static synchronized void restoreDefaults(){
if (inEmergencyMode){
emergencySemaphore.drainPermits();
try {
highSemaphore.acquire(emergencyHighThreads);
centerSemaphore.acquire(emergencyCenterThreads);
lowSemaphore.acquire(emergencyLowThreads);
}catch (InterruptedException e){
throw new RuntimeException(e);
}
}
highSemaphore.release(highThreads);
centerSemaphore.release(centerThreads);
lowSemaphore.release(lowThreads);
emergencySemaphore.release(-emergencyTaskThreads);
inEmergencyMode = false;
}
public static void shutdown() throws InterruptedException {
executor.shutdown();
if (!executor.awaitTermination(1,TimeUnit.HOURS)){
executor.shutdownNow();
if (!executor.awaitTermination(1,TimeUnit.MINUTES)){
System.err.println("Executor did not terminate");
}
}
}
}