From 5c50d194bee379e4d35eaf28dd484e30d4cde08b Mon Sep 17 00:00:00 2001 From: Cui YongXing <2835316714@qq.com> Date: Fri, 6 Sep 2024 17:02:09 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AF=BB=E6=89=BE=E6=8A=A5=E9=94=99?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../service/impl/TaskInfoServiceImpl.java | 242 +++++++++--------- 1 file changed, 116 insertions(+), 126 deletions(-) diff --git a/cloud-task-server/src/main/java/com/muyu/task/server/service/impl/TaskInfoServiceImpl.java b/cloud-task-server/src/main/java/com/muyu/task/server/service/impl/TaskInfoServiceImpl.java index a86b16f..bdd7e70 100644 --- a/cloud-task-server/src/main/java/com/muyu/task/server/service/impl/TaskInfoServiceImpl.java +++ b/cloud-task-server/src/main/java/com/muyu/task/server/service/impl/TaskInfoServiceImpl.java @@ -28,9 +28,7 @@ import org.springframework.stereotype.Service; import org.springframework.util.CollectionUtils; import javax.annotation.Resource; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; +import java.util.*; import java.util.concurrent.TimeUnit; import static com.muyu.task.server.thread.OptimizedPrioritizedThreadPool.*; @@ -165,13 +163,106 @@ public class TaskInfoServiceImpl extends ServiceImpl i String weight = taskInfo.getWeight(); QueryWrapper wrapper = new QueryWrapper<>(); wrapper.eq("task_id", taskId); - HashSet longs = new HashSet<>(); List list = taskInputService.list(wrapper); if (CollectionUtils.isEmpty(list)) { return "没有选择表"; } - HashMap tableNameMap = new HashMap<>(); String fieName = ""; + + Map fie = getFieAsNameAndFieEngineId(list); + Map fieAsName = (Map) fie.get("FieAsName"); + Map fieEngineId = (Map) fie.get("FieEngineId"); + Set longs = (Set) fie.get("longs"); + if (longs.size() > 1) { + return "你选择的不是同一个数据库"; + } + + String joint = getJoint(taskId); + Long basicId = taskInputService.selectByBasicId(taskId); + QueryWrapper queryWrapper = new QueryWrapper<>(); + queryWrapper.eq("task_id", taskId); + TaskOutput one = taskOutputService.getOne(queryWrapper); + if (one == null) { + return "没有选择输出"; + } + Long tableId = one.getTableId(); + Long newBasicId = one.getBasicId(); + HashMap map = new HashMap<>(); + String[] newFieName = one.getNewFieName().split(","); + String[] lastFieName = one.getLastFieName().split(","); + Integer num = lastFieName.length; + for (int i = 0; i < newFieName.length; i++) { + map.put(lastFieName[i], newFieName[i]); + fieName += "," + fieAsName.get(lastFieName[i]); + } + fieName = fieName.substring(1); + //查询表的数量 + Long count = getCount(joint, basicId); + //查询和添加 + extracted(count, weight,fieName,joint, basicId, newBasicId, tableId, map, num); + long end = System.currentTimeMillis(); + //log.info("执行时间:{}",end-start); + return null; + } + + private Long getCount(String joint, Long basicId) { + String sql = " SELECT count(1) FROM " + joint; + Result countResult = datasourceFeign.findCount(basicId, sql); + Long data = countResult.getData(); + return data; + } + + private void extracted(Long data, String weight,String finalFieName,String finalJoint, Long basicId, Long newBasicId, Long tableId, HashMap map, Integer num) { + long count = data / PAGE_SIZE + (data % PAGE_SIZE > 0 ? 1 : 0); + for (long i = 1; i <= count; i++) { + long pageNum = (i - 1) * PAGE_SIZE; + long pageSize = Math.min(PAGE_SIZE, data - pageNum); + if (Weight.high.getValue().equals(weight)) { + log.info("执行高级任务"); + submitHighPriorityTask(() -> { + getString(pageNum, finalFieName, finalJoint, basicId, newBasicId, tableId, map, pageSize, num); + }); + + } + if (Weight.centre.getValue().equals(weight)) { + submitMediumPriorityTask(() -> { + getString(pageNum, finalFieName, finalJoint, basicId, newBasicId, tableId, map, pageSize, num); + }); + } + if (Weight.low.getValue().equals(weight)) { + + submitLowPriorityTask(() -> { + getString(pageNum, finalFieName, finalJoint, basicId, newBasicId, tableId, map, pageSize, num); + }); + + } + if (Weight.urgency.getValue().equals(weight)) { + log.info("执行紧急任务"); + // 调整线程分配以适应紧急任务 + OptimizedPrioritizedThreadPool.activeEmergencyTasks.set(0); + OptimizedPrioritizedThreadPool.remainingTasks.set(0); + submitEmergencyTask(() -> { + try { + getString(pageNum, finalFieName, finalJoint, basicId, newBasicId, tableId, map, pageSize, num); + } finally { + // 减少剩余任务计数 + if (OptimizedPrioritizedThreadPool.remainingTasks.decrementAndGet() == 0) { + System.out.println("All emergency tasks have completed."); + } + } + }); + + + } + } + + } + + + @NotNull + private Map getFieAsNameAndFieEngineId(List list) { + HashMap tableNameMap = new HashMap<>(); + HashSet longs = new HashSet<>(); HashMap hashMap = new HashMap<>(); for (int i = 0; i < list.size(); i++) { Long databaseId = list.get(i).getDatabaseId(); @@ -183,16 +274,21 @@ public class TaskInfoServiceImpl extends ServiceImpl i hashMap.put(tableFieId[j], fieIdAsEngineId[j]); tableNameMap.put(tableFieId[j], tableAsFieId[j]); } - - } - if (longs.size() > 1) { - return "你选择的不是同一个数据库"; } + HashMap map = new HashMap<>(); + map.put("FieAsName", tableNameMap); + map.put("FieEngineId", hashMap); + map.put("longs", longs); + return map; + } + + @NotNull + private String getJoint(Long taskId) { + String joint = ""; QueryWrapper jointQueryWrapper = new QueryWrapper<>(); jointQueryWrapper.eq("task_id", taskId); List jointList = nodeJointService.list(jointQueryWrapper); - String joint = ""; if (!CollectionUtils.isEmpty(jointList)) { for (NodeJoint nodeJoint : jointList) { String oneNodeId = nodeJoint.getOneNodeId(); @@ -214,113 +310,14 @@ public class TaskInfoServiceImpl extends ServiceImpl i } } else { + QueryWrapper wrapper = new QueryWrapper<>(); + wrapper.eq("task_id", taskId); TaskInput taskInput = taskInputService.getOne(wrapper); String tableName = taskInput.getTableName(); String tableAsName = taskInput.getTableAsName(); joint = " " + tableName + " " + tableAsName; } - Long basicId = taskInputService.selectByBasicId(taskId); - - - QueryWrapper queryWrapper = new QueryWrapper<>(); - queryWrapper.eq("task_id", taskId); - TaskOutput one = taskOutputService.getOne(queryWrapper); - if (one==null){ - return "没有选择输出"; - } - Long tableId = one.getTableId(); - Long newBasicId = one.getBasicId(); - HashMap map = new HashMap<>(); - - String[] newFieName = one.getNewFieName().split(","); - String[] lastFieName = one.getLastFieName().split(","); - Integer num = lastFieName.length; - for (int i = 0; i < newFieName.length; i++) { - map.put(lastFieName[i], newFieName[i]); - fieName += "," + tableNameMap.get(lastFieName[i]); - } - - fieName = fieName.substring(1); - String sql = " SELECT count(1) FROM " + joint; - Result countResult = datasourceFeign.findCount(basicId, sql); - Long data = countResult.getData(); - String finalFieName = fieName; - String finalJoint = joint; - long count = data/PAGE_SIZE==0?1:data/PAGE_SIZE+1; - long pageSize; - if (Weight.high.getValue().equals(weight)){ - log.info("执行高级任务"); - for (long i = 1; i <= count; i++) { - long pageNum = (i - 1) * PAGE_SIZE; - pageSize = data - pageNum; - if (pageSize>=PAGE_SIZE){ - pageSize=PAGE_SIZE; - } - long finalPageSize = pageSize; - submitHighPriorityTask(()->{ - getString(pageNum, finalFieName, finalJoint,basicId,newBasicId,tableId,map, finalPageSize,num); - }); - - } - } - if (Weight.centre.getValue().equals(weight)){ - log.info("执行中级任务"); - for (long i = 1; i <= count; i++) { - long pageNum = (i - 1) * PAGE_SIZE; - pageSize = data - pageNum; - if (pageSize>=PAGE_SIZE){ - pageSize=PAGE_SIZE; - } - long finalPageSize1 = pageSize; - submitMediumPriorityTask(()->{ - getString(pageNum, finalFieName, finalJoint,basicId,newBasicId,tableId,map, finalPageSize1,num); - }); - } - } - if (Weight.low.getValue().equals(weight)){ - log.info("执行低级任务"); - for (long i = 1; i <= count; i++) { - long pageNum = (i - 1) * PAGE_SIZE; - pageSize = data - pageNum; - if (pageSize>=PAGE_SIZE){ - pageSize=PAGE_SIZE; - } - long finalPageSize2 = pageSize; - submitLowPriorityTask(()->{ - getString(pageNum, finalFieName, finalJoint,basicId,newBasicId,tableId,map, finalPageSize2,num); - }); - } - } - if (Weight.urgency.getValue().equals(weight)){ - log.info("执行紧急任务"); - // 调整线程分配以适应紧急任务 - OptimizedPrioritizedThreadPool.activeEmergencyTasks.set(0); - OptimizedPrioritizedThreadPool.remainingTasks.set(0); - for (long i = 1; i <= count; i++) { - long pageNum = (i - 1) * PAGE_SIZE; - pageSize = data - pageNum; - if (pageSize>=PAGE_SIZE){ - pageSize=PAGE_SIZE; - } - long finalPageSize3 = pageSize; - System.out.println(finalPageSize3); - submitEmergencyTask(()->{ - try { - getString(pageNum, finalFieName, finalJoint, basicId, newBasicId, tableId, map, finalPageSize3, num); - } finally { - // 减少剩余任务计数 - if (OptimizedPrioritizedThreadPool.remainingTasks.decrementAndGet() == 0) { - System.out.println("All emergency tasks have completed."); - } - } - }); - } - - - } - long end = System.currentTimeMillis(); - //log.info("执行时间:{}",end-start); - return null; + return joint; } @@ -331,24 +328,17 @@ public class TaskInfoServiceImpl extends ServiceImpl i Long basicId, Long newBasicId, Long tableId, - HashMap map , + HashMap map, Long one, Integer two) { - String sqlSelect = " SELECT " + fieName + " FROM " + joint +" limit "+pageNum +","+PAGE_SIZE; + String sqlSelect = " SELECT " + fieName + " FROM " + joint + " LIMIT "+ PAGE_SIZE +" OFFSET" + pageNum ; log.info(sqlSelect); //log.info("执行{}查询的方法",sqlSelect); - Result tableValueResult = datasourceFeign.findTableValueToArray(basicId, sqlSelect,one,two); + Result tableValueResult = datasourceFeign.findTableValueToArray(basicId, sqlSelect, one, two); log.info(tableValueResult); DataValue[][] data = tableValueResult.getData(); - log.info("执行{}查询的方法结束",sqlSelect); -// if (pageNum==160000){ -// for (DataValue[] datum : data) { -// for (DataValue dataValue : datum) { -// System.out.println(dataValue); -// } -// } -// } + log.info("执行{}查询的方法结束", sqlSelect); for (DataValue[] datum : data) { for (DataValue dataValue : datum) { String key = dataValue.getKey(); @@ -356,10 +346,10 @@ public class TaskInfoServiceImpl extends ServiceImpl i dataValue.setKey(newKey); } } - log.info("{}查询结束",sqlSelect); - log.info("执行{}添加的方法",sqlSelect); + log.info("{}查询结束", sqlSelect); + log.info("执行{}添加的方法", sqlSelect); Result result = datasourceFeign.addProduct(newBasicId, tableId, data); - log.info("{}添加结束",result); + log.info("{}添加结束", result); // for (List dataValues : tableValue) { // for (DataValue dataValue : dataValues) { // String key = dataValue.getKey();