cloud-etl-task/cloud-task-server/src/main/java/com/muyu/service/impl/TaskServiceImpl.java

453 lines
18 KiB
Java
Raw Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

package com.muyu.service.impl;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.muyu.common.core.domain.Result;
import com.muyu.common.core.utils.StringUtils;
import com.muyu.domain.*;
import com.muyu.domain.req.TaskInfoReq;
import com.muyu.mapper.TaskMapper;
import com.muyu.service.*;
import com.muyu.task.PriorityThreadPool;
import com.muyu.task.feign.DataValueClient;
//import com.muyu.task.feign.RuleFeign;
import jakarta.annotation.Resource;
import jakarta.validation.constraints.NotNull;
import lombok.extern.log4j.Log4j2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import static com.muyu.task.PriorityThreadPool.*;
/**
 * Task service implementation: CRUD for {@link com.muyu.domain.TaskInfo} plus
 * ETL task execution (SQL assembly, paging, priority dispatch).
 *
 * @author 陈思豪
 * @date 2024/8/22 17:15
 */
@Service
@Log4j2
public class TaskServiceImpl extends ServiceImpl<TaskMapper, TaskInfo> implements TaskService {
@Autowired
private TaskInputService taskInputService;
@Autowired
private TaskMapper taskMapper;
@Autowired
private TaskJoinService taskJoinService;
@Autowired
private TaskOutputService taskOutputService;
@Autowired
private DataValueClient dataValueClient;
@Autowired
private NodeRuleService nodeRuleService;
// @Autowired
// private RuleFeign ruleFeign;
/**
 * Queries task records, optionally filtered by name (fuzzy match), status and weight.
 *
 * @param taskInfoReq filter criteria; empty name and null/zero status/weight are ignored
 * @return matching tasks (empty list when none match)
 */
@Override
public List<TaskInfo> selectList(TaskInfoReq taskInfoReq) {
    LambdaQueryWrapper<TaskInfo> queryWrapper = new LambdaQueryWrapper<>();
    // Use the MyBatis-Plus "condition" overload for all three filters, for
    // consistency — previously the eq filters were wrapped in manual ifs.
    queryWrapper.like(
            StringUtils.isNotEmpty(taskInfoReq.getName()),
            TaskInfo::getName, taskInfoReq.getName()
    );
    queryWrapper.eq(
            taskInfoReq.getStatus() != null && taskInfoReq.getStatus() != 0,
            TaskInfo::getStatus, taskInfoReq.getStatus()
    );
    queryWrapper.eq(
            taskInfoReq.getWeigh() != null && taskInfoReq.getWeigh() != 0,
            TaskInfo::getWeigh, taskInfoReq.getWeigh()
    );
    return this.list(queryWrapper);
}
/**
 * Persists a new task.
 *
 * @param taskInfo the task to insert
 * @return "success" when the insert succeeds
 * @throws RuntimeException when the insert fails
 */
@Override
public String addTask(TaskInfo taskInfo) {
    // Idiomatic boolean test instead of "== false"; message kept as-is so
    // existing callers/log filters still match.
    if (!this.save(taskInfo)) {
        throw new RuntimeException("err");
    }
    return "success";
}
/**
 * Updates a task by its id.
 *
 * @param taskInfo the task to update (id must be set)
 * @return "success" when the update succeeds
 * @throws RuntimeException when the update fails
 */
@Override
public String updById(TaskInfo taskInfo) {
    // Fix: the result of updateById was previously ignored, so a failed
    // update still reported "success". Surface failure like addTask/deleteById.
    if (!this.updateById(taskInfo)) {
        throw new RuntimeException("err");
    }
    return "success";
}
/**
 * Deletes a task by its id.
 *
 * @param id id of the task to delete
 * @return "success" when a row was deleted
 * @throws RuntimeException when no row was deleted
 */
@Override
public String deleteById(Integer id) {
    // Use the injected mapper field directly instead of shadowing it with a
    // local re-read of baseMapper (same object, clearer intent).
    if (taskMapper.deleteById(id) <= 0) {
        throw new RuntimeException("err");
    }
    return "success";
}
/**
 * Assembles and executes an ETL task.
 * <p>
 * Steps: (1) load the task's input table nodes and build the SELECT column
 * list from the configured field aliases; (2) build the FROM/JOIN clause from
 * the task's join configuration, or a single table when no joins exist;
 * (3) count the source rows, then page through them 10 000 rows at a time,
 * dispatching each page to the priority-thread-pool bucket matching the
 * task's weight (4=urgent, 3=high, 2=medium, 1=low, per the log messages).
 *
 * @param taskId id of the task to execute
 * @return "success" once all pages are dispatched, or a user-facing message
 *         when no input nodes are configured
 * @throws RuntimeException when the input nodes span different databases
 */
@Override
public String executeTask(Integer taskId) {
// Input (table) nodes of this task; nothing to run without them.
LambdaQueryWrapper<TaskInputInfo> queryWrapper = new LambdaQueryWrapper<>();
queryWrapper.eq(TaskInputInfo::getTaskId, taskId);
List<TaskInputInfo> taskInputList = taskInputService.list(queryWrapper);
if (taskInputList.isEmpty()) {
return "表节点没有进行相对应的选择,无法继续执行";
}
// Output node: comma-separated lists of the selected (aliased) source
// columns and the new column names they map to.
LambdaQueryWrapper<TaskOutputInfo> outputInfo = new LambdaQueryWrapper<>();
outputInfo.eq(TaskOutputInfo::getTaskId, taskId);
TaskOutputInfo taskOutputInfo = taskOutputService.getOne(outputInfo);
String[] outPutFileName = taskOutputInfo.getLastFileName().split(",");
String[] newFileName = taskOutputInfo.getNewFileName().split(",");
// Number of output columns (passed through to the data-value client).
Integer two = newFileName.length;
// alias -> original column name, accumulated across all input nodes
HashMap<String, String> fieldAsNameMap = new HashMap<>();
// original column name -> new output column name
HashMap<String, String> newAndOldMap = new HashMap<>();
// distinct database ids across input nodes; must stay at size 1
HashSet<String> basicList = new HashSet<>();
String sql = "";
Long databaseId = 0L;
for (int i = 0; i < taskInputList.size(); i++) {
basicList.add(taskInputList.get(i).getDatabaseId());
databaseId = Long.parseLong(taskInputList.get(i).getDatabaseId());
if (basicList.size() > 1) {
throw new RuntimeException("数据库选择的不同,无法执行");
}
String[] tableFieldList = taskInputList.get(i).getTableField().split(",");
String[] tableAsFieldList = taskInputList.get(i).getTableAsField().split(",");
for (int j = 0; j < tableAsFieldList.length; j++) {
fieldAsNameMap.put(tableAsFieldList[j], tableFieldList[j]);
// NOTE(review): exact duplicate of the line above — redundant but harmless.
fieldAsNameMap.put(tableAsFieldList[j], tableFieldList[j]);
for (int o = 0; o < newFileName.length; o++) {
// NOTE(review): rebuilt on every alias iteration; later passes simply
// overwrite entries with the same values.
newAndOldMap.put(fieldAsNameMap.get(outPutFileName[o]), newFileName[o]);
if (tableAsFieldList[j].equals(outPutFileName[o])) {
// Selected column fragment: ",<tableAlias>.<column> <alias>"
sql += "," + taskInputList.get(i).getTableAsName() + "." + tableFieldList[j] + " " +
tableAsFieldList[j];
}
}
}
}
// Drop the leading comma and turn the column list into a SELECT prefix.
sql = sql.substring(1);
sql = "select " + sql + " from ";
String leftJoin = "";
LambdaQueryWrapper<TaskJoinInfo> lambdaQueryWrapper = new LambdaQueryWrapper<>();
lambdaQueryWrapper.eq(TaskJoinInfo::getTaskId, taskId);
List<TaskJoinInfo> taskJoinInfos = taskJoinService.list(lambdaQueryWrapper);
if (!taskJoinInfos.isEmpty()) {
for (TaskJoinInfo taskJoinInfo : taskJoinInfos) {
// fetch the left-table input node by its node id
String leftId = taskJoinInfo.getLeftId();
TaskInputInfo leftInput = taskInputService.findByNodeId(leftId);
// fetch the right-table input node by its node id
String rightId = taskJoinInfo.getRightId();
TaskInputInfo rightInput = taskInputService.findByNodeId(rightId);
// "<leftTable> <alias> <joinType> <rightTable> <alias> on l.<col>=r.<col> "
leftJoin += leftInput.getTableName() + " " + leftInput.getTableAsName() + " " +
taskJoinInfo.getJoinType() + " " + rightInput.getTableName() + " " +
rightInput.getTableAsName() + " on " + leftInput.getTableAsName() + "." +
fieldAsNameMap.get(taskJoinInfo.getLeftJoinField()) + "=" + rightInput.getTableAsName() + "." +
fieldAsNameMap.get(taskJoinInfo.getRightJoinField()) + " ";
}
} else {
// No joins configured: FROM the single input table.
LambdaQueryWrapper<TaskInputInfo> selectOne = new LambdaQueryWrapper<>();
selectOne.eq(TaskInputInfo::getTaskId, taskId);
TaskInputInfo taskInputInfo = taskInputService.getOne(selectOne);
leftJoin += taskInputInfo.getTableName() + " " + taskInputInfo.getTableAsName();
}
// NOTE(review): taskInfoQueryWrapper is built but never used — selectById
// below queries by primary key directly.
LambdaQueryWrapper<TaskInfo> taskInfoQueryWrapper = new LambdaQueryWrapper<>();
taskInfoQueryWrapper.eq(TaskInfo::getId, taskId);
TaskInfo taskInfo = taskMapper.selectById(taskId);
Long basicId = Long.valueOf(taskOutputInfo.getBasicId());
Long tableId = Long.valueOf(taskOutputInfo.getTableId());
String sqlCount = "select count(1) from " + leftJoin;
// query the total row count so the work can be paged
long l = System.currentTimeMillis(); // NOTE(review): unused leftover timing variable
log.info("开始查询!!!!!!!!!!!!!!!");
Integer count = dataValueClient.findCount(Long.valueOf(taskOutputInfo.getBasicId()), sqlCount);
log.info("查询到的条数为{}", count);
int pageSize = 10000;
int totalSegments = (int) Math.ceil((double) count / pageSize);
log.info("总共页码为{}", totalSegments);
// page through the data, dispatching one runnable per page
sql = sql + leftJoin;
for (int i = 0; i < totalSegments; i++) {
log.info("当前页为{}", i);
int pageNum = i * pageSize;
// rows expected in this page (last page may be short)
long finalFirstArray = Math.min(pageSize, count - pageNum);
String limit = " limit " + pageNum + "," + pageSize;
String limitSelect = sql + limit;
Long finalDatabaseId = databaseId;
log.info("执行查询语句为{}", limitSelect);
if (taskInfo.getWeigh() == 4) {
log.info("执行紧急任务");
// Urgent path resets the pool's emergency counters before dispatch.
PriorityThreadPool.activeEmergencyTasks.set(0);
PriorityThreadPool.remainingTasks.set(0);
executeUrgently(() -> {
try {
selectAndAdd(finalDatabaseId, basicId, limitSelect, tableId, newAndOldMap, finalFirstArray, two);
} finally {
if (PriorityThreadPool.remainingTasks.decrementAndGet() == 0) {
System.out.println("All emergency tasks have completed.");
}
}
});
}
if (taskInfo.getWeigh() == 3) {
log.info("执行高级任务");
executeHigh(() -> {
selectAndAdd(finalDatabaseId, basicId, limitSelect, tableId, newAndOldMap, finalFirstArray, two);
});
}
if (taskInfo.getWeigh() == 2) {
log.info("执行中级任务");
executeMedium(() -> {
selectAndAdd(finalDatabaseId, basicId, limitSelect, tableId, newAndOldMap, finalFirstArray, two);
});
}
if (taskInfo.getWeigh() == 1) {
log.info("执行低级任务");
executeLow(() -> {
selectAndAdd(finalDatabaseId, basicId, limitSelect, tableId, newAndOldMap, finalFirstArray, two);
});
}
}
return "success";
}
/**
 * Fetches one page of source rows via the data-value service, renames each
 * cell's key from its query alias to the configured output column name, and
 * forwards the page to the target table.
 *
 * @param databaseId      source datasource id
 * @param basicId         target datasource id
 * @param sql             paged SELECT statement to run against the source
 * @param tableId         target table id
 * @param newAndOldMap    source column name -> new output column name
 * @param finalFirstArray expected row count of this page
 * @param two             column count of the result
 */
private void selectAndAdd(Long databaseId, Long basicId, String sql, Long tableId,
                          HashMap<String, String> newAndOldMap, Long finalFirstArray, Integer two) {
    // Note: the meaningless @NotNull on this private void method was removed,
    // along with triplicated System.out debug prints.
    log.info("开始查询数据");
    log.info("sql{}", sql);
    log.info("数组为{}{}", finalFirstArray, two);
    log.info("开始远调");
    Result<DataValue[][]> result = dataValueClient.findTableValueToArray(databaseId, sql, finalFirstArray, two);
    log.info("调用完毕拿到值");
    DataValue[][] dataValues = result.getData();
    if (dataValues == null) {
        // Fix: previously an empty remote payload caused an NPE on .length.
        log.warn("findTableValueToArray returned no data, sql={}", sql);
        return;
    }
    log.info("远程调用完毕,调用数量{},adsad{}", sql, dataValues.length);
    // Rename every cell key from its SELECT alias to the configured output column.
    for (DataValue[] row : dataValues) {
        for (DataValue cell : row) {
            cell.setKey(newAndOldMap.get(cell.getKey()));
        }
    }
    Result results = dataValueClient.addProduct(basicId, tableId, dataValues);
    log.info("添加结果为{}", results);
    log.info("添加到queue里成功");
}
/**
 * Applies the node rules configured for a task to a data set.
 * <p>
 * NOTE(review): the per-rule transformation (via RuleFeign) is currently
 * commented out, so this method only seeds the pipeline with the raw data
 * and loads the rule list; the rule-engine integration still needs to be
 * restored before this does real work.
 *
 * @param dataValues raw rows to transform
 * @param map        column-name mapping (currently unused here)
 * @param newBasicId target datasource id (currently unused here)
 * @param tableId    target table id (currently unused here)
 * @param taskId     task whose node rules should be applied
 */
private void executeTheRule(DataValue[][] dataValues, HashMap<String, String> map, Long newBasicId,
                            Long tableId, Long taskId) {
    // Single worker so the rule chain would run strictly in sequence.
    ExecutorService executor = Executors.newSingleThreadExecutor();
    try {
        // Task queue seeded with the unmodified input data.
        LinkedList<Callable<DataValue[][]>> tasks = new LinkedList<>();
        tasks.add(() -> dataValues);
        // Load the rules attached to this task.
        LambdaQueryWrapper<NodeRule> wrapper = new LambdaQueryWrapper<>();
        wrapper.eq(NodeRule::getTaskId, taskId);
        List<NodeRule> nodeRuleList = nodeRuleService.list(wrapper);
        Future<DataValue[][]> futureData = executor.submit(tasks.poll());
        if (!CollectionUtils.isEmpty(nodeRuleList)) {
            for (NodeRule nodeRule : nodeRuleList) {
                // TODO: re-enable the per-rule transformation that chained a
                // Callable onto futureData via the rule engine (RuleFeign).
            }
        }
    } finally {
        // Fix: the executor was previously never shut down, leaking its
        // worker thread on every call.
        executor.shutdown();
    }
}
}
// private void selectAndAdd(Integer count,Long databaseId,Long basicId,String sql,Long tableId,
// HashMap<String,String> newAndOldMap,Integer two) {
// int pageSize = 700000;
// long firstArray = 0L;
// int totalSegments = (int) Math.ceil((double) count / pageSize);
// log.info("总共页码为{}", totalSegments);
// for (int i = 0; i < totalSegments; i++) {
// log.info("当前页为{}", i);
// String limit = " limit " + i * pageSize + "," + pageSize;
// firstArray = Math.min(pageSize, count - i * pageSize);
// if (firstArray >= pageSize) {
// firstArray = pageSize;
// }
// String limitSelect = sql + limit;
// log.info("执行查询语句为{}", limitSelect);
// log.info(databaseId);
// DataValue[][] tableValue = dataValueClient.findTableValueToArray(databaseId, limitSelect, firstArray,two);
// log.info("远程调用完毕,调用数量{}",tableValue.length);
// for (DataValue[] dataValues : tableValue) {
// for (DataValue dataValue : dataValues) {
// String key = dataValue.getKey();
// String newKey = newAndOldMap.get(key);
// dataValue.setKey(newKey);
// }
// }
// Result result = dataValueClient.addProduct(basicId, tableId, tableValue);
// log.info("添加结果为{}", result);
// log.info("添加到queue里成功");
// }
// static class Worker implements Runnable {
// private final PriorityBlockingQueue<SegmentTask> queue;
//
// public Worker(PriorityBlockingQueue<SegmentTask> queue) {
// this.queue = queue;
// }
//
// @Override
// public void run() {
// while (!Thread.currentThread().isInterrupted()) {
// try {
// // 从队列中取出一个任务
// SegmentTask task = queue.take();
// // 执行任务
// task.run();
// } catch (InterruptedException e) {
// Thread.currentThread().interrupt(); // 设置中断状态
// break;
// }
// }
// }
//
// }
// static class SegmentTask implements Runnable, Comparable<SegmentTask> {
//
//
// @Resource
// private DataValueClient dataValueClient;
//
// private final String sql;
// private final Long basicId;
// private final Long tableId;
// private final Weight weight;
// HashMap<String, String> map = new HashMap<>();
//
// public SegmentTask(String sql, Long basicId, Long tableId, HashMap<String, String> map, Weight weight) {
// this.sql = sql;
// this.basicId = basicId;
// this.tableId = tableId;
// this.map = map;
// this.weight = weight;
// }
//
// @Override
// public void run() {
//
// log.info("开始执行任务:" + basicId + " " + sql + " " + weight + " " + tableId, tableId);
// // 查询数据
// Result tableValue = dataValueClient.findTableValue(basicId, sql);
// log.info("远程调用完毕,调用的值为{}", tableValue.getData());
// List<List<DataValue>> data = (List<List<DataValue>>) tableValue.getData();
// log.info("查询结果:{}", data);
// for (List<DataValue> datum : data) {
// for (DataValue dataValue : datum) {
// String key = dataValue.getKey();
// String newKey = map.get(key);
// dataValue.setKey(newKey);
// }
// }
// log.info("开始添加:{}", data);
//
// Result result = dataValueClient.addTableDataValue(basicId, tableId, data);
// log.info("添加完毕字段:{}", result);
// // 处理结果
//// dataValueList.accept(tableValue.getData());
//
//
// }
//
// public Weight geWeight() {
// return weight;
// }
//
//
// @Override
// public int compareTo(@NotNull SegmentTask o) {
// return Integer.compare(o.geWeight().getWeight(), this.weight.getWeight());
// }
// }
// @Autowired
// private NodeRuleService nodeRuleService;
//
// @Autowired
// private RuleFeign ruleFeign;
// public static Map<String, BasicEngine<DataValue>> engineMap = new ConcurrentHashMap<>();
// public static Map<String, BasicEngine<DataValue[]>> engineRowMap = new ConcurrentHashMap<>();
// public static Map<String, BasicEngine<DataValue[][]>> engineDataSetMap = new ConcurrentHashMap<>();