diff --git a/cloud-task-server/src/main/java/com/muyu/task/server/service/impl/TaskInfoServiceImpl.java b/cloud-task-server/src/main/java/com/muyu/task/server/service/impl/TaskInfoServiceImpl.java index 819c81c..54898c1 100644 --- a/cloud-task-server/src/main/java/com/muyu/task/server/service/impl/TaskInfoServiceImpl.java +++ b/cloud-task-server/src/main/java/com/muyu/task/server/service/impl/TaskInfoServiceImpl.java @@ -29,7 +29,7 @@ import org.springframework.util.CollectionUtils; import javax.annotation.Resource; import java.util.*; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import static com.muyu.task.server.thread.OptimizedPrioritizedThreadPool.*; @@ -202,7 +202,7 @@ public class TaskInfoServiceImpl extends ServiceImpl i //查询表的数量 Long count = getCount(joint, basicId); //查询和添加 - extracted(count, weight,fieName,joint, basicId, newBasicId, tableId, map, num); + extracted(count, weight, fieName, joint, basicId, newBasicId, tableId, map, num); long end = System.currentTimeMillis(); //log.info("执行时间:{}",end-start); return null; @@ -216,7 +216,7 @@ public class TaskInfoServiceImpl extends ServiceImpl i return data; } - private void extracted(Long data, String weight,String finalFieName,String finalJoint, Long basicId, Long newBasicId, Long tableId, HashMap map, Integer num) { + private void extracted(Long data, String weight, String finalFieName, String finalJoint, Long basicId, Long newBasicId, Long tableId, HashMap map, Integer num) { long count = data / PAGE_SIZE + (data % PAGE_SIZE > 0 ? 1 : 0); for (long i = 1; i <= count; i++) { long pageNum = (i - 1) * PAGE_SIZE; @@ -335,35 +335,78 @@ public class TaskInfoServiceImpl extends ServiceImpl i HashMap map, Long one, Integer two) { - String sqlSelect = " SELECT " + fieName + " FROM " + joint + " LIMIT "+ PAGE_SIZE +" OFFSET " + pageNum ; + String sqlSelect = " SELECT " + fieName + " FROM " + joint + " LIMIT " + PAGE_SIZE + " OFFSET " + pageNum; log.info(sqlSelect); //log.info("执行{}查询的方法",sqlSelect); Result tableValueResult = datasourceFeign.findTableValueToArray(basicId, sqlSelect, one, two); log.info(tableValueResult); + DataValue[][] data = tableValueResult.getData(); log.info("执行{}查询的方法结束", sqlSelect); - for (DataValue[] datum : data) { - for (DataValue dataValue : datum) { - String key = dataValue.getKey(); - String newKey = map.get(key); - dataValue.setKey(newKey); - } - } - log.info("{}查询结束", sqlSelect); - log.info("执行{}添加的方法", sqlSelect); - Result result = datasourceFeign.addProduct(newBasicId, tableId, data); - log.info("{}添加结束", result); -// for (List dataValues : tableValue) { -// for (DataValue dataValue : dataValues) { -// String key = dataValue.getKey(); -// String newKey = map.get(key); -// dataValue.setKey(newKey); -// } -// } + executeTheRule(data,map,newBasicId,tableId); } + private void executeTheRule(DataValue[][] dataValues,HashMap map, Long newBasicId, + Long tableId) { + // 创建一个单线程的ExecutorService + ExecutorService executor = Executors.newSingleThreadExecutor(); + + // 创建一个链表来保存任务 + LinkedList> tasks = new LinkedList<>(); + + // 初始化第一个任务 + tasks.add(() -> { + return dataValues; + }); + + // 创建任务链 + Future currentFuture = null; + for (int i = 1; i <= 4; i++) { + final Future finalCurrentFuture = currentFuture; + Callable task = () -> { + + DataValue[][] prevResult = finalCurrentFuture.get(); + + return prevResult; + }; + + // 提交任务并更新当前Future + currentFuture = executor.submit(task); + + // 等待当前任务完成 + try { + currentFuture.get(); + System.out.println("Task " + i + " completed with result:"); + } catch (InterruptedException | ExecutionException e) { + Thread.currentThread().interrupt(); + System.out.println("Task execution failed: " + e.getMessage()); + break; + } + } + try { + DataValue[][] afterFilteringDataValue = currentFuture.get(); + for (DataValue[] datum : afterFilteringDataValue) { + for (DataValue dataValue : datum) { + String key = dataValue.getKey(); + String newKey = map.get(key); + dataValue.setKey(newKey); + } + } + Result result = datasourceFeign.addProduct(newBasicId, tableId, afterFilteringDataValue); + log.info("{}添加结束", result); + + } catch (InterruptedException e) { + throw new RuntimeException(e); + } catch (ExecutionException e) { + throw new RuntimeException(e); + } + + // 关闭ExecutorService + executor.shutdown(); + } + }