diff --git a/cloud-task-server/src/main/java/com/muyu/task/server/service/impl/TaskInfoServiceImpl.java b/cloud-task-server/src/main/java/com/muyu/task/server/service/impl/TaskInfoServiceImpl.java index de92500..5bf3287 100644 --- a/cloud-task-server/src/main/java/com/muyu/task/server/service/impl/TaskInfoServiceImpl.java +++ b/cloud-task-server/src/main/java/com/muyu/task/server/service/impl/TaskInfoServiceImpl.java @@ -43,7 +43,7 @@ import static com.muyu.task.server.thread.OptimizedPrioritizedThreadPool.*; @Service public class TaskInfoServiceImpl extends ServiceImpl implements TaskInfoService { - private final Integer PAGE_SIZE = 30000; + private final Integer PAGE_SIZE = 100000; @Resource private TaskInfoMapper taskInfoMapper; @@ -223,46 +223,66 @@ public class TaskInfoServiceImpl extends ServiceImpl i private void extracted(Long data, Long taskId, String weight, String finalFieName, String finalJoint, Long basicId, Long newBasicId, Long tableId, HashMap map, Integer num) { long count = data / PAGE_SIZE + (data % PAGE_SIZE > 0 ? 1 : 0); + ExecutorService executorService = Executors.newFixedThreadPool(10); + for (long i = 1; i <= count; i++) { long pageNum = (i - 1) * PAGE_SIZE; long pageSize = Math.min(PAGE_SIZE, data - pageNum); - if (Weight.high.getValue().equals(weight)) { - log.info("执行高级任务"); - submitHighPriorityTask(() -> { - getString(pageNum, finalFieName, finalJoint, basicId, newBasicId, tableId, map, pageSize, num, taskId); - }); - - } - if (Weight.centre.getValue().equals(weight)) { - submitMediumPriorityTask(() -> { - getString(pageNum, finalFieName, finalJoint, basicId, newBasicId, tableId, map, pageSize, num, taskId); - }); - } - if (Weight.low.getValue().equals(weight)) { - - submitLowPriorityTask(() -> { - getString(pageNum, finalFieName, finalJoint, basicId, newBasicId, tableId, map, pageSize, num, taskId); - }); - - } - if (Weight.urgency.getValue().equals(weight)) { - log.info("执行紧急任务"); - // 调整线程分配以适应紧急任务 - OptimizedPrioritizedThreadPool.activeEmergencyTasks.set(0); - OptimizedPrioritizedThreadPool.remainingTasks.set(0); - submitEmergencyTask(() -> { - try { - getString(pageNum, finalFieName, finalJoint, basicId, newBasicId, tableId, map, pageSize, num, taskId); - } finally { - // 减少剩余任务计数 - if (OptimizedPrioritizedThreadPool.remainingTasks.decrementAndGet() == 0) { - System.out.println("All emergency tasks have completed."); - } - } - }); - + executorService.submit(() -> { + getString(pageNum, finalFieName, finalJoint, basicId, newBasicId, tableId, map, pageSize, num, taskId); + }); +// if (Weight.high.getValue().equals(weight)) { +// log.info("执行高级任务"); +// submitHighPriorityTask(() -> { +// getString(pageNum, finalFieName, finalJoint, basicId, newBasicId, tableId, map, pageSize, num, taskId); +// }); +// +// } +// if (Weight.centre.getValue().equals(weight)) { +// submitMediumPriorityTask(() -> { +// getString(pageNum, finalFieName, finalJoint, basicId, newBasicId, tableId, map, pageSize, num, taskId); +// }); +// } +// if (Weight.low.getValue().equals(weight)) { +// +// submitLowPriorityTask(() -> { +// getString(pageNum, finalFieName, finalJoint, basicId, newBasicId, tableId, map, pageSize, num, taskId); +// }); +// +// } +// if (Weight.urgency.getValue().equals(weight)) { +// log.info("执行紧急任务"); +// // 调整线程分配以适应紧急任务 +// OptimizedPrioritizedThreadPool.activeEmergencyTasks.set(0); +// OptimizedPrioritizedThreadPool.remainingTasks.set(0); +// submitEmergencyTask(() -> { +// try { +// getString(pageNum, finalFieName, finalJoint, basicId, newBasicId, tableId, map, pageSize, num, taskId); +// } finally { +// // 减少剩余任务计数 +// if (OptimizedPrioritizedThreadPool.remainingTasks.decrementAndGet() == 0) { +// System.out.println("All emergency tasks have completed."); +// } +// } +// }); +// +// +// } + } + executorService.shutdown(); + // 等待所有任务完成(可选) + try { + if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) { + // 如果在指定时间内没有完成,可以选择取消当前正在执行的任务 + executorService.shutdownNow(); + // 然后根据需要处理超时情况 } + } catch (InterruptedException e) { + // 当前线程在等待过程中被中断 + executorService.shutdownNow(); + // 当前线程需要处理中断情况 + Thread.currentThread().interrupt(); } }