From c7964084c2b423810ca436c4372dfce7e995d139 Mon Sep 17 00:00:00 2001
From: Cui YongXing <2835316714@qq.com>
Date: Sun, 8 Sep 2024 16:35:31 +0800
Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E5=88=86=E9=A1=B5=E6=9D=A1?=
=?UTF-8?q?=E6=95=B0?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
cloud-task-common/pom.xml | 4 +
.../basic/abstracts/DataTaskAbstracts.java | 2 +-
cloud-task-remote/pom.xml | 4 +
.../remote/feign/Factory/RuleFactory.java | 28 +++++++
.../java/com/muyu/remote/feign/RuleFeign.java | 14 ++++
...ot.autoconfigure.AutoConfiguration.imports | 1 +
.../server/controller/NodeRuleController.java | 2 +-
.../service/impl/TaskInfoServiceImpl.java | 77 +++++++++++++------
8 files changed, 107 insertions(+), 25 deletions(-)
create mode 100644 cloud-task-remote/src/main/java/com/muyu/remote/feign/Factory/RuleFactory.java
create mode 100644 cloud-task-remote/src/main/java/com/muyu/remote/feign/RuleFeign.java
diff --git a/cloud-task-common/pom.xml b/cloud-task-common/pom.xml
index cf1c493..09a231e 100644
--- a/cloud-task-common/pom.xml
+++ b/cloud-task-common/pom.xml
@@ -25,5 +25,9 @@
com.muyu
cloud-datasources-common
+
+ com.muyu
+ cloud-rule-common
+
diff --git a/cloud-task-common/src/main/java/com/muyu/common/domian/basic/abstracts/DataTaskAbstracts.java b/cloud-task-common/src/main/java/com/muyu/common/domian/basic/abstracts/DataTaskAbstracts.java
index 2de7e13..3b5d3bd 100644
--- a/cloud-task-common/src/main/java/com/muyu/common/domian/basic/abstracts/DataTaskAbstracts.java
+++ b/cloud-task-common/src/main/java/com/muyu/common/domian/basic/abstracts/DataTaskAbstracts.java
@@ -2,7 +2,7 @@ package com.muyu.common.domian.basic.abstracts;
import com.muyu.common.domian.basic.BasicTask;
-public abstract class DataTaskAbstracts implements BasicTask {
+public class DataTaskAbstracts implements BasicTask {
@Override
diff --git a/cloud-task-remote/pom.xml b/cloud-task-remote/pom.xml
index 0f212b5..9ae85dd 100644
--- a/cloud-task-remote/pom.xml
+++ b/cloud-task-remote/pom.xml
@@ -21,6 +21,10 @@
com.muyu
cloud-task-common
+
+ com.muyu
+ cloud-rule-common
+
diff --git a/cloud-task-remote/src/main/java/com/muyu/remote/feign/Factory/RuleFactory.java b/cloud-task-remote/src/main/java/com/muyu/remote/feign/Factory/RuleFactory.java
new file mode 100644
index 0000000..bf9a6ac
--- /dev/null
+++ b/cloud-task-remote/src/main/java/com/muyu/remote/feign/Factory/RuleFactory.java
@@ -0,0 +1,28 @@
+package com.muyu.remote.feign.Factory;
+
+import com.muyu.common.core.domain.Result;
+import com.muyu.common.domain.DataValue;
+import com.muyu.remote.feign.DatasourceFeign;
+import com.muyu.remote.feign.RuleFeign;
+import com.muyu.rule.common.domain.RuleEngineVersion;
+import lombok.extern.log4j.Log4j2;
+import org.springframework.cloud.openfeign.FallbackFactory;
+import org.springframework.stereotype.Component;
+
+import java.util.List;
+
+@Log4j2
+@Component
+public class RuleFactory implements FallbackFactory {
+
+ @Override
+ public RuleFeign create(Throwable cause) {
+ return new RuleFeign() {
+ @Override
+ public Result findVersionById(Long id) {
+ log.info(cause);
+ return Result.error("网络开小差......");
+ }
+ };
+ }
+}
diff --git a/cloud-task-remote/src/main/java/com/muyu/remote/feign/RuleFeign.java b/cloud-task-remote/src/main/java/com/muyu/remote/feign/RuleFeign.java
new file mode 100644
index 0000000..0447a32
--- /dev/null
+++ b/cloud-task-remote/src/main/java/com/muyu/remote/feign/RuleFeign.java
@@ -0,0 +1,14 @@
+package com.muyu.remote.feign;
+
+import com.muyu.common.core.domain.Result;
+import com.muyu.remote.feign.Factory.RuleFactory;
+import com.muyu.rule.common.domain.RuleEngineVersion;
+import org.springframework.cloud.openfeign.FeignClient;
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.PostMapping;
+
+@FeignClient(value = "cloud-etl-rule",fallbackFactory= RuleFactory.class)
+public interface RuleFeign {
+ @PostMapping("/findVersionById/{id}")
+ public Result findVersionById(@PathVariable("id") Long id);
+}
diff --git a/cloud-task-remote/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/cloud-task-remote/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
index 1118c60..ddb750f 100644
--- a/cloud-task-remote/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
+++ b/cloud-task-remote/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
@@ -1 +1,2 @@
com.muyu.remote.feign.Factory.DatasourceFeignFactory
+com.muyu.remote.feign.Factory.RuleFactory
diff --git a/cloud-task-server/src/main/java/com/muyu/task/server/controller/NodeRuleController.java b/cloud-task-server/src/main/java/com/muyu/task/server/controller/NodeRuleController.java
index 43ce07e..350e7b1 100644
--- a/cloud-task-server/src/main/java/com/muyu/task/server/controller/NodeRuleController.java
+++ b/cloud-task-server/src/main/java/com/muyu/task/server/controller/NodeRuleController.java
@@ -15,7 +15,7 @@ public class NodeRuleController {
private NodeRuleService nodeRuleService;
- @PostMapping("delete/{id}/{nodeId}")
+ @PostMapping("deleteNodeRule/{id}/{nodeId}")
public Result delete(@PathVariable("id") Long id, @PathVariable("nodeId") Long nodeId) {
QueryWrapper wrapper = new QueryWrapper<>();
wrapper.eq("node_id", nodeId);
diff --git a/cloud-task-server/src/main/java/com/muyu/task/server/service/impl/TaskInfoServiceImpl.java b/cloud-task-server/src/main/java/com/muyu/task/server/service/impl/TaskInfoServiceImpl.java
index 344500f..5292e8a 100644
--- a/cloud-task-server/src/main/java/com/muyu/task/server/service/impl/TaskInfoServiceImpl.java
+++ b/cloud-task-server/src/main/java/com/muyu/task/server/service/impl/TaskInfoServiceImpl.java
@@ -1,5 +1,11 @@
package com.muyu.task.server.service.impl;
+import com.muyu.common.domian.*;
+import com.muyu.common.domian.basic.abstracts.DataTaskAbstracts;
+import com.muyu.remote.feign.RuleFeign;
+import com.muyu.rule.common.basic.BasicEngine;
+import com.muyu.rule.common.domain.RuleEngineVersion;
+import com.muyu.task.server.service.*;
import com.muyu.task.server.thread.OptimizedPrioritizedThreadPool;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
@@ -8,19 +14,11 @@ import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.muyu.common.core.domain.Result;
import com.muyu.common.core.utils.StringUtils;
import com.muyu.common.domain.DataValue;
-import com.muyu.common.domian.NodeJoint;
-import com.muyu.common.domian.TaskInfo;
-import com.muyu.common.domian.TaskInput;
-import com.muyu.common.domian.TaskOutput;
import com.muyu.common.domian.enums.Weight;
import com.muyu.common.domian.req.TaskInfoListReq;
import com.muyu.common.domian.resp.TaskInfoResp;
import com.muyu.remote.feign.DatasourceFeign;
import com.muyu.task.server.mapper.TaskInfoMapper;
-import com.muyu.task.server.service.NodeJointService;
-import com.muyu.task.server.service.TaskInfoService;
-import com.muyu.task.server.service.TaskInputService;
-import com.muyu.task.server.service.TaskOutputService;
import lombok.extern.log4j.Log4j2;
import org.jetbrains.annotations.NotNull;
import org.springframework.beans.factory.annotation.Autowired;
@@ -158,6 +156,8 @@ public class TaskInfoServiceImpl extends ServiceImpl i
@Override
public String findByFieName2(Long taskId) {
+ DataTaskAbstracts taskAbstracts = new DataTaskAbstracts();
+ taskAbstracts.set(taskId);
long start = System.currentTimeMillis();
TaskInfo taskInfo = taskInfoMapper.selectById(taskId);
String weight = taskInfo.getWeight();
@@ -202,7 +202,7 @@ public class TaskInfoServiceImpl extends ServiceImpl i
//查询表的数量
Long count = getCount(joint, basicId);
//查询和添加
- extracted(count, weight, fieName, joint, basicId, newBasicId, tableId, map, num);
+ extracted(count, taskId, weight, fieName, joint, basicId, newBasicId, tableId, map, num);
long end = System.currentTimeMillis();
//log.info("执行时间:{}",end-start);
return null;
@@ -216,7 +216,7 @@ public class TaskInfoServiceImpl extends ServiceImpl i
return data;
}
- private void extracted(Long data, String weight, String finalFieName, String finalJoint, Long basicId, Long newBasicId, Long tableId, HashMap map, Integer num) {
+ private void extracted(Long data, Long taskId, String weight, String finalFieName, String finalJoint, Long basicId, Long newBasicId, Long tableId, HashMap map, Integer num) {
long count = data / PAGE_SIZE + (data % PAGE_SIZE > 0 ? 1 : 0);
for (long i = 1; i <= count; i++) {
long pageNum = (i - 1) * PAGE_SIZE;
@@ -224,19 +224,19 @@ public class TaskInfoServiceImpl extends ServiceImpl i
if (Weight.high.getValue().equals(weight)) {
log.info("执行高级任务");
submitHighPriorityTask(() -> {
- getString(pageNum, finalFieName, finalJoint, basicId, newBasicId, tableId, map, pageSize, num);
+ getString(pageNum, finalFieName, finalJoint, basicId, newBasicId, tableId, map, pageSize, num, taskId);
});
}
if (Weight.centre.getValue().equals(weight)) {
submitMediumPriorityTask(() -> {
- getString(pageNum, finalFieName, finalJoint, basicId, newBasicId, tableId, map, pageSize, num);
+ getString(pageNum, finalFieName, finalJoint, basicId, newBasicId, tableId, map, pageSize, num, taskId);
});
}
if (Weight.low.getValue().equals(weight)) {
submitLowPriorityTask(() -> {
- getString(pageNum, finalFieName, finalJoint, basicId, newBasicId, tableId, map, pageSize, num);
+ getString(pageNum, finalFieName, finalJoint, basicId, newBasicId, tableId, map, pageSize, num, taskId);
});
}
@@ -247,7 +247,7 @@ public class TaskInfoServiceImpl extends ServiceImpl i
OptimizedPrioritizedThreadPool.remainingTasks.set(0);
submitEmergencyTask(() -> {
try {
- getString(pageNum, finalFieName, finalJoint, basicId, newBasicId, tableId, map, pageSize, num);
+ getString(pageNum, finalFieName, finalJoint, basicId, newBasicId, tableId, map, pageSize, num, taskId);
} finally {
// 减少剩余任务计数
if (OptimizedPrioritizedThreadPool.remainingTasks.decrementAndGet() == 0) {
@@ -334,7 +334,8 @@ public class TaskInfoServiceImpl extends ServiceImpl i
Long tableId,
HashMap map,
Long one,
- Integer two) {
+ Integer two,
+ Long taskId) {
String sqlSelect = " SELECT " + fieName + " FROM " + joint + " LIMIT " + PAGE_SIZE + " OFFSET " + pageNum;
log.info(sqlSelect);
@@ -353,13 +354,25 @@ public class TaskInfoServiceImpl extends ServiceImpl i
// }
// Result result = datasourceFeign.addProduct(newBasicId, tableId, data);
// log.info("{}添加结束", result);
- executeTheRule(data,map,newBasicId,tableId);
+ executeTheRule(data, map, newBasicId, tableId, taskId);
}
+ @Autowired
+ private NodeRuleService nodeRuleService;
- private void executeTheRule(DataValue[][] dataValues,HashMap map, Long newBasicId,
- Long tableId) {
+ @Autowired
+ private RuleFeign ruleFeign;
+
+ /**
+ * 存放实例化引擎的容器
+ */
+ public static Map> engineMap = new ConcurrentHashMap<>();
+ public static Map> engineRowMap = new ConcurrentHashMap<>();
+ public static Map> engineDataSetMap = new ConcurrentHashMap<>();
+
+ private void executeTheRule(DataValue[][] dataValues, HashMap map, Long newBasicId,
+ Long tableId, Long taskId) {
// 创建一个单线程的ExecutorService
ExecutorService executor = Executors.newSingleThreadExecutor();
@@ -371,16 +384,33 @@ public class TaskInfoServiceImpl extends ServiceImpl i
log.info(dataValues);
return dataValues;
});
-
+ QueryWrapper wrapper = new QueryWrapper<>();
+ wrapper.eq("task_id", taskId);
+ List list = nodeRuleService.list(wrapper);
// 提交第一个任务
Future currentFuture = executor.submit(tasks.poll());
- for (int i = 1; i <= 4; i++) {
+ for (NodeRule nodeRule : list) {
+ Result result = ruleFeign.findVersionById(nodeRule.getRuleId());
+ RuleEngineVersion data = result.getData();
final Future finalCurrentFuture = currentFuture;
log.info(finalCurrentFuture);
Callable task = () -> {
-
DataValue[][] prevResult = finalCurrentFuture.get();
-
+ if (data.getRuleId().equals("3")) {
+ BasicEngine basicEngine = engineDataSetMap.get(data.getClassName());
+ basicEngine.set(prevResult);
+ basicEngine.execution();
+ DataValue[][] dataValues1 = basicEngine.get();
+ }
+ if (data.getRuleId().equals("1")) {
+ for (DataValue[] values : prevResult) {
+ for (DataValue value : values) {
+ BasicEngine dataValueBasicEngine = engineMap.get(data.getClassName());
+ dataValueBasicEngine.set(value);
+ dataValueBasicEngine.execution();
+ }
+ }
+ }
return prevResult;
};
@@ -390,13 +420,13 @@ public class TaskInfoServiceImpl extends ServiceImpl i
// 等待当前任务完成
try {
currentFuture.get();
- System.out.println("Task " + i + " completed with result:");
} catch (InterruptedException | ExecutionException e) {
Thread.currentThread().interrupt();
System.out.println("Task execution failed: " + e.getMessage());
break;
}
}
+
try {
DataValue[][] afterFilteringDataValue = currentFuture.get();
for (DataValue[] datum : afterFilteringDataValue) {
@@ -419,4 +449,5 @@ public class TaskInfoServiceImpl extends ServiceImpl i
executor.shutdown();
}
+
}