cloud-etl-task/cloud-task-server/src/main/java/com/muyu/service/impl/TaskServiceImpl.java

354 lines
13 KiB
Java

package com.muyu.service.impl;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.muyu.common.core.domain.Result;
import com.muyu.common.core.utils.StringUtils;
import com.muyu.domain.*;
import com.muyu.domain.req.TaskInfoReq;
import com.muyu.mapper.TaskMapper;
import com.muyu.service.TaskInputService;
import com.muyu.service.TaskJoinService;
import com.muyu.service.TaskOutputService;
import com.muyu.service.TaskService;
import com.muyu.task.feign.DataValueClient;
import lombok.extern.log4j.Log4j2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import static com.muyu.task.PriorityThreadPool.*;
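/**
* @PackageName:com.muyu.service.impl
* @ClassName:TaskServiceImpl
* @Description: Service implementation for ETL tasks: list/add/update/delete of TaskInfo rows
*               and execution of a task (build the extraction SQL, then copy the rows page by page).
* @author: ¥陈思豪¥
* @date: 2024/8/22 17:15
*/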
@Service
@Log4j2
public class TaskServiceImpl extends ServiceImpl<TaskMapper, TaskInfo> implements TaskService {
/** Service for the input nodes of a task; queried by task id and by node id in this class. */
@Autowired
private TaskInputService taskInputService;
/**
 * Mapper for TaskInfo rows; used here to load a task by id.
 * NOTE(review): ServiceImpl already exposes the same mapper type as {@code baseMapper}.
 */
@Autowired
private TaskMapper taskMapper;
/** Service for the join definitions of a task; queried by task id. */
@Autowired
private TaskJoinService taskJoinService;
/** Service for the output node of a task; queried by task id. */
@Autowired
private TaskOutputService taskOutputService;
/** Feign client used to count rows, read pages of rows and write rows for a given database id. */
@Autowired
private DataValueClient dataValueClient;
/**
 * Lists the tasks that match the optional filters in {@code taskInfoReq}.
 *
 * <p>The name is matched with LIKE when it is not empty; status and weigh are matched with
 * equality when they are neither {@code null} nor {@code 0}.
 *
 * @param taskInfoReq filter values
 * @return the matching tasks
 */
@Override
public List<TaskInfo> selectList(TaskInfoReq taskInfoReq) {
    boolean filterByStatus = taskInfoReq.getStatus() != null && taskInfoReq.getStatus() != 0;
    boolean filterByWeigh = taskInfoReq.getWeigh() != null && taskInfoReq.getWeigh() != 0;

    // Each condition is only added to the WHERE clause when its leading flag is true.
    LambdaQueryWrapper<TaskInfo> criteria = new LambdaQueryWrapper<TaskInfo>()
            .like(StringUtils.isNotEmpty(taskInfoReq.getName()), TaskInfo::getName, taskInfoReq.getName())
            .eq(filterByStatus, TaskInfo::getStatus, taskInfoReq.getStatus())
            .eq(filterByWeigh, TaskInfo::getWeigh, taskInfoReq.getWeigh());
    return this.list(criteria);
}
/**
 * Persists a new task.
 *
 * @param taskInfo the task to insert
 * @return {@code "success"} when the row was saved
 * @throws RuntimeException with message {@code "err"} when the save reports failure
 */
@Override
public String addTask(TaskInfo taskInfo) {
    boolean saved = this.save(taskInfo);
    if (saved) {
        return "success";
    }
    throw new RuntimeException("err");
}
/**
 * Updates an existing task by its id.
 *
 * @param taskInfo the task carrying the id and the new values
 * @return {@code "success"} when the update was applied
 * @throws RuntimeException with message {@code "err"} when the update reports failure, matching
 *         the failure handling of {@code addTask} and {@code deleteById}
 */
@Override
public String updById(TaskInfo taskInfo) {
    // The boolean result used to be ignored, so a failed update still answered "success".
    if (!this.updateById(taskInfo)) {
        throw new RuntimeException("err");
    }
    return "success";
}
/**
 * Deletes a task by id.
 *
 * @param id id of the task to delete
 * @return {@code "success"} when at least one row was deleted
 * @throws RuntimeException with message {@code "err"} when no row was deleted
 */
@Override
public String deleteById(Integer id) {
    int affectedRows = this.baseMapper.deleteById(id);
    if (affectedRows > 0) {
        return "success";
    }
    throw new RuntimeException("err");
}
/**
 * Assembles the extraction query of a task and submits the copy job to the priority thread pool.
 *
 * <p>Steps: load the task's input nodes (all must reference the same database id), load its
 * output node, build the select list from the input fields whose alias appears in the output's
 * {@code lastFileName} list, build the FROM/JOIN clause, fetch the row count through
 * {@code dataValueClient}, then submit {@code selectAndAdd} through the executor matching the
 * task's weigh (4 urgent, 3 high, 2 medium, 1 low).
 *
 * @param taskId id of the task to execute
 * @return {@code "success"} after the dispatch step (also when the weigh matches no executor,
 *         in which case a warning is logged), or a message explaining why the configuration
 *         cannot be executed
 * @throws RuntimeException if the inputs reference different databases or the task row is missing
 */
@Override
public String executeTask(Integer taskId) {
    LambdaQueryWrapper<TaskInputInfo> inputQuery = new LambdaQueryWrapper<>();
    inputQuery.eq(TaskInputInfo::getTaskId, taskId);
    List<TaskInputInfo> taskInputList = taskInputService.list(inputQuery);
    if (taskInputList.isEmpty()) {
        return "表节点没有进行相对应的选择,无法继续执行";
    }

    LambdaQueryWrapper<TaskOutputInfo> outputQuery = new LambdaQueryWrapper<>();
    outputQuery.eq(TaskOutputInfo::getTaskId, taskId);
    TaskOutputInfo taskOutputInfo = taskOutputService.getOne(outputQuery);
    if (taskOutputInfo == null) {
        // Previously this ended in a NullPointerException on the next line.
        return "输出节点没有进行相对应的配置,无法继续执行";
    }
    String[] outPutFileName = taskOutputInfo.getLastFileName().split(",");
    String[] newFileName = taskOutputInfo.getNewFileName().split(",");
    // The two lists are read pairwise; only the positions present in both are usable.
    int mappedFieldCount = Math.min(outPutFileName.length, newFileName.length);
    if (outPutFileName.length != newFileName.length) {
        log.warn("输出字段数量{}与新字段数量{}不一致", outPutFileName.length, newFileName.length);
    }

    // alias -> source field name, collected over every input node
    HashMap<String, String> fieldAsNameMap = new HashMap<>();
    HashSet<String> databaseIds = new HashSet<>();
    StringBuilder columns = new StringBuilder();
    for (TaskInputInfo input : taskInputList) {
        databaseIds.add(input.getDatabaseId());
        if (databaseIds.size() > 1) {
            throw new RuntimeException("数据库选择的不同,无法执行");
        }
        String[] tableFieldList = input.getTableField().split(",");
        String[] tableAsFieldList = input.getTableAsField().split(",");
        int fieldCount = Math.min(tableFieldList.length, tableAsFieldList.length);
        for (int j = 0; j < fieldCount; j++) {
            fieldAsNameMap.put(tableAsFieldList[j], tableFieldList[j]);
            for (int o = 0; o < mappedFieldCount; o++) {
                if (tableAsFieldList[j].equals(outPutFileName[o])) {
                    columns.append(',')
                            .append(input.getTableAsName()).append('.').append(tableFieldList[j])
                            .append(' ').append(tableAsFieldList[j]).append(' ');
                }
            }
        }
    }
    if (columns.length() == 0) {
        // Previously substring(1) on the empty column list threw StringIndexOutOfBoundsException.
        return "输出字段与表节点字段不匹配,无法继续执行";
    }
    // Every input shares one database id (checked above), so the first one is representative.
    Long databaseId = Long.parseLong(taskInputList.get(0).getDatabaseId());

    // source field name -> target field name; built once the alias map is complete so that no
    // null-key or stale entries are left behind.
    HashMap<String, String> newAndOldMap = new HashMap<>();
    for (int o = 0; o < mappedFieldCount; o++) {
        String sourceField = fieldAsNameMap.get(outPutFileName[o]);
        if (sourceField != null) {
            newAndOldMap.put(sourceField, newFileName[o]);
        }
    }

    String fromClause = buildFromClause(taskId, fieldAsNameMap);

    // Count on the database the rows are read from; the pages are fetched with the same id.
    String sqlCount = "select count(1) from " + fromClause;
    Integer count = dataValueClient.findCount(databaseId, sqlCount);
    log.info("查询到的条数为{}", count);

    String sql = "select " + columns.substring(1) + " from " + fromClause;
    log.info("sql为{}", sql);

    TaskInfo taskInfo = taskMapper.selectById(taskId);
    if (taskInfo == null) {
        throw new RuntimeException("任务不存在,无法执行");
    }
    Long basicId = Long.valueOf(taskOutputInfo.getBasicId());
    Long tableId = Long.valueOf(taskOutputInfo.getTableId());

    if (taskInfo.getWeigh() == 4) {
        log.info("执行紧急任务");
        executeUrgently(() -> {
            selectAndAdd(count, databaseId, basicId, sql, tableId, newAndOldMap);
        });
    } else if (taskInfo.getWeigh() == 3) {
        log.info("执行高级任务");
        executeHigh(() -> {
            selectAndAdd(count, databaseId, basicId, sql, tableId, newAndOldMap);
        });
    } else if (taskInfo.getWeigh() == 2) {
        log.info("执行中级任务");
        executeMedium(() -> {
            selectAndAdd(count, databaseId, basicId, sql, tableId, newAndOldMap);
        });
    } else if (taskInfo.getWeigh() == 1) {
        log.info("执行低级任务");
        executeLow(() -> {
            selectAndAdd(count, databaseId, basicId, sql, tableId, newAndOldMap);
        });
    } else {
        log.warn("任务权重{}无法识别,任务未执行", taskInfo.getWeigh());
    }
    return "success";
}

/**
 * Builds the text that follows {@code from} in the extraction query.
 *
 * <p>With join definitions the result is {@code left alias JOIN right alias on ...}; the left
 * table is written only for the first join so that further joins extend the chain instead of
 * repeating a table. Without join definitions the single input node of the task is used.
 *
 * @param taskId id of the task
 * @param fieldAsNameMap alias -> source field name, used to resolve the join fields
 * @return the FROM/JOIN clause without the leading {@code from} keyword
 */
private String buildFromClause(Integer taskId, HashMap<String, String> fieldAsNameMap) {
    LambdaQueryWrapper<TaskJoinInfo> joinQuery = new LambdaQueryWrapper<>();
    joinQuery.eq(TaskJoinInfo::getTaskId, taskId);
    List<TaskJoinInfo> taskJoinInfos = taskJoinService.list(joinQuery);

    if (taskJoinInfos.isEmpty()) {
        // The task-id filter used to be added to a different wrapper, leaving this query unfiltered.
        LambdaQueryWrapper<TaskInputInfo> singleInputQuery = new LambdaQueryWrapper<>();
        singleInputQuery.eq(TaskInputInfo::getTaskId, taskId);
        TaskInputInfo taskInputInfo = taskInputService.getOne(singleInputQuery);
        return taskInputInfo.getTableName() + " " + taskInputInfo.getTableAsName();
    }

    StringBuilder fromClause = new StringBuilder();
    for (TaskJoinInfo taskJoinInfo : taskJoinInfos) {
        TaskInputInfo leftInput = taskInputService.findByNodeId(taskJoinInfo.getLeftId());
        TaskInputInfo rightInput = taskInputService.findByNodeId(taskJoinInfo.getRightId());
        if (fromClause.length() == 0) {
            fromClause.append(leftInput.getTableName()).append(' ')
                    .append(leftInput.getTableAsName()).append(' ');
        }
        // NOTE(review): assumes the left table of a later join already appears in the chain.
        fromClause.append(taskJoinInfo.getJoinType()).append(' ')
                .append(rightInput.getTableName()).append(' ')
                .append(rightInput.getTableAsName()).append(" on ")
                .append(leftInput.getTableAsName()).append('.')
                .append(fieldAsNameMap.get(taskJoinInfo.getLeftJoinField())).append('=')
                .append(rightInput.getTableAsName()).append('.')
                .append(fieldAsNameMap.get(taskJoinInfo.getRightJoinField())).append(' ');
    }
    return fromClause.toString();
}
/**
 * Copies the rows selected by {@code sql} to the target table in pages of 1000 rows.
 *
 * <p>For every page the query is run with a {@code limit offset,size} suffix through
 * {@code dataValueClient.findTableValue}, each value's key is replaced by its entry in
 * {@code newAndOldMap}, and the page is passed to {@code dataValueClient.addTableDataValue}.
 *
 * <p>NOTE(review): the query carries no ORDER BY, so page contents depend on the order the
 * database happens to return rows in — confirm this is acceptable for the source tables.
 *
 * @param count total number of rows to copy; nothing is done when it is {@code null}
 * @param databaseId id of the database the pages are read from
 * @param basicId id passed to {@code addTableDataValue} together with {@code tableId}
 * @param sql the select statement without a limit clause
 * @param tableId id of the table the rows are written to
 * @param newAndOldMap maps the key of a fetched value to the key it is written with
 */
private void selectAndAdd(Integer count, Long databaseId, Long basicId, String sql, Long tableId,
        HashMap<String, String> newAndOldMap) {
    if (count == null) {
        log.warn("总条数为空,跳过执行");
        return;
    }
    int pageSize = 1000;
    int totalSegments = (int) Math.ceil((double) count / pageSize);
    for (int i = 0; i < totalSegments; i++) {
        log.info("调用第{}次", i);
        String limitSelect = sql + " limit " + i * pageSize + "," + pageSize;
        log.info("执行查询语句为{}", limitSelect);
        log.info(databaseId);
        Result<List<List<DataValue>>> tableValue = dataValueClient.findTableValue(databaseId, limitSelect);
        List<List<DataValue>> data = tableValue == null ? null : tableValue.getData();
        if (data == null || data.isEmpty()) {
            // A failed or empty page used to end in a NullPointerException or an empty write.
            log.warn("第{}次查询结果为空,跳过写入", i);
            continue;
        }
        log.info("远程调用完毕,调用数量{}", data.size());
        for (List<DataValue> datum : data) {
            for (DataValue dataValue : datum) {
                String key = dataValue.getKey();
                String newKey = newAndOldMap.get(key);
                // Per-value tracing is kept at debug level; at info it floods the log for every cell.
                log.debug("传来的值的键为{}", key);
                log.debug("取出来的值的键为{}", newKey);
                dataValue.setKey(newKey);
            }
        }
        Result<?> result = dataValueClient.addTableDataValue(basicId, tableId, data);
        log.info("添加结果为{}", result);
        log.info("添加到queue里成功");
    }
}
}