From 312663aff2b43f009e9128b04ed52d43206df105 Mon Sep 17 00:00:00 2001 From: Cui YongXing <2835316714@qq.com> Date: Tue, 3 Sep 2024 22:20:16 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E7=BA=BF=E7=A8=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/muyu/common/domian/enums/Weight.java | 16 ++ .../muyu/remote/feign/DatasourceFeign.java | 2 + .../feign/Factory/DatasourceFeignFactory.java | 6 + .../service/impl/TaskInfoServiceImpl.java | 40 ++++- .../OptimizedPrioritizedThreadPool.java | 169 ++++++++++++++++++ 5 files changed, 228 insertions(+), 5 deletions(-) create mode 100644 cloud-task-common/src/main/java/com/muyu/common/domian/enums/Weight.java create mode 100644 cloud-task-server/src/main/java/com/muyu/task/server/thread/OptimizedPrioritizedThreadPool.java diff --git a/cloud-task-common/src/main/java/com/muyu/common/domian/enums/Weight.java b/cloud-task-common/src/main/java/com/muyu/common/domian/enums/Weight.java new file mode 100644 index 0000000..76c2eec --- /dev/null +++ b/cloud-task-common/src/main/java/com/muyu/common/domian/enums/Weight.java @@ -0,0 +1,16 @@ +package com.muyu.common.domian.enums; + +public enum Weight{ + urgency("3"),high("0"),centre("1"),low("2"); + private String value; + Weight(String value){ + this.value = value; + } + + public String getValue(){ + return value; + } + + + +} diff --git a/cloud-task-remote/src/main/java/com/muyu/remote/feign/DatasourceFeign.java b/cloud-task-remote/src/main/java/com/muyu/remote/feign/DatasourceFeign.java index 7656357..53f45fc 100644 --- a/cloud-task-remote/src/main/java/com/muyu/remote/feign/DatasourceFeign.java +++ b/cloud-task-remote/src/main/java/com/muyu/remote/feign/DatasourceFeign.java @@ -25,4 +25,6 @@ public interface DatasourceFeign { @PostMapping("/product/addProduct") public Result addProduct(@RequestParam("basicId") Long basicId, @RequestParam("tableId") Long tableId, @RequestBody List> listList); + @PostMapping("/dataValue/findCount") + public Result findCount(@RequestParam("basicId") Long basicId,@RequestParam("sql") String sql); } diff --git a/cloud-task-remote/src/main/java/com/muyu/remote/feign/Factory/DatasourceFeignFactory.java b/cloud-task-remote/src/main/java/com/muyu/remote/feign/Factory/DatasourceFeignFactory.java index 38783df..a869331 100644 --- a/cloud-task-remote/src/main/java/com/muyu/remote/feign/Factory/DatasourceFeignFactory.java +++ b/cloud-task-remote/src/main/java/com/muyu/remote/feign/Factory/DatasourceFeignFactory.java @@ -26,6 +26,12 @@ public class DatasourceFeignFactory implements FallbackFactory log.info(e); return Result.error("网络开小差......"); } + + @Override + public Result findCount(Long basicId, String sql) { + log.info(e); + return Result.error("网络开小差......"); + } }; } } diff --git a/cloud-task-server/src/main/java/com/muyu/task/server/service/impl/TaskInfoServiceImpl.java b/cloud-task-server/src/main/java/com/muyu/task/server/service/impl/TaskInfoServiceImpl.java index 93e875b..b2e6895 100644 --- a/cloud-task-server/src/main/java/com/muyu/task/server/service/impl/TaskInfoServiceImpl.java +++ b/cloud-task-server/src/main/java/com/muyu/task/server/service/impl/TaskInfoServiceImpl.java @@ -11,6 +11,7 @@ import com.muyu.common.domian.NodeJoint; import com.muyu.common.domian.TaskInfo; import com.muyu.common.domian.TaskInput; import com.muyu.common.domian.TaskOutput; +import com.muyu.common.domian.enums.Weight; import com.muyu.common.domian.req.TaskInfoListReq; import com.muyu.common.domian.resp.TaskInfoResp; import com.muyu.remote.feign.DatasourceFeign; @@ -20,6 +21,7 @@ import com.muyu.task.server.service.TaskInfoService; import com.muyu.task.server.service.TaskInputService; import com.muyu.task.server.service.TaskOutputService; import lombok.extern.log4j.Log4j2; +import org.jetbrains.annotations.NotNull; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.util.CollectionUtils; @@ -30,6 +32,8 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; +import static com.muyu.task.server.thread.OptimizedPrioritizedThreadPool.submitHighPriorityTask; + /** * @author Administrator */ @@ -153,6 +157,8 @@ public class TaskInfoServiceImpl extends ServiceImpl i @Override public String findByFieName2(Long taskId) { + TaskInfo taskInfo = taskInfoMapper.selectById(taskId); + String weight = taskInfo.getWeight(); QueryWrapper wrapper = new QueryWrapper<>(); wrapper.eq("task_id", taskId); HashSet longs = new HashSet<>(); @@ -170,8 +176,8 @@ public class TaskInfoServiceImpl extends ServiceImpl i String[] tableAsFieId = list.get(i).getTableAsFieId().split(","); String[] fieIdAsEngineId = list.get(i).getFieIdAsEngineId().split(","); for (int j = 0; j < tableFieId.length; j++) { - hashMap.put(tableFieId[j],fieIdAsEngineId[j]); - tableNameMap.put(tableFieId[j],tableAsFieId[j]); + hashMap.put(tableFieId[j], fieIdAsEngineId[j]); + tableNameMap.put(tableFieId[j], tableAsFieId[j]); } } @@ -203,11 +209,11 @@ public class TaskInfoServiceImpl extends ServiceImpl i + " = " + nodeJoint.getTwoFie(); } - }else { + } else { TaskInput taskInput = taskInputService.getOne(wrapper); String tableName = taskInput.getTableName(); String tableAsName = taskInput.getTableAsName(); - joint=" "+ tableName+" "+tableAsName; + joint = " " + tableName + " " + tableAsName; } Long basicId = taskInputService.selectByBasicId(taskId); @@ -225,7 +231,31 @@ public class TaskInfoServiceImpl extends ServiceImpl i fieName += "," + tableNameMap.get(lastFieName[i]); } fieName = fieName.substring(1); - String sql = " SELECT " + fieName + " FROM "+ joint; + String sql = " SELECT count(1) FROM " + joint; + Result count = datasourceFeign.findCount(basicId, sql); + System.out.println("======="+count+"======="); + if (Weight.high.equals(weight)){ + submitHighPriorityTask(()->{ + + }); + } + if (Weight.centre.equals(weight)){ + + } + if (Weight.low.equals(weight)){ + + } + if (Weight.urgency.equals(weight)){ + + } + //return getString(fieName,joint,basicId,newBasicId,tableId,map); + return null; + } + + @NotNull + private String getString(String fieName,String joint,Long basicId,Long newBasicId,Long tableId,HashMap map ) { + + String sql = " SELECT " + fieName + " FROM " + joint; System.out.println(sql); Result>> tableValueResult = datasourceFeign.findTableValue(basicId, sql); List> tableValue = tableValueResult.getData(); diff --git a/cloud-task-server/src/main/java/com/muyu/task/server/thread/OptimizedPrioritizedThreadPool.java b/cloud-task-server/src/main/java/com/muyu/task/server/thread/OptimizedPrioritizedThreadPool.java new file mode 100644 index 0000000..c9a6787 --- /dev/null +++ b/cloud-task-server/src/main/java/com/muyu/task/server/thread/OptimizedPrioritizedThreadPool.java @@ -0,0 +1,169 @@ +package com.muyu.task.server.thread; + +import java.util.concurrent.*; + +public class OptimizedPrioritizedThreadPool { + private static final ExecutorService executor; + private static final Semaphore highPrioritySemaphore; + private static final Semaphore mediumPrioritySemaphore; + private static final Semaphore lowPrioritySemaphore; + private static final Semaphore emergencySemaphore; + + private static final int totalThreads = 30; // 固定线程池大小 + private static final int defaultHighThreads = 16; + private static final int defaultMediumThreads = 9; + private static final int defaultLowThreads = 5; + + private static final int emergencyThreads = 14; + private static final int emergencyHighThreads = 9; + private static final int emergencyMediumThreads = 5; + private static final int emergencyLowThreads = 2; + + private static volatile boolean inEmergencyMode = false; + + static { + if (defaultHighThreads + defaultMediumThreads + defaultLowThreads > totalThreads) { + throw new IllegalArgumentException("Default priority threads exceed total threads."); + } + if (emergencyThreads + emergencyHighThreads + emergencyMediumThreads + emergencyLowThreads > totalThreads) { + throw new IllegalArgumentException("Emergency priority threads exceed total threads."); + } + + // 创建固定大小的线程池 + executor = new ThreadPoolExecutor( + totalThreads, totalThreads, + 60L, TimeUnit.SECONDS, + new LinkedBlockingQueue() // 使用无界的阻塞队列 + ); + highPrioritySemaphore = new Semaphore(defaultHighThreads); + mediumPrioritySemaphore = new Semaphore(defaultMediumThreads); + lowPrioritySemaphore = new Semaphore(defaultLowThreads); + emergencySemaphore = new Semaphore(0); + } + + public static void submitEmergencyTask(Runnable task) { + // 调整线程分配以适应紧急任务 + adjustForEmergency(); + + try { + emergencySemaphore.acquire(); + executor.submit(() -> { + try { + task.run(); + } finally { + emergencySemaphore.release(); + // 任务完成后恢复默认线程分配 + restoreDefaults(); + } + }); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + // 日志记录中断信息 + System.err.println("Interrupted while waiting to execute emergency task."); + } + } + + public static void submitHighPriorityTask(Runnable task) { + try { + highPrioritySemaphore.acquire(); + executor.submit(() -> { + try { + task.run(); + } finally { + highPrioritySemaphore.release(); + } + }); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + // 日志记录中断信息 + System.err.println("Interrupted while waiting to execute high priority task."); + } + } + + public static void submitMediumPriorityTask(Runnable task) { + try { + mediumPrioritySemaphore.acquire(); + executor.submit(() -> { + try { + task.run(); + } finally { + mediumPrioritySemaphore.release(); + } + }); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + // 日志记录中断信息 + System.err.println("Interrupted while waiting to execute medium priority task."); + } + } + + public static void submitLowPriorityTask(Runnable task) { + try { + lowPrioritySemaphore.acquire(); + executor.submit(() -> { + try { + task.run(); + } finally { + lowPrioritySemaphore.release(); + } + }); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + // 日志记录中断信息 + System.err.println("Interrupted while waiting to execute low priority task."); + } + } + + private static synchronized void adjustForEmergency() { + // 仅在未处于紧急模式下调整线程分配 + if (!inEmergencyMode) { + inEmergencyMode = true; + + // 释放一些高、中、低优先级的线程,以便为紧急任务腾出空间 + highPrioritySemaphore.release(defaultHighThreads - emergencyHighThreads); + mediumPrioritySemaphore.release(defaultMediumThreads - emergencyMediumThreads); + lowPrioritySemaphore.release(defaultLowThreads - emergencyLowThreads); + + // 为紧急任务分配线程 + emergencySemaphore.release(emergencyThreads); + } + } + + private static synchronized void restoreDefaults() { + // 检查是否处于紧急模式 + if (inEmergencyMode) { + // 清空紧急任务的许可 + emergencySemaphore.drainPermits(); + + // 重新分配高、中、低优先级的线程 + try { + highPrioritySemaphore.acquire(emergencyHighThreads); + mediumPrioritySemaphore.acquire(emergencyMediumThreads); + lowPrioritySemaphore.acquire(emergencyLowThreads); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + + highPrioritySemaphore.release(defaultHighThreads); + mediumPrioritySemaphore.release(defaultMediumThreads); + lowPrioritySemaphore.release(defaultLowThreads); + + // 清空紧急任务的许可 + emergencySemaphore.release(-emergencyThreads); + + // 退出紧急模式 + inEmergencyMode = false; + } + } + + public static void shutdown() throws InterruptedException { + executor.shutdown(); + if (!executor.awaitTermination(1, TimeUnit.HOURS)) { + executor.shutdownNow(); + if (!executor.awaitTermination(1, TimeUnit.MINUTES)) { + System.err.println("Pool did not terminate"); + } + } + } + +}