diff --git a/cloud-task-common/pom.xml b/cloud-task-common/pom.xml index 7f73f9d..62d6662 100644 --- a/cloud-task-common/pom.xml +++ b/cloud-task-common/pom.xml @@ -30,6 +30,12 @@ 1.0.0 + + + + + + diff --git a/cloud-task-remote/src/main/java/com/muyu/task/feign/RuleFeign.java b/cloud-task-remote/src/main/java/com/muyu/task/feign/RuleFeign.java index e582504..59ce880 100644 --- a/cloud-task-remote/src/main/java/com/muyu/task/feign/RuleFeign.java +++ b/cloud-task-remote/src/main/java/com/muyu/task/feign/RuleFeign.java @@ -1,7 +1,15 @@ -package com.muyu.task.feign; - -import org.springframework.cloud.openfeign.FeignClient; - -@FeignClient(name = "cloud-engine" ) -public interface RuleFeign { -} +//package com.muyu.task.feign; +// +//import com.muyu.common.core.domain.Result; +//import com.muyu.domain.EngineVersion; +//import com.muyu.task.feign.factory.RuleFeignFactory; +//import org.springframework.cloud.openfeign.FeignClient; +//import org.springframework.web.bind.annotation.GetMapping; +//import org.springframework.web.bind.annotation.PathVariable; +// +//@FeignClient(name = "cloud-engine",fallbackFactory = RuleFeignFactory.class) +//public interface RuleFeign { +// +// @GetMapping("/version/getVersion/{id}") +// public Result getVersion(@PathVariable("id") Long id); +//} diff --git a/cloud-task-remote/src/main/java/com/muyu/task/feign/factory/RuleFeignFactory.java b/cloud-task-remote/src/main/java/com/muyu/task/feign/factory/RuleFeignFactory.java new file mode 100644 index 0000000..9c03f6e --- /dev/null +++ b/cloud-task-remote/src/main/java/com/muyu/task/feign/factory/RuleFeignFactory.java @@ -0,0 +1,25 @@ +//package com.muyu.task.feign.factory; +// +////import com.muyu.task.feign.RuleFeign; +//import lombok.extern.log4j.Log4j2; +//import org.springframework.cloud.openfeign.FallbackFactory; +//import org.springframework.stereotype.Component; +// +///** +// * @PackageName:com.muyu.task.feign.factory +// * @ClassName:RuleFeignFactory +// * @Description: +// * @author: ¥陈思豪¥ +// * @date: 2024/9/9 17:35 +// */ +// +//@Log4j2 +//@Component +//public class RuleFeignFactory implements FallbackFactory { +// +// +// @Override +// public RuleFeign create(Throwable cause) { +// return null; +// } +//} diff --git a/cloud-task-server/src/main/java/com/muyu/service/impl/TaskServiceImpl.java b/cloud-task-server/src/main/java/com/muyu/service/impl/TaskServiceImpl.java index 9e685bd..c4ec333 100644 --- a/cloud-task-server/src/main/java/com/muyu/service/impl/TaskServiceImpl.java +++ b/cloud-task-server/src/main/java/com/muyu/service/impl/TaskServiceImpl.java @@ -10,15 +10,22 @@ import com.muyu.mapper.TaskMapper; import com.muyu.service.*; import com.muyu.task.PriorityThreadPool; import com.muyu.task.feign.DataValueClient; -import com.muyu.task.feign.RuleFeign; +//import com.muyu.task.feign.RuleFeign; +import jakarta.annotation.Resource; import jakarta.validation.constraints.NotNull; import lombok.extern.log4j.Log4j2; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import org.springframework.util.CollectionUtils; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedList; import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import static com.muyu.task.PriorityThreadPool.*; @@ -32,7 +39,7 @@ import static com.muyu.task.PriorityThreadPool.*; */ @Service @Log4j2 -public class TaskServiceImpl extends ServiceImpl implements TaskService { +public class TaskServiceImpl extends ServiceImpl implements TaskService { @Autowired private TaskInputService taskInputService; @@ -49,6 +56,12 @@ public class TaskServiceImpl extends ServiceImpl implement @Autowired private DataValueClient dataValueClient; + @Autowired + private NodeRuleService nodeRuleService; + +// @Autowired +// private RuleFeign ruleFeign; + @Override public List selectList(TaskInfoReq taskInfoReq) { @@ -57,14 +70,14 @@ public class TaskServiceImpl extends ServiceImpl implement StringUtils.isNotEmpty(taskInfoReq.getName()), TaskInfo::getName, taskInfoReq.getName() ); - if(taskInfoReq.getStatus()!=null && taskInfoReq.getStatus()!=0){ + if (taskInfoReq.getStatus() != null && taskInfoReq.getStatus() != 0) { queryWrapper.eq( - TaskInfo::getStatus,taskInfoReq.getStatus() + TaskInfo::getStatus, taskInfoReq.getStatus() ); } - if(taskInfoReq.getWeigh()!=null && taskInfoReq.getWeigh()!=0 ){ + if (taskInfoReq.getWeigh() != null && taskInfoReq.getWeigh() != 0) { queryWrapper.eq( - TaskInfo::getWeigh,taskInfoReq.getWeigh() + TaskInfo::getWeigh, taskInfoReq.getWeigh() ); } return this.list(queryWrapper); @@ -73,7 +86,7 @@ public class TaskServiceImpl extends ServiceImpl implement @Override public String addTask(TaskInfo taskInfo) { boolean save = this.save(taskInfo); - if(save == false){ + if (save == false) { throw new RuntimeException("err"); } return "success"; @@ -90,7 +103,7 @@ public class TaskServiceImpl extends ServiceImpl implement public String deleteById(Integer id) { TaskMapper taskMapper = this.baseMapper; int i = taskMapper.deleteById(id); - if(i<=0){ + if (i <= 0) { throw new RuntimeException("err"); } return "success"; @@ -101,7 +114,7 @@ public class TaskServiceImpl extends ServiceImpl implement LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper<>(); queryWrapper.eq(TaskInputInfo::getTaskId, taskId); List taskInputList = taskInputService.list(queryWrapper); - if(taskInputList.isEmpty()){ + if (taskInputList.isEmpty()) { return "表节点没有进行相对应的选择,无法继续执行"; } @@ -119,7 +132,7 @@ public class TaskServiceImpl extends ServiceImpl implement for (int i = 0; i < taskInputList.size(); i++) { basicList.add(taskInputList.get(i).getDatabaseId()); databaseId = Long.parseLong(taskInputList.get(i).getDatabaseId()); - if(basicList.size()>1){ + if (basicList.size() > 1) { throw new RuntimeException("数据库选择的不同,无法执行"); } String[] tableFieldList = taskInputList.get(i).getTableField().split(","); @@ -129,9 +142,9 @@ public class TaskServiceImpl extends ServiceImpl implement fieldAsNameMap.put(tableAsFieldList[j], tableFieldList[j]); for (int o = 0; o < newFileName.length; o++) { newAndOldMap.put(fieldAsNameMap.get(outPutFileName[o]), newFileName[o]); - if(tableAsFieldList[j].equals(outPutFileName[o])){ - sql += ","+taskInputList.get(i).getTableAsName() + "." + tableFieldList[j] +" " + - tableAsFieldList[j] ; + if (tableAsFieldList[j].equals(outPutFileName[o])) { + sql += "," + taskInputList.get(i).getTableAsName() + "." + tableFieldList[j] + " " + + tableAsFieldList[j]; } } @@ -146,7 +159,7 @@ public class TaskServiceImpl extends ServiceImpl implement LambdaQueryWrapper lambdaQueryWrapper = new LambdaQueryWrapper<>(); lambdaQueryWrapper.eq(TaskJoinInfo::getTaskId, taskId); List taskJoinInfos = taskJoinService.list(lambdaQueryWrapper); - if(!taskJoinInfos.isEmpty()){ + if (!taskJoinInfos.isEmpty()) { for (TaskJoinInfo taskJoinInfo : taskJoinInfos) { //拿到左表id String leftId = taskJoinInfo.getLeftId(); @@ -154,13 +167,13 @@ public class TaskServiceImpl extends ServiceImpl implement //拿到右表id String rightId = taskJoinInfo.getRightId(); TaskInputInfo rightInput = taskInputService.findByNodeId(rightId); - leftJoin += leftInput.getTableName() + " " + leftInput.getTableAsName()+" " + + leftJoin += leftInput.getTableName() + " " + leftInput.getTableAsName() + " " + taskJoinInfo.getJoinType() + " " + rightInput.getTableName() + " " + - rightInput.getTableAsName() + " on " + leftInput.getTableAsName() + "." + + rightInput.getTableAsName() + " on " + leftInput.getTableAsName() + "." + fieldAsNameMap.get(taskJoinInfo.getLeftJoinField()) + "=" + rightInput.getTableAsName() + "." + fieldAsNameMap.get(taskJoinInfo.getRightJoinField()) + " "; } - }else{ + } else { LambdaQueryWrapper selectOne = new LambdaQueryWrapper<>(); selectOne.eq(TaskInputInfo::getTaskId, taskId); TaskInputInfo taskInputInfo = taskInputService.getOne(selectOne); @@ -168,21 +181,19 @@ public class TaskServiceImpl extends ServiceImpl implement } - - - - - LambdaQueryWrapper taskInfoQueryWrapper = new LambdaQueryWrapper<>(); + LambdaQueryWrapper taskInfoQueryWrapper = new LambdaQueryWrapper<>(); taskInfoQueryWrapper.eq(TaskInfo::getId, taskId); TaskInfo taskInfo = taskMapper.selectById(taskId); Long basicId = Long.valueOf(taskOutputInfo.getBasicId()); Long tableId = Long.valueOf(taskOutputInfo.getTableId()); - String sqlCount = "select count(1) from "+leftJoin; + String sqlCount = "select count(1) from " + leftJoin; //查询出总条数 + long l = System.currentTimeMillis(); + log.info("开始查询!!!!!!!!!!!!!!!"); Integer count = dataValueClient.findCount(Long.valueOf(taskOutputInfo.getBasicId()), sqlCount); - log.info("查询到的条数为{}",count); + log.info("查询到的条数为{}", count); int pageSize = 10000; int totalSegments = (int) Math.ceil((double) count / pageSize); log.info("总共页码为{}", totalSegments); @@ -192,17 +203,17 @@ public class TaskServiceImpl extends ServiceImpl implement log.info("当前页为{}", i); int pageNum = i * pageSize; long finalFirstArray = Math.min(pageSize, count - pageNum); - String limit = " limit " + pageNum + "," + pageSize; + String limit = " limit " + pageNum + "," + pageSize; String limitSelect = sql + limit; Long finalDatabaseId = databaseId; log.info("执行查询语句为{}", limitSelect); - if(taskInfo.getWeigh() == 4){ + if (taskInfo.getWeigh() == 4) { log.info("执行紧急任务"); PriorityThreadPool.activeEmergencyTasks.set(0); PriorityThreadPool.remainingTasks.set(0); executeUrgently(() -> { try { - selectAndAdd(finalDatabaseId, basicId, limitSelect, tableId,newAndOldMap, finalFirstArray,two); + selectAndAdd(finalDatabaseId, basicId, limitSelect, tableId, newAndOldMap, finalFirstArray, two); } finally { if (PriorityThreadPool.remainingTasks.decrementAndGet() == 0) { System.out.println("All emergency tasks have completed."); @@ -211,24 +222,24 @@ public class TaskServiceImpl extends ServiceImpl implement }); } - if(taskInfo.getWeigh() == 3){ + if (taskInfo.getWeigh() == 3) { log.info("执行高级任务"); executeHigh(() -> { - selectAndAdd(finalDatabaseId, basicId, limitSelect, tableId,newAndOldMap, finalFirstArray,two); + selectAndAdd(finalDatabaseId, basicId, limitSelect, tableId, newAndOldMap, finalFirstArray, two); }); } - if(taskInfo.getWeigh() == 2){ + if (taskInfo.getWeigh() == 2) { log.info("执行中级任务"); executeMedium(() -> { - selectAndAdd(finalDatabaseId, basicId, limitSelect, tableId,newAndOldMap, finalFirstArray,two); + selectAndAdd(finalDatabaseId, basicId, limitSelect, tableId, newAndOldMap, finalFirstArray, two); }); } - if(taskInfo.getWeigh() == 1){ + if (taskInfo.getWeigh() == 1) { log.info("执行低级任务"); executeLow(() -> { - selectAndAdd(finalDatabaseId, basicId, limitSelect, tableId,newAndOldMap, finalFirstArray,two); + selectAndAdd(finalDatabaseId, basicId, limitSelect, tableId, newAndOldMap, finalFirstArray, two); }); } } @@ -240,20 +251,25 @@ public class TaskServiceImpl extends ServiceImpl implement return "success"; } +// @Resource +// private Mysql mySqlDataSource; + @NotNull - private void selectAndAdd(Long databaseId,Long basicId,String sql,Long tableId, - HashMap newAndOldMap,Long finalFirstArray ,Integer two) { + private void selectAndAdd(Long databaseId, Long basicId, String sql, Long tableId, + HashMap newAndOldMap, Long finalFirstArray, Integer two) { log.info("开始查询数据"); log.info("sql{}", sql); log.info("数组为{},,{}", finalFirstArray, two); log.info("开始远调"); +// mySqlDataSource.setQuery(MySqlQuery.builder().dataSourceId(databaseId).sql(sql).one(finalFirstArray).two(two).build()); +// com.muyu.access.data.base.DataValue[][] rows = mySqlDataSource.getRows(); Result result = dataValueClient.findTableValueToArray(databaseId, sql, finalFirstArray, two); log.info("调用完毕拿到值"); DataValue[][] dataValues = result.getData(); - System.out.println("aaa"+dataValues.length); - System.out.println("aaa"+dataValues.length); - System.out.println("aaa"+dataValues.length); - log.info("远程调用完毕,调用数量{},adsad{}", sql,dataValues.length); + System.out.println("aaa" + dataValues.length); + System.out.println("aaa" + dataValues.length); + System.out.println("aaa" + dataValues.length); + log.info("远程调用完毕,调用数量{},adsad{}", sql, dataValues.length); for (DataValue[] dataValue : dataValues) { for (DataValue dataValue1 : dataValue) { String key = dataValue1.getKey(); @@ -267,6 +283,52 @@ public class TaskServiceImpl extends ServiceImpl implement log.info("添加到queue里成功"); } + + private void executeTheRule(DataValue[][] dataValues, HashMap map, Long newBasicId, + Long tableId, Long taskId) { + // 创建一个单线程的ExecutorService + ExecutorService executor = Executors.newSingleThreadExecutor(); + + // 创建一个链表来保存任务 + LinkedList> tasks = new LinkedList<>(); + + tasks.add(() -> { + return dataValues; + }); + + LambdaQueryWrapper wrapper = new LambdaQueryWrapper<>(); + wrapper.eq(NodeRule::getTaskId, taskId); + List nodeRuleList = nodeRuleService.list(wrapper); + Future futureData = executor.submit(tasks.poll()); + if (!CollectionUtils.isEmpty(nodeRuleList)) { + for (NodeRule nodeRule : nodeRuleList) { +// Result versionResult = ruleFeign.getVersion(nodeRule.getNodeId()); +// EngineVersion version = versionResult.getData(); +// final Future finalCurrentFuture = futureData; +// log.info(finalCurrentFuture); +// Callable task = () -> { +// DataValue[][] prevResult = finalCurrentFuture.get(); +// //log.info(data.getClassName()); +// if (version.getId() == 1) { +// for (DataValue[] values : prevResult) { +// for (DataValue value : values) { +// try { +// +// } catch (Exception e) { +// log.error(e.getMessage()); +// } +// } +// } +// } +// return prevResult; +// }; + } + } + } +} + + + // private void selectAndAdd(Integer count,Long databaseId,Long basicId,String sql,Long tableId, // HashMap newAndOldMap,Integer two) { // int pageSize = 700000; @@ -298,8 +360,6 @@ public class TaskServiceImpl extends ServiceImpl implement // } - - // static class Worker implements Runnable { // private final PriorityBlockingQueue queue; // @@ -390,6 +450,3 @@ public class TaskServiceImpl extends ServiceImpl implement // public static Map> engineMap = new ConcurrentHashMap<>(); // public static Map> engineRowMap = new ConcurrentHashMap<>(); // public static Map> engineDataSetMap = new ConcurrentHashMap<>(); - - -}