修改分页条数

master
Cui YongXing 2024-09-08 16:35:31 +08:00
parent b233ff8d75
commit c7964084c2
8 changed files with 107 additions and 25 deletions

View File

@ -25,5 +25,9 @@
<groupId>com.muyu</groupId> <groupId>com.muyu</groupId>
<artifactId>cloud-datasources-common</artifactId> <artifactId>cloud-datasources-common</artifactId>
</dependency> </dependency>
<dependency>
<groupId>com.muyu</groupId>
<artifactId>cloud-rule-common</artifactId>
</dependency>
</dependencies> </dependencies>
</project> </project>

View File

@ -2,7 +2,7 @@ package com.muyu.common.domian.basic.abstracts;
import com.muyu.common.domian.basic.BasicTask; import com.muyu.common.domian.basic.BasicTask;
public abstract class DataTaskAbstracts implements BasicTask { public class DataTaskAbstracts implements BasicTask {
@Override @Override

View File

@ -21,6 +21,10 @@
<groupId>com.muyu</groupId> <groupId>com.muyu</groupId>
<artifactId>cloud-task-common</artifactId> <artifactId>cloud-task-common</artifactId>
</dependency> </dependency>
<dependency>
<groupId>com.muyu</groupId>
<artifactId>cloud-rule-common</artifactId>
</dependency>
</dependencies> </dependencies>
</project> </project>

View File

@ -0,0 +1,28 @@
package com.muyu.remote.feign.Factory;
import com.muyu.common.core.domain.Result;
import com.muyu.common.domain.DataValue;
import com.muyu.remote.feign.DatasourceFeign;
import com.muyu.remote.feign.RuleFeign;
import com.muyu.rule.common.domain.RuleEngineVersion;
import lombok.extern.log4j.Log4j2;
import org.springframework.cloud.openfeign.FallbackFactory;
import org.springframework.stereotype.Component;
import java.util.List;
@Log4j2
@Component
public class RuleFactory implements FallbackFactory<RuleFeign> {
@Override
public RuleFeign create(Throwable cause) {
return new RuleFeign() {
@Override
public Result<RuleEngineVersion> findVersionById(Long id) {
log.info(cause);
return Result.error("网络开小差......");
}
};
}
}

View File

@ -0,0 +1,14 @@
package com.muyu.remote.feign;
import com.muyu.common.core.domain.Result;
import com.muyu.remote.feign.Factory.RuleFactory;
import com.muyu.rule.common.domain.RuleEngineVersion;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
@FeignClient(value = "cloud-etl-rule",fallbackFactory= RuleFactory.class)
public interface RuleFeign {
@PostMapping("/findVersionById/{id}")
public Result<RuleEngineVersion> findVersionById(@PathVariable("id") Long id);
}

View File

@ -1 +1,2 @@
com.muyu.remote.feign.Factory.DatasourceFeignFactory com.muyu.remote.feign.Factory.DatasourceFeignFactory
com.muyu.remote.feign.Factory.RuleFactory

View File

@ -15,7 +15,7 @@ public class NodeRuleController {
private NodeRuleService nodeRuleService; private NodeRuleService nodeRuleService;
@PostMapping("delete/{id}/{nodeId}") @PostMapping("deleteNodeRule/{id}/{nodeId}")
public Result delete(@PathVariable("id") Long id, @PathVariable("nodeId") Long nodeId) { public Result delete(@PathVariable("id") Long id, @PathVariable("nodeId") Long nodeId) {
QueryWrapper<NodeRule> wrapper = new QueryWrapper<>(); QueryWrapper<NodeRule> wrapper = new QueryWrapper<>();
wrapper.eq("node_id", nodeId); wrapper.eq("node_id", nodeId);

View File

@ -1,5 +1,11 @@
package com.muyu.task.server.service.impl; package com.muyu.task.server.service.impl;
import com.muyu.common.domian.*;
import com.muyu.common.domian.basic.abstracts.DataTaskAbstracts;
import com.muyu.remote.feign.RuleFeign;
import com.muyu.rule.common.basic.BasicEngine;
import com.muyu.rule.common.domain.RuleEngineVersion;
import com.muyu.task.server.service.*;
import com.muyu.task.server.thread.OptimizedPrioritizedThreadPool; import com.muyu.task.server.thread.OptimizedPrioritizedThreadPool;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
@ -8,19 +14,11 @@ import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.muyu.common.core.domain.Result; import com.muyu.common.core.domain.Result;
import com.muyu.common.core.utils.StringUtils; import com.muyu.common.core.utils.StringUtils;
import com.muyu.common.domain.DataValue; import com.muyu.common.domain.DataValue;
import com.muyu.common.domian.NodeJoint;
import com.muyu.common.domian.TaskInfo;
import com.muyu.common.domian.TaskInput;
import com.muyu.common.domian.TaskOutput;
import com.muyu.common.domian.enums.Weight; import com.muyu.common.domian.enums.Weight;
import com.muyu.common.domian.req.TaskInfoListReq; import com.muyu.common.domian.req.TaskInfoListReq;
import com.muyu.common.domian.resp.TaskInfoResp; import com.muyu.common.domian.resp.TaskInfoResp;
import com.muyu.remote.feign.DatasourceFeign; import com.muyu.remote.feign.DatasourceFeign;
import com.muyu.task.server.mapper.TaskInfoMapper; import com.muyu.task.server.mapper.TaskInfoMapper;
import com.muyu.task.server.service.NodeJointService;
import com.muyu.task.server.service.TaskInfoService;
import com.muyu.task.server.service.TaskInputService;
import com.muyu.task.server.service.TaskOutputService;
import lombok.extern.log4j.Log4j2; import lombok.extern.log4j.Log4j2;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
@ -158,6 +156,8 @@ public class TaskInfoServiceImpl extends ServiceImpl<TaskInfoMapper, TaskInfo> i
@Override @Override
public String findByFieName2(Long taskId) { public String findByFieName2(Long taskId) {
DataTaskAbstracts taskAbstracts = new DataTaskAbstracts();
taskAbstracts.set(taskId);
long start = System.currentTimeMillis(); long start = System.currentTimeMillis();
TaskInfo taskInfo = taskInfoMapper.selectById(taskId); TaskInfo taskInfo = taskInfoMapper.selectById(taskId);
String weight = taskInfo.getWeight(); String weight = taskInfo.getWeight();
@ -202,7 +202,7 @@ public class TaskInfoServiceImpl extends ServiceImpl<TaskInfoMapper, TaskInfo> i
//查询表的数量 //查询表的数量
Long count = getCount(joint, basicId); Long count = getCount(joint, basicId);
//查询和添加 //查询和添加
extracted(count, weight, fieName, joint, basicId, newBasicId, tableId, map, num); extracted(count, taskId, weight, fieName, joint, basicId, newBasicId, tableId, map, num);
long end = System.currentTimeMillis(); long end = System.currentTimeMillis();
//log.info("执行时间:{}",end-start); //log.info("执行时间:{}",end-start);
return null; return null;
@ -216,7 +216,7 @@ public class TaskInfoServiceImpl extends ServiceImpl<TaskInfoMapper, TaskInfo> i
return data; return data;
} }
private void extracted(Long data, String weight, String finalFieName, String finalJoint, Long basicId, Long newBasicId, Long tableId, HashMap<String, String> map, Integer num) { private void extracted(Long data, Long taskId, String weight, String finalFieName, String finalJoint, Long basicId, Long newBasicId, Long tableId, HashMap<String, String> map, Integer num) {
long count = data / PAGE_SIZE + (data % PAGE_SIZE > 0 ? 1 : 0); long count = data / PAGE_SIZE + (data % PAGE_SIZE > 0 ? 1 : 0);
for (long i = 1; i <= count; i++) { for (long i = 1; i <= count; i++) {
long pageNum = (i - 1) * PAGE_SIZE; long pageNum = (i - 1) * PAGE_SIZE;
@ -224,19 +224,19 @@ public class TaskInfoServiceImpl extends ServiceImpl<TaskInfoMapper, TaskInfo> i
if (Weight.high.getValue().equals(weight)) { if (Weight.high.getValue().equals(weight)) {
log.info("执行高级任务"); log.info("执行高级任务");
submitHighPriorityTask(() -> { submitHighPriorityTask(() -> {
getString(pageNum, finalFieName, finalJoint, basicId, newBasicId, tableId, map, pageSize, num); getString(pageNum, finalFieName, finalJoint, basicId, newBasicId, tableId, map, pageSize, num, taskId);
}); });
} }
if (Weight.centre.getValue().equals(weight)) { if (Weight.centre.getValue().equals(weight)) {
submitMediumPriorityTask(() -> { submitMediumPriorityTask(() -> {
getString(pageNum, finalFieName, finalJoint, basicId, newBasicId, tableId, map, pageSize, num); getString(pageNum, finalFieName, finalJoint, basicId, newBasicId, tableId, map, pageSize, num, taskId);
}); });
} }
if (Weight.low.getValue().equals(weight)) { if (Weight.low.getValue().equals(weight)) {
submitLowPriorityTask(() -> { submitLowPriorityTask(() -> {
getString(pageNum, finalFieName, finalJoint, basicId, newBasicId, tableId, map, pageSize, num); getString(pageNum, finalFieName, finalJoint, basicId, newBasicId, tableId, map, pageSize, num, taskId);
}); });
} }
@ -247,7 +247,7 @@ public class TaskInfoServiceImpl extends ServiceImpl<TaskInfoMapper, TaskInfo> i
OptimizedPrioritizedThreadPool.remainingTasks.set(0); OptimizedPrioritizedThreadPool.remainingTasks.set(0);
submitEmergencyTask(() -> { submitEmergencyTask(() -> {
try { try {
getString(pageNum, finalFieName, finalJoint, basicId, newBasicId, tableId, map, pageSize, num); getString(pageNum, finalFieName, finalJoint, basicId, newBasicId, tableId, map, pageSize, num, taskId);
} finally { } finally {
// 减少剩余任务计数 // 减少剩余任务计数
if (OptimizedPrioritizedThreadPool.remainingTasks.decrementAndGet() == 0) { if (OptimizedPrioritizedThreadPool.remainingTasks.decrementAndGet() == 0) {
@ -334,7 +334,8 @@ public class TaskInfoServiceImpl extends ServiceImpl<TaskInfoMapper, TaskInfo> i
Long tableId, Long tableId,
HashMap<String, String> map, HashMap<String, String> map,
Long one, Long one,
Integer two) { Integer two,
Long taskId) {
String sqlSelect = " SELECT " + fieName + " FROM " + joint + " LIMIT " + PAGE_SIZE + " OFFSET " + pageNum; String sqlSelect = " SELECT " + fieName + " FROM " + joint + " LIMIT " + PAGE_SIZE + " OFFSET " + pageNum;
log.info(sqlSelect); log.info(sqlSelect);
@ -353,13 +354,25 @@ public class TaskInfoServiceImpl extends ServiceImpl<TaskInfoMapper, TaskInfo> i
// } // }
// Result result = datasourceFeign.addProduct(newBasicId, tableId, data); // Result result = datasourceFeign.addProduct(newBasicId, tableId, data);
// log.info("{}添加结束", result); // log.info("{}添加结束", result);
executeTheRule(data,map,newBasicId,tableId); executeTheRule(data, map, newBasicId, tableId, taskId);
} }
@Autowired
private NodeRuleService nodeRuleService;
private void executeTheRule(DataValue[][] dataValues,HashMap<String,String> map, Long newBasicId, @Autowired
Long tableId) { private RuleFeign ruleFeign;
/**
*
*/
public static Map<String, BasicEngine<DataValue>> engineMap = new ConcurrentHashMap<>();
public static Map<String, BasicEngine<DataValue[]>> engineRowMap = new ConcurrentHashMap<>();
public static Map<String, BasicEngine<DataValue[][]>> engineDataSetMap = new ConcurrentHashMap<>();
private void executeTheRule(DataValue[][] dataValues, HashMap<String, String> map, Long newBasicId,
Long tableId, Long taskId) {
// 创建一个单线程的ExecutorService // 创建一个单线程的ExecutorService
ExecutorService executor = Executors.newSingleThreadExecutor(); ExecutorService executor = Executors.newSingleThreadExecutor();
@ -371,16 +384,33 @@ public class TaskInfoServiceImpl extends ServiceImpl<TaskInfoMapper, TaskInfo> i
log.info(dataValues); log.info(dataValues);
return dataValues; return dataValues;
}); });
QueryWrapper<NodeRule> wrapper = new QueryWrapper<>();
wrapper.eq("task_id", taskId);
List<NodeRule> list = nodeRuleService.list(wrapper);
// 提交第一个任务 // 提交第一个任务
Future<DataValue[][]> currentFuture = executor.submit(tasks.poll()); Future<DataValue[][]> currentFuture = executor.submit(tasks.poll());
for (int i = 1; i <= 4; i++) { for (NodeRule nodeRule : list) {
Result<RuleEngineVersion> result = ruleFeign.findVersionById(nodeRule.getRuleId());
RuleEngineVersion data = result.getData();
final Future<DataValue[][]> finalCurrentFuture = currentFuture; final Future<DataValue[][]> finalCurrentFuture = currentFuture;
log.info(finalCurrentFuture); log.info(finalCurrentFuture);
Callable<DataValue[][]> task = () -> { Callable<DataValue[][]> task = () -> {
DataValue[][] prevResult = finalCurrentFuture.get(); DataValue[][] prevResult = finalCurrentFuture.get();
if (data.getRuleId().equals("3")) {
BasicEngine<DataValue[][]> basicEngine = engineDataSetMap.get(data.getClassName());
basicEngine.set(prevResult);
basicEngine.execution();
DataValue[][] dataValues1 = basicEngine.get();
}
if (data.getRuleId().equals("1")) {
for (DataValue[] values : prevResult) {
for (DataValue value : values) {
BasicEngine<DataValue> dataValueBasicEngine = engineMap.get(data.getClassName());
dataValueBasicEngine.set(value);
dataValueBasicEngine.execution();
}
}
}
return prevResult; return prevResult;
}; };
@ -390,13 +420,13 @@ public class TaskInfoServiceImpl extends ServiceImpl<TaskInfoMapper, TaskInfo> i
// 等待当前任务完成 // 等待当前任务完成
try { try {
currentFuture.get(); currentFuture.get();
System.out.println("Task " + i + " completed with result:");
} catch (InterruptedException | ExecutionException e) { } catch (InterruptedException | ExecutionException e) {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
System.out.println("Task execution failed: " + e.getMessage()); System.out.println("Task execution failed: " + e.getMessage());
break; break;
} }
} }
try { try {
DataValue[][] afterFilteringDataValue = currentFuture.get(); DataValue[][] afterFilteringDataValue = currentFuture.get();
for (DataValue[] datum : afterFilteringDataValue) { for (DataValue[] datum : afterFilteringDataValue) {
@ -419,4 +449,5 @@ public class TaskInfoServiceImpl extends ServiceImpl<TaskInfoMapper, TaskInfo> i
executor.shutdown(); executor.shutdown();
} }
} }