master
面包骑士 2024-09-06 21:11:19 +08:00
parent d5f3e34a8c
commit d065e32f65
5 changed files with 23 additions and 18 deletions

View File

@ -41,7 +41,7 @@ public class TaskExport implements Serializable {
/** 执行SQL */ /** 执行SQL */
@Excel(name = "执行SQL") @Excel(name = "执行SQL")
private String addSql; private Object addSql;
/** 执行状态 */ /** 执行状态 */
@Excel(name = "执行状态") @Excel(name = "执行状态")
@ -49,7 +49,7 @@ public class TaskExport implements Serializable {
/** 失败原因 */ /** 失败原因 */
@Excel(name = "失败原因") @Excel(name = "失败原因")
private String error; private Object error;
public TaskExport(String taskCode,String exportCode, String addSql, Integer start, String error) { public TaskExport(String taskCode,String exportCode, String addSql, Integer start, String error) {
this.taskCode = taskCode; this.taskCode = taskCode;

View File

@ -107,4 +107,16 @@ public class TaskExportController extends BaseController
taskExportService.removeBatchByIds(Arrays.asList(ids)); taskExportService.removeBatchByIds(Arrays.asList(ids));
return success(); return success();
} }
@PostMapping("/selTaskStart")
public Result selTaskStart(String taskCode){
TaskExport taskExport = new TaskExport();
taskExport.setTaskCode(taskCode);
List<TaskExport> taskExports = taskExportService.selectTaskExportList(taskExport);
int num = taskExports.size();
int yNum = taskExports.stream().filter(taskExport1 -> taskExport1.getStart() == 1).toArray().length;
int nNum = num - yNum;
return success("共执行线程"+num+"个,完成"+yNum+"个,未完成"+nNum+"个");
}
} }

View File

@ -52,12 +52,7 @@ public class TaskExportServiceImpl
if (StringUtils.isNotEmpty(taskExport.getTaskCode())){ if (StringUtils.isNotEmpty(taskExport.getTaskCode())){
queryWrapper.eq(TaskExport::getTaskCode, taskExport.getTaskCode()); queryWrapper.eq(TaskExport::getTaskCode, taskExport.getTaskCode());
} }
if (StringUtils.isNotEmpty(taskExport.getAddSql())){
queryWrapper.eq(TaskExport::getAddSql, taskExport.getAddSql());
}
if (StringUtils.isNotEmpty(taskExport.getError())){
queryWrapper.eq(TaskExport::getError, taskExport.getError());
}
return this.list(queryWrapper); return this.list(queryWrapper);
} }
@ -79,15 +74,9 @@ public class TaskExportServiceImpl
if (StringUtils.isNotEmpty(taskExport.getTaskCode())){ if (StringUtils.isNotEmpty(taskExport.getTaskCode())){
queryWrapper.eq(TaskExport::getTaskCode, taskExport.getTaskCode()); queryWrapper.eq(TaskExport::getTaskCode, taskExport.getTaskCode());
} }
if (StringUtils.isNotEmpty(taskExport.getAddSql())){
queryWrapper.eq(TaskExport::getAddSql, taskExport.getAddSql());
}
if (taskExport.getStart() != null){ if (taskExport.getStart() != null){
queryWrapper.eq(TaskExport::getStart, taskExport.getStart()); queryWrapper.eq(TaskExport::getStart, taskExport.getStart());
} }
if (StringUtils.isNotEmpty(taskExport.getError())){
queryWrapper.eq(TaskExport::getError, taskExport.getError());
}
queryWrapper.last("limit 1"); queryWrapper.last("limit 1");
return this.getOne(queryWrapper); return this.getOne(queryWrapper);
} }

View File

@ -154,7 +154,8 @@ public class TaskServiceImpl extends ServiceImpl<TaskMapper, Task>
// 获取查询条数 // 获取查询条数
int count = getFindCount(findSql, nodeMap); int count = getFindCount(findSql, nodeMap);
// 划分线程 每次查询1000条 // 划分线程 每次查询1000条
int threadNum = count / 1000 + 1; int pageSize = 10000;
int threadNum = count / pageSize + 1;
log.info("任务 {} 总共需要 {} 条数据, 划分为线程{}条",taskCode,count,threadNum); log.info("任务 {} 总共需要 {} 条数据, 划分为线程{}条",taskCode,count,threadNum);
for (int i = 0; i < threadNum; i++) { for (int i = 0; i < threadNum; i++) {
int index = i+1; int index = i+1;
@ -162,14 +163,16 @@ public class TaskServiceImpl extends ServiceImpl<TaskMapper, Task>
taskManager.execute(() -> { taskManager.execute(() -> {
String exportCode = UUID.randomUUID().toString().replace("-",""); String exportCode = UUID.randomUUID().toString().replace("-","");
// 获取新SQL 并执行 // 获取新SQL 并执行
String sql = findSql + " LIMIT 1000 OFFSET "+(index-1)*1000; String sql = findSql + " LIMIT 1000 OFFSET "+(index-1)*pageSize;
String addSql = getAddSql(nodeMap, sql); String addSql = getAddSql(nodeMap, sql);
TaskExport entity = new TaskExport(taskCode,exportCode, sql, 0, ""); int addSqlMaxLength = Math.min(addSql.length(), 30000);
TaskExport entity = new TaskExport(taskCode,exportCode, addSql.substring(0,addSqlMaxLength), 0, "");
taskExportService.save(entity); taskExportService.save(entity);
Result addResult = remoteDataSourceService.addTableValue(new DataValueModel(4L, addSql)); Result addResult = remoteDataSourceService.addTableValue(new DataValueModel(4L, addSql));
log.info("任务 {} 第 {} 线程执行结果 {}",taskCode,index,addResult); log.info("任务 {} 第 {} 线程执行结果 {}",taskCode,index,addResult);
if (addResult.getCode() != 200){ if (addResult.getCode() != 200){
entity.setError(addResult.getMsg()); int errorMaxLength = Math.min(addResult.getMsg().length(), 30000);
entity.setError(addResult.getMsg().substring(0,errorMaxLength));
entity.setStart(2); entity.setStart(2);
}else { }else {
entity.setStart(1); entity.setStart(1);

View File

@ -307,6 +307,7 @@ public class NodeUtils {
datum.forEach(dataModel -> { datum.forEach(dataModel -> {
// 检查 getValue 是否为空 // 检查 getValue 是否为空
String value = dataModel.getValue() != null ? dataModel.getValue().toString() : null; String value = dataModel.getValue() != null ? dataModel.getValue().toString() : null;
// 规则校验
dataMap.put(dataModel.getKey(), value); dataMap.put(dataModel.getKey(), value);
}); });
dataList1.add(dataMap); dataList1.add(dataMap);