From 691434af07bccb066671a507d4fabcb5ae14dd16 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E9=99=88=E6=80=9D=E8=B1=AA?= <1437200870@qq.com>
Date: Tue, 10 Sep 2024 02:48:04 +0800
Subject: [PATCH] =?UTF-8?q?=E6=B5=8B=E8=AF=95=E7=BA=BF=E7=A8=8B=E6=B1=A0?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
cloud-task-common/pom.xml | 6 +
.../java/com/muyu/task/feign/RuleFeign.java | 22 ++-
.../task/feign/factory/RuleFeignFactory.java | 25 +++
.../muyu/service/impl/TaskServiceImpl.java | 145 ++++++++++++------
4 files changed, 147 insertions(+), 51 deletions(-)
create mode 100644 cloud-task-remote/src/main/java/com/muyu/task/feign/factory/RuleFeignFactory.java
diff --git a/cloud-task-common/pom.xml b/cloud-task-common/pom.xml
index 7f73f9d..62d6662 100644
--- a/cloud-task-common/pom.xml
+++ b/cloud-task-common/pom.xml
@@ -30,6 +30,12 @@
1.0.0
+
+
+
+
+
+
diff --git a/cloud-task-remote/src/main/java/com/muyu/task/feign/RuleFeign.java b/cloud-task-remote/src/main/java/com/muyu/task/feign/RuleFeign.java
index e582504..59ce880 100644
--- a/cloud-task-remote/src/main/java/com/muyu/task/feign/RuleFeign.java
+++ b/cloud-task-remote/src/main/java/com/muyu/task/feign/RuleFeign.java
@@ -1,7 +1,15 @@
-package com.muyu.task.feign;
-
-import org.springframework.cloud.openfeign.FeignClient;
-
-@FeignClient(name = "cloud-engine" )
-public interface RuleFeign {
-}
+//package com.muyu.task.feign;
+//
+//import com.muyu.common.core.domain.Result;
+//import com.muyu.domain.EngineVersion;
+//import com.muyu.task.feign.factory.RuleFeignFactory;
+//import org.springframework.cloud.openfeign.FeignClient;
+//import org.springframework.web.bind.annotation.GetMapping;
+//import org.springframework.web.bind.annotation.PathVariable;
+//
+//@FeignClient(name = "cloud-engine",fallbackFactory = RuleFeignFactory.class)
+//public interface RuleFeign {
+//
+// @GetMapping("/version/getVersion/{id}")
+// public Result getVersion(@PathVariable("id") Long id);
+//}
diff --git a/cloud-task-remote/src/main/java/com/muyu/task/feign/factory/RuleFeignFactory.java b/cloud-task-remote/src/main/java/com/muyu/task/feign/factory/RuleFeignFactory.java
new file mode 100644
index 0000000..9c03f6e
--- /dev/null
+++ b/cloud-task-remote/src/main/java/com/muyu/task/feign/factory/RuleFeignFactory.java
@@ -0,0 +1,25 @@
+//package com.muyu.task.feign.factory;
+//
+////import com.muyu.task.feign.RuleFeign;
+//import lombok.extern.log4j.Log4j2;
+//import org.springframework.cloud.openfeign.FallbackFactory;
+//import org.springframework.stereotype.Component;
+//
+///**
+// * @PackageName:com.muyu.task.feign.factory
+// * @ClassName:RuleFeignFactory
+// * @Description:
+// * @author: ¥陈思豪¥
+// * @date: 2024/9/9 17:35
+// */
+//
+//@Log4j2
+//@Component
+//public class RuleFeignFactory implements FallbackFactory {
+//
+//
+// @Override
+// public RuleFeign create(Throwable cause) {
+// return null;
+// }
+//}
diff --git a/cloud-task-server/src/main/java/com/muyu/service/impl/TaskServiceImpl.java b/cloud-task-server/src/main/java/com/muyu/service/impl/TaskServiceImpl.java
index 9e685bd..c4ec333 100644
--- a/cloud-task-server/src/main/java/com/muyu/service/impl/TaskServiceImpl.java
+++ b/cloud-task-server/src/main/java/com/muyu/service/impl/TaskServiceImpl.java
@@ -10,15 +10,22 @@ import com.muyu.mapper.TaskMapper;
import com.muyu.service.*;
import com.muyu.task.PriorityThreadPool;
import com.muyu.task.feign.DataValueClient;
-import com.muyu.task.feign.RuleFeign;
+//import com.muyu.task.feign.RuleFeign;
+import jakarta.annotation.Resource;
import jakarta.validation.constraints.NotNull;
import lombok.extern.log4j.Log4j2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
+import org.springframework.util.CollectionUtils;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.LinkedList;
import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import static com.muyu.task.PriorityThreadPool.*;
@@ -32,7 +39,7 @@ import static com.muyu.task.PriorityThreadPool.*;
*/
@Service
@Log4j2
-public class TaskServiceImpl extends ServiceImpl implements TaskService {
+public class TaskServiceImpl extends ServiceImpl implements TaskService {
@Autowired
private TaskInputService taskInputService;
@@ -49,6 +56,12 @@ public class TaskServiceImpl extends ServiceImpl implement
@Autowired
private DataValueClient dataValueClient;
+ @Autowired
+ private NodeRuleService nodeRuleService;
+
+// @Autowired
+// private RuleFeign ruleFeign;
+
@Override
public List selectList(TaskInfoReq taskInfoReq) {
@@ -57,14 +70,14 @@ public class TaskServiceImpl extends ServiceImpl implement
StringUtils.isNotEmpty(taskInfoReq.getName()),
TaskInfo::getName, taskInfoReq.getName()
);
- if(taskInfoReq.getStatus()!=null && taskInfoReq.getStatus()!=0){
+ if (taskInfoReq.getStatus() != null && taskInfoReq.getStatus() != 0) {
queryWrapper.eq(
- TaskInfo::getStatus,taskInfoReq.getStatus()
+ TaskInfo::getStatus, taskInfoReq.getStatus()
);
}
- if(taskInfoReq.getWeigh()!=null && taskInfoReq.getWeigh()!=0 ){
+ if (taskInfoReq.getWeigh() != null && taskInfoReq.getWeigh() != 0) {
queryWrapper.eq(
- TaskInfo::getWeigh,taskInfoReq.getWeigh()
+ TaskInfo::getWeigh, taskInfoReq.getWeigh()
);
}
return this.list(queryWrapper);
@@ -73,7 +86,7 @@ public class TaskServiceImpl extends ServiceImpl implement
@Override
public String addTask(TaskInfo taskInfo) {
boolean save = this.save(taskInfo);
- if(save == false){
+ if (save == false) {
throw new RuntimeException("err");
}
return "success";
@@ -90,7 +103,7 @@ public class TaskServiceImpl extends ServiceImpl implement
public String deleteById(Integer id) {
TaskMapper taskMapper = this.baseMapper;
int i = taskMapper.deleteById(id);
- if(i<=0){
+ if (i <= 0) {
throw new RuntimeException("err");
}
return "success";
@@ -101,7 +114,7 @@ public class TaskServiceImpl extends ServiceImpl implement
LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper<>();
queryWrapper.eq(TaskInputInfo::getTaskId, taskId);
List taskInputList = taskInputService.list(queryWrapper);
- if(taskInputList.isEmpty()){
+ if (taskInputList.isEmpty()) {
return "表节点没有进行相对应的选择,无法继续执行";
}
@@ -119,7 +132,7 @@ public class TaskServiceImpl extends ServiceImpl implement
for (int i = 0; i < taskInputList.size(); i++) {
basicList.add(taskInputList.get(i).getDatabaseId());
databaseId = Long.parseLong(taskInputList.get(i).getDatabaseId());
- if(basicList.size()>1){
+ if (basicList.size() > 1) {
throw new RuntimeException("数据库选择的不同,无法执行");
}
String[] tableFieldList = taskInputList.get(i).getTableField().split(",");
@@ -129,9 +142,9 @@ public class TaskServiceImpl extends ServiceImpl implement
fieldAsNameMap.put(tableAsFieldList[j], tableFieldList[j]);
for (int o = 0; o < newFileName.length; o++) {
newAndOldMap.put(fieldAsNameMap.get(outPutFileName[o]), newFileName[o]);
- if(tableAsFieldList[j].equals(outPutFileName[o])){
- sql += ","+taskInputList.get(i).getTableAsName() + "." + tableFieldList[j] +" " +
- tableAsFieldList[j] ;
+ if (tableAsFieldList[j].equals(outPutFileName[o])) {
+ sql += "," + taskInputList.get(i).getTableAsName() + "." + tableFieldList[j] + " " +
+ tableAsFieldList[j];
}
}
@@ -146,7 +159,7 @@ public class TaskServiceImpl extends ServiceImpl implement
LambdaQueryWrapper lambdaQueryWrapper = new LambdaQueryWrapper<>();
lambdaQueryWrapper.eq(TaskJoinInfo::getTaskId, taskId);
List taskJoinInfos = taskJoinService.list(lambdaQueryWrapper);
- if(!taskJoinInfos.isEmpty()){
+ if (!taskJoinInfos.isEmpty()) {
for (TaskJoinInfo taskJoinInfo : taskJoinInfos) {
//拿到左表id
String leftId = taskJoinInfo.getLeftId();
@@ -154,13 +167,13 @@ public class TaskServiceImpl extends ServiceImpl implement
//拿到右表id
String rightId = taskJoinInfo.getRightId();
TaskInputInfo rightInput = taskInputService.findByNodeId(rightId);
- leftJoin += leftInput.getTableName() + " " + leftInput.getTableAsName()+" " +
+ leftJoin += leftInput.getTableName() + " " + leftInput.getTableAsName() + " " +
taskJoinInfo.getJoinType() + " " + rightInput.getTableName() + " " +
- rightInput.getTableAsName() + " on " + leftInput.getTableAsName() + "." +
+ rightInput.getTableAsName() + " on " + leftInput.getTableAsName() + "." +
fieldAsNameMap.get(taskJoinInfo.getLeftJoinField()) + "=" + rightInput.getTableAsName() + "." +
fieldAsNameMap.get(taskJoinInfo.getRightJoinField()) + " ";
}
- }else{
+ } else {
LambdaQueryWrapper selectOne = new LambdaQueryWrapper<>();
selectOne.eq(TaskInputInfo::getTaskId, taskId);
TaskInputInfo taskInputInfo = taskInputService.getOne(selectOne);
@@ -168,21 +181,19 @@ public class TaskServiceImpl extends ServiceImpl implement
}
-
-
-
-
- LambdaQueryWrapper taskInfoQueryWrapper = new LambdaQueryWrapper<>();
+ LambdaQueryWrapper taskInfoQueryWrapper = new LambdaQueryWrapper<>();
taskInfoQueryWrapper.eq(TaskInfo::getId, taskId);
TaskInfo taskInfo = taskMapper.selectById(taskId);
Long basicId = Long.valueOf(taskOutputInfo.getBasicId());
Long tableId = Long.valueOf(taskOutputInfo.getTableId());
- String sqlCount = "select count(1) from "+leftJoin;
+ String sqlCount = "select count(1) from " + leftJoin;
//查询出总条数
+ long l = System.currentTimeMillis();
+ log.info("开始查询!!!!!!!!!!!!!!!");
Integer count = dataValueClient.findCount(Long.valueOf(taskOutputInfo.getBasicId()), sqlCount);
- log.info("查询到的条数为{}",count);
+ log.info("查询到的条数为{}", count);
int pageSize = 10000;
int totalSegments = (int) Math.ceil((double) count / pageSize);
log.info("总共页码为{}", totalSegments);
@@ -192,17 +203,17 @@ public class TaskServiceImpl extends ServiceImpl implement
log.info("当前页为{}", i);
int pageNum = i * pageSize;
long finalFirstArray = Math.min(pageSize, count - pageNum);
- String limit = " limit " + pageNum + "," + pageSize;
+ String limit = " limit " + pageNum + "," + pageSize;
String limitSelect = sql + limit;
Long finalDatabaseId = databaseId;
log.info("执行查询语句为{}", limitSelect);
- if(taskInfo.getWeigh() == 4){
+ if (taskInfo.getWeigh() == 4) {
log.info("执行紧急任务");
PriorityThreadPool.activeEmergencyTasks.set(0);
PriorityThreadPool.remainingTasks.set(0);
executeUrgently(() -> {
try {
- selectAndAdd(finalDatabaseId, basicId, limitSelect, tableId,newAndOldMap, finalFirstArray,two);
+ selectAndAdd(finalDatabaseId, basicId, limitSelect, tableId, newAndOldMap, finalFirstArray, two);
} finally {
if (PriorityThreadPool.remainingTasks.decrementAndGet() == 0) {
System.out.println("All emergency tasks have completed.");
@@ -211,24 +222,24 @@ public class TaskServiceImpl extends ServiceImpl implement
});
}
- if(taskInfo.getWeigh() == 3){
+ if (taskInfo.getWeigh() == 3) {
log.info("执行高级任务");
executeHigh(() -> {
- selectAndAdd(finalDatabaseId, basicId, limitSelect, tableId,newAndOldMap, finalFirstArray,two);
+ selectAndAdd(finalDatabaseId, basicId, limitSelect, tableId, newAndOldMap, finalFirstArray, two);
});
}
- if(taskInfo.getWeigh() == 2){
+ if (taskInfo.getWeigh() == 2) {
log.info("执行中级任务");
executeMedium(() -> {
- selectAndAdd(finalDatabaseId, basicId, limitSelect, tableId,newAndOldMap, finalFirstArray,two);
+ selectAndAdd(finalDatabaseId, basicId, limitSelect, tableId, newAndOldMap, finalFirstArray, two);
});
}
- if(taskInfo.getWeigh() == 1){
+ if (taskInfo.getWeigh() == 1) {
log.info("执行低级任务");
executeLow(() -> {
- selectAndAdd(finalDatabaseId, basicId, limitSelect, tableId,newAndOldMap, finalFirstArray,two);
+ selectAndAdd(finalDatabaseId, basicId, limitSelect, tableId, newAndOldMap, finalFirstArray, two);
});
}
}
@@ -240,20 +251,25 @@ public class TaskServiceImpl extends ServiceImpl implement
return "success";
}
+// @Resource
+// private Mysql mySqlDataSource;
+
@NotNull
- private void selectAndAdd(Long databaseId,Long basicId,String sql,Long tableId,
- HashMap newAndOldMap,Long finalFirstArray ,Integer two) {
+ private void selectAndAdd(Long databaseId, Long basicId, String sql, Long tableId,
+ HashMap newAndOldMap, Long finalFirstArray, Integer two) {
log.info("开始查询数据");
log.info("sql{}", sql);
log.info("数组为{},,{}", finalFirstArray, two);
log.info("开始远调");
+// mySqlDataSource.setQuery(MySqlQuery.builder().dataSourceId(databaseId).sql(sql).one(finalFirstArray).two(two).build());
+// com.muyu.access.data.base.DataValue[][] rows = mySqlDataSource.getRows();
Result result = dataValueClient.findTableValueToArray(databaseId, sql, finalFirstArray, two);
log.info("调用完毕拿到值");
DataValue[][] dataValues = result.getData();
- System.out.println("aaa"+dataValues.length);
- System.out.println("aaa"+dataValues.length);
- System.out.println("aaa"+dataValues.length);
- log.info("远程调用完毕,调用数量{},adsad{}", sql,dataValues.length);
+ System.out.println("aaa" + dataValues.length);
+ System.out.println("aaa" + dataValues.length);
+ System.out.println("aaa" + dataValues.length);
+ log.info("远程调用完毕,调用数量{},adsad{}", sql, dataValues.length);
for (DataValue[] dataValue : dataValues) {
for (DataValue dataValue1 : dataValue) {
String key = dataValue1.getKey();
@@ -267,6 +283,52 @@ public class TaskServiceImpl extends ServiceImpl implement
log.info("添加到queue里成功");
}
+
+ private void executeTheRule(DataValue[][] dataValues, HashMap map, Long newBasicId,
+ Long tableId, Long taskId) {
+ // 创建一个单线程的ExecutorService
+ ExecutorService executor = Executors.newSingleThreadExecutor();
+
+ // 创建一个链表来保存任务
+ LinkedList> tasks = new LinkedList<>();
+
+ tasks.add(() -> {
+ return dataValues;
+ });
+
+ LambdaQueryWrapper wrapper = new LambdaQueryWrapper<>();
+ wrapper.eq(NodeRule::getTaskId, taskId);
+ List nodeRuleList = nodeRuleService.list(wrapper);
+ Future futureData = executor.submit(tasks.poll());
+ if (!CollectionUtils.isEmpty(nodeRuleList)) {
+ for (NodeRule nodeRule : nodeRuleList) {
+// Result versionResult = ruleFeign.getVersion(nodeRule.getNodeId());
+// EngineVersion version = versionResult.getData();
+// final Future finalCurrentFuture = futureData;
+// log.info(finalCurrentFuture);
+// Callable task = () -> {
+// DataValue[][] prevResult = finalCurrentFuture.get();
+// //log.info(data.getClassName());
+// if (version.getId() == 1) {
+// for (DataValue[] values : prevResult) {
+// for (DataValue value : values) {
+// try {
+//
+// } catch (Exception e) {
+// log.error(e.getMessage());
+// }
+// }
+// }
+// }
+// return prevResult;
+// };
+ }
+ }
+ }
+}
+
+
+
// private void selectAndAdd(Integer count,Long databaseId,Long basicId,String sql,Long tableId,
// HashMap newAndOldMap,Integer two) {
// int pageSize = 700000;
@@ -298,8 +360,6 @@ public class TaskServiceImpl extends ServiceImpl implement
// }
-
-
// static class Worker implements Runnable {
// private final PriorityBlockingQueue queue;
//
@@ -390,6 +450,3 @@ public class TaskServiceImpl extends ServiceImpl implement
// public static Map> engineMap = new ConcurrentHashMap<>();
// public static Map> engineRowMap = new ConcurrentHashMap<>();
// public static Map> engineDataSetMap = new ConcurrentHashMap<>();
-
-
-}