diff --git a/cloud-task-common/pom.xml b/cloud-task-common/pom.xml index cf1c493..09a231e 100644 --- a/cloud-task-common/pom.xml +++ b/cloud-task-common/pom.xml @@ -25,5 +25,9 @@ com.muyu cloud-datasources-common + + com.muyu + cloud-rule-common + diff --git a/cloud-task-common/src/main/java/com/muyu/common/domian/basic/abstracts/DataTaskAbstracts.java b/cloud-task-common/src/main/java/com/muyu/common/domian/basic/abstracts/DataTaskAbstracts.java index 2de7e13..3b5d3bd 100644 --- a/cloud-task-common/src/main/java/com/muyu/common/domian/basic/abstracts/DataTaskAbstracts.java +++ b/cloud-task-common/src/main/java/com/muyu/common/domian/basic/abstracts/DataTaskAbstracts.java @@ -2,7 +2,7 @@ package com.muyu.common.domian.basic.abstracts; import com.muyu.common.domian.basic.BasicTask; -public abstract class DataTaskAbstracts implements BasicTask { +public class DataTaskAbstracts implements BasicTask { @Override diff --git a/cloud-task-remote/pom.xml b/cloud-task-remote/pom.xml index 0f212b5..9ae85dd 100644 --- a/cloud-task-remote/pom.xml +++ b/cloud-task-remote/pom.xml @@ -21,6 +21,10 @@ com.muyu cloud-task-common + + com.muyu + cloud-rule-common + diff --git a/cloud-task-remote/src/main/java/com/muyu/remote/feign/Factory/RuleFactory.java b/cloud-task-remote/src/main/java/com/muyu/remote/feign/Factory/RuleFactory.java new file mode 100644 index 0000000..bf9a6ac --- /dev/null +++ b/cloud-task-remote/src/main/java/com/muyu/remote/feign/Factory/RuleFactory.java @@ -0,0 +1,28 @@ +package com.muyu.remote.feign.Factory; + +import com.muyu.common.core.domain.Result; +import com.muyu.common.domain.DataValue; +import com.muyu.remote.feign.DatasourceFeign; +import com.muyu.remote.feign.RuleFeign; +import com.muyu.rule.common.domain.RuleEngineVersion; +import lombok.extern.log4j.Log4j2; +import org.springframework.cloud.openfeign.FallbackFactory; +import org.springframework.stereotype.Component; + +import java.util.List; + +@Log4j2 +@Component +public class RuleFactory implements FallbackFactory { + + @Override + public RuleFeign create(Throwable cause) { + return new RuleFeign() { + @Override + public Result findVersionById(Long id) { + log.info(cause); + return Result.error("网络开小差......"); + } + }; + } +} diff --git a/cloud-task-remote/src/main/java/com/muyu/remote/feign/RuleFeign.java b/cloud-task-remote/src/main/java/com/muyu/remote/feign/RuleFeign.java new file mode 100644 index 0000000..0447a32 --- /dev/null +++ b/cloud-task-remote/src/main/java/com/muyu/remote/feign/RuleFeign.java @@ -0,0 +1,14 @@ +package com.muyu.remote.feign; + +import com.muyu.common.core.domain.Result; +import com.muyu.remote.feign.Factory.RuleFactory; +import com.muyu.rule.common.domain.RuleEngineVersion; +import org.springframework.cloud.openfeign.FeignClient; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.PostMapping; + +@FeignClient(value = "cloud-etl-rule",fallbackFactory= RuleFactory.class) +public interface RuleFeign { + @PostMapping("/findVersionById/{id}") + public Result findVersionById(@PathVariable("id") Long id); +} diff --git a/cloud-task-remote/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/cloud-task-remote/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports index 1118c60..ddb750f 100644 --- a/cloud-task-remote/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports +++ b/cloud-task-remote/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports @@ -1 +1,2 @@ com.muyu.remote.feign.Factory.DatasourceFeignFactory +com.muyu.remote.feign.Factory.RuleFactory diff --git a/cloud-task-server/src/main/java/com/muyu/task/server/controller/NodeRuleController.java b/cloud-task-server/src/main/java/com/muyu/task/server/controller/NodeRuleController.java index 43ce07e..350e7b1 100644 --- a/cloud-task-server/src/main/java/com/muyu/task/server/controller/NodeRuleController.java +++ b/cloud-task-server/src/main/java/com/muyu/task/server/controller/NodeRuleController.java @@ -15,7 +15,7 @@ public class NodeRuleController { private NodeRuleService nodeRuleService; - @PostMapping("delete/{id}/{nodeId}") + @PostMapping("deleteNodeRule/{id}/{nodeId}") public Result delete(@PathVariable("id") Long id, @PathVariable("nodeId") Long nodeId) { QueryWrapper wrapper = new QueryWrapper<>(); wrapper.eq("node_id", nodeId); diff --git a/cloud-task-server/src/main/java/com/muyu/task/server/service/impl/TaskInfoServiceImpl.java b/cloud-task-server/src/main/java/com/muyu/task/server/service/impl/TaskInfoServiceImpl.java index 344500f..5292e8a 100644 --- a/cloud-task-server/src/main/java/com/muyu/task/server/service/impl/TaskInfoServiceImpl.java +++ b/cloud-task-server/src/main/java/com/muyu/task/server/service/impl/TaskInfoServiceImpl.java @@ -1,5 +1,11 @@ package com.muyu.task.server.service.impl; +import com.muyu.common.domian.*; +import com.muyu.common.domian.basic.abstracts.DataTaskAbstracts; +import com.muyu.remote.feign.RuleFeign; +import com.muyu.rule.common.basic.BasicEngine; +import com.muyu.rule.common.domain.RuleEngineVersion; +import com.muyu.task.server.service.*; import com.muyu.task.server.thread.OptimizedPrioritizedThreadPool; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; @@ -8,19 +14,11 @@ import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.muyu.common.core.domain.Result; import com.muyu.common.core.utils.StringUtils; import com.muyu.common.domain.DataValue; -import com.muyu.common.domian.NodeJoint; -import com.muyu.common.domian.TaskInfo; -import com.muyu.common.domian.TaskInput; -import com.muyu.common.domian.TaskOutput; import com.muyu.common.domian.enums.Weight; import com.muyu.common.domian.req.TaskInfoListReq; import com.muyu.common.domian.resp.TaskInfoResp; import com.muyu.remote.feign.DatasourceFeign; import com.muyu.task.server.mapper.TaskInfoMapper; -import com.muyu.task.server.service.NodeJointService; -import com.muyu.task.server.service.TaskInfoService; -import com.muyu.task.server.service.TaskInputService; -import com.muyu.task.server.service.TaskOutputService; import lombok.extern.log4j.Log4j2; import org.jetbrains.annotations.NotNull; import org.springframework.beans.factory.annotation.Autowired; @@ -158,6 +156,8 @@ public class TaskInfoServiceImpl extends ServiceImpl i @Override public String findByFieName2(Long taskId) { + DataTaskAbstracts taskAbstracts = new DataTaskAbstracts(); + taskAbstracts.set(taskId); long start = System.currentTimeMillis(); TaskInfo taskInfo = taskInfoMapper.selectById(taskId); String weight = taskInfo.getWeight(); @@ -202,7 +202,7 @@ public class TaskInfoServiceImpl extends ServiceImpl i //查询表的数量 Long count = getCount(joint, basicId); //查询和添加 - extracted(count, weight, fieName, joint, basicId, newBasicId, tableId, map, num); + extracted(count, taskId, weight, fieName, joint, basicId, newBasicId, tableId, map, num); long end = System.currentTimeMillis(); //log.info("执行时间:{}",end-start); return null; @@ -216,7 +216,7 @@ public class TaskInfoServiceImpl extends ServiceImpl i return data; } - private void extracted(Long data, String weight, String finalFieName, String finalJoint, Long basicId, Long newBasicId, Long tableId, HashMap map, Integer num) { + private void extracted(Long data, Long taskId, String weight, String finalFieName, String finalJoint, Long basicId, Long newBasicId, Long tableId, HashMap map, Integer num) { long count = data / PAGE_SIZE + (data % PAGE_SIZE > 0 ? 1 : 0); for (long i = 1; i <= count; i++) { long pageNum = (i - 1) * PAGE_SIZE; @@ -224,19 +224,19 @@ public class TaskInfoServiceImpl extends ServiceImpl i if (Weight.high.getValue().equals(weight)) { log.info("执行高级任务"); submitHighPriorityTask(() -> { - getString(pageNum, finalFieName, finalJoint, basicId, newBasicId, tableId, map, pageSize, num); + getString(pageNum, finalFieName, finalJoint, basicId, newBasicId, tableId, map, pageSize, num, taskId); }); } if (Weight.centre.getValue().equals(weight)) { submitMediumPriorityTask(() -> { - getString(pageNum, finalFieName, finalJoint, basicId, newBasicId, tableId, map, pageSize, num); + getString(pageNum, finalFieName, finalJoint, basicId, newBasicId, tableId, map, pageSize, num, taskId); }); } if (Weight.low.getValue().equals(weight)) { submitLowPriorityTask(() -> { - getString(pageNum, finalFieName, finalJoint, basicId, newBasicId, tableId, map, pageSize, num); + getString(pageNum, finalFieName, finalJoint, basicId, newBasicId, tableId, map, pageSize, num, taskId); }); } @@ -247,7 +247,7 @@ public class TaskInfoServiceImpl extends ServiceImpl i OptimizedPrioritizedThreadPool.remainingTasks.set(0); submitEmergencyTask(() -> { try { - getString(pageNum, finalFieName, finalJoint, basicId, newBasicId, tableId, map, pageSize, num); + getString(pageNum, finalFieName, finalJoint, basicId, newBasicId, tableId, map, pageSize, num, taskId); } finally { // 减少剩余任务计数 if (OptimizedPrioritizedThreadPool.remainingTasks.decrementAndGet() == 0) { @@ -334,7 +334,8 @@ public class TaskInfoServiceImpl extends ServiceImpl i Long tableId, HashMap map, Long one, - Integer two) { + Integer two, + Long taskId) { String sqlSelect = " SELECT " + fieName + " FROM " + joint + " LIMIT " + PAGE_SIZE + " OFFSET " + pageNum; log.info(sqlSelect); @@ -353,13 +354,25 @@ public class TaskInfoServiceImpl extends ServiceImpl i // } // Result result = datasourceFeign.addProduct(newBasicId, tableId, data); // log.info("{}添加结束", result); - executeTheRule(data,map,newBasicId,tableId); + executeTheRule(data, map, newBasicId, tableId, taskId); } + @Autowired + private NodeRuleService nodeRuleService; - private void executeTheRule(DataValue[][] dataValues,HashMap map, Long newBasicId, - Long tableId) { + @Autowired + private RuleFeign ruleFeign; + + /** + * 存放实例化引擎的容器 + */ + public static Map> engineMap = new ConcurrentHashMap<>(); + public static Map> engineRowMap = new ConcurrentHashMap<>(); + public static Map> engineDataSetMap = new ConcurrentHashMap<>(); + + private void executeTheRule(DataValue[][] dataValues, HashMap map, Long newBasicId, + Long tableId, Long taskId) { // 创建一个单线程的ExecutorService ExecutorService executor = Executors.newSingleThreadExecutor(); @@ -371,16 +384,33 @@ public class TaskInfoServiceImpl extends ServiceImpl i log.info(dataValues); return dataValues; }); - + QueryWrapper wrapper = new QueryWrapper<>(); + wrapper.eq("task_id", taskId); + List list = nodeRuleService.list(wrapper); // 提交第一个任务 Future currentFuture = executor.submit(tasks.poll()); - for (int i = 1; i <= 4; i++) { + for (NodeRule nodeRule : list) { + Result result = ruleFeign.findVersionById(nodeRule.getRuleId()); + RuleEngineVersion data = result.getData(); final Future finalCurrentFuture = currentFuture; log.info(finalCurrentFuture); Callable task = () -> { - DataValue[][] prevResult = finalCurrentFuture.get(); - + if (data.getRuleId().equals("3")) { + BasicEngine basicEngine = engineDataSetMap.get(data.getClassName()); + basicEngine.set(prevResult); + basicEngine.execution(); + DataValue[][] dataValues1 = basicEngine.get(); + } + if (data.getRuleId().equals("1")) { + for (DataValue[] values : prevResult) { + for (DataValue value : values) { + BasicEngine dataValueBasicEngine = engineMap.get(data.getClassName()); + dataValueBasicEngine.set(value); + dataValueBasicEngine.execution(); + } + } + } return prevResult; }; @@ -390,13 +420,13 @@ public class TaskInfoServiceImpl extends ServiceImpl i // 等待当前任务完成 try { currentFuture.get(); - System.out.println("Task " + i + " completed with result:"); } catch (InterruptedException | ExecutionException e) { Thread.currentThread().interrupt(); System.out.println("Task execution failed: " + e.getMessage()); break; } } + try { DataValue[][] afterFilteringDataValue = currentFuture.get(); for (DataValue[] datum : afterFilteringDataValue) { @@ -419,4 +449,5 @@ public class TaskInfoServiceImpl extends ServiceImpl i executor.shutdown(); } + }