优化调整任务执行逻辑

master
面包骑士 2024-09-05 21:15:12 +08:00
parent 80ffd4e077
commit 89aeb4444d
2 changed files with 217 additions and 53 deletions

View File

@ -9,13 +9,153 @@ package com.muyu.quest.manager;
*/
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
/**
* @Author:
* @Name: TaskManager
* @Description:
* @Description: 线
* @CreatedDate: 2024/9/4 7:44
* @FilePath: com.muyu.quest.manager
*/
public class TaskManager {
public final class TaskManager {
// 线程池中默认线程的个数为5
private static int workerNum = 10;
// 工作线程
private final WorkThread[] workThrads;
// 未处理的任务
private static volatile int finishedTask = 0;
// 任务队列,作为一个缓冲,List线程不安全
private final List<Runnable> taskQueue = new LinkedList<Runnable>();
private static TaskManager taskManager;
// 创建具有默认线程个数的线程池
private TaskManager() {
this(5);
}
// 创建线程池,workerNum为线程池中工作线程的个数
private TaskManager(int workerNum) {
taskManager.workerNum = workerNum;
workThrads = new WorkThread[workerNum];
for (int i = 0; i < workerNum; i++) {
workThrads[i] = new WorkThread();
workThrads[i].start();// 开启线程池中的线程
}
}
// 单态模式,获得一个默认线程个数的线程池
public static TaskManager getTaskManager() {
return getTaskManager(TaskManager.workerNum);
}
// 单态模式,获得一个指定线程个数的线程池,workerNum(>0)为线程池中工作线程的个数
// workerNum<=0创建默认的工作线程个数
public static TaskManager getTaskManager(int workerNum1) {
if (workerNum1 <= 0) {
workerNum1 = TaskManager.workerNum;
}
if (taskManager == null) {
taskManager = new TaskManager(workerNum1);
}
return taskManager;
}
// 批量执行任务,其实只是把任务加入任务队列,什么时候执行由线程池管理器决定
public void execute(List<Runnable> task) {
execute(task.toArray(new Runnable[0]));
}
// 批量执行任务,其实只是把任务加入任务队列,什么时候执行由线程池管理器决定
public void execute(Runnable... task) {
synchronized (taskQueue) {
Collections.addAll(taskQueue, task);
taskQueue.notify();
}
}
// 销毁线程池,该方法保证在所有任务都完成的情况下才销毁所有线程,否则等待任务完成才销毁
public void destroy() {
while (!taskQueue.isEmpty()) {// 如果还有任务没执行完成,就先睡会吧
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 工作线程停止工作且置为null
for (int i = 0; i < workerNum; i++) {
workThrads[i].stopWorker();
workThrads[i] = null;
}
taskManager=null;
taskQueue.clear();// 清空任务队列
}
// 返回工作线程的个数
public int getWorkThreadNumber() {
return workerNum;
}
// 返回已完成任务的个数,这里的已完成是只出了任务队列的任务个数,可能该任务并没有实际执行完成
public int getFinishedTasknumber() {
return finishedTask;
}
// 返回任务队列的长度,即还没处理的任务个数
public int getWaitTasknumber() {
return taskQueue.size();
}
// 覆盖toString方法返回线程池信息工作线程个数和已完成任务个数
@Override
public String toString() {
return "工作任务数:" + workerNum + ",已完成任务数:"
+ finishedTask + ",等待任务数:" + getWaitTasknumber();
}
/**
* 线
*/
private class WorkThread extends Thread {
// 该工作线程是否有效,用于结束该工作线程
private boolean isRunning = true;
/*
*
*/
@Override
public void run() {
Runnable r = null;
while (isRunning) {// 注意若线程无效则自然结束run方法该线程就没用了
synchronized (taskQueue) {
while (isRunning && taskQueue.isEmpty()) {// 队列为空
try {
taskQueue.wait(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
if (!taskQueue.isEmpty()) {
r = taskQueue.remove(0);// 取出任务
}
}
if (r != null) {
r.run();// 执行任务
}
finishedTask++;
r = null;
}
}
// 停止工作让该线程自然执行完run方法自然结束
public void stopWorker() {
isRunning = false;
}
}
}

View File

@ -25,6 +25,7 @@ import com.muyu.quest.mapper.TaskMapper;
import com.muyu.quest.domain.Task;
import com.muyu.quest.service.TaskService;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
import javax.annotation.Resource;
@ -141,61 +142,17 @@ public class TaskServiceImpl extends ServiceImpl<TaskMapper, Task>
@Override
public String execute(String taskCode) {
log.info("任务编码 {} 开始执行......",taskCode);
// 查询SQL语句
String findSql = "";
// 新增SQL语句
String addSql = "";
// 查询任务所有节点
List<Node> nodeListAll = nodeService.selectNodeList(new NodeReq().buildTaskCode(taskCode));
// 节点初始化
HashMap<String, List<Node>> nodeMap = NodeUtils.nodeInit(nodeListAll);
HashMap<String, List<Node>> nodeMap = nodeCheckNorm(nodeListAll);
// 获取查询SQL
String findSql = getFindSql(nodeMap);
// 获取新增SQL
String addSql = getAddSql(nodeMap,findSql);
// 执行新增SQL
implAddSql(addSql);
List<NodeType> nodeTypeList = selectNodeTypeList();
/* 节点组成校验 */
NodeUtils.nodeCheckMakeUp(nodeListAll, nodeTypeList);
/* 节点连接规范校验 */
NodeUtils.nodeCheckNorm(nodeListAll, nodeTypeList);
// 开始节点处理
Node thisNode = nodeMap.get("start").get(0);
// 开始节点处理
while (true){
thisNode = NodeUtils.nodeCheckNorm(thisNode, nodeListAll);
if (StringUtils.equals(thisNode.getNodeType(), "end")){
break;
}
// 查询当前节点所有配置信息
List<NodeDisposition> dispList = dispositionService
.selectNodeDispositionList(new NodeDisposition().buildNodeCode(thisNode.getNodeCode()));
if (dispList.isEmpty()){
throw new TaskException("节点 "+thisNode+" 配置为空");
}
// 根据情况拼接查询sql与新增sql
if(StringUtils.equals(thisNode.getNodeType(), "table")) {
// 如果表节点下一级为数据输出节点,即表示为单表查询,将 当前表结构处理为查询语句
if (StringUtils.equals(NodeUtils.nodeCheckNorm(thisNode, nodeListAll).getNodeType(), "exportation")){
findSql = NodeUtils.tableNode(dispList);
}
}else if (StringUtils.equals(thisNode.getNodeType(), "unite")){
findSql = NodeUtils.nodeDispUnite(dispList);
}else if (StringUtils.equals(thisNode.getNodeType(), "exportation")){
// 执行查询语句
log.info("任务执行查询阶段结束,查询sql为: [{}]", findSql);
Result<List<DataModel>> tableValue = remoteDataSourceService.findTableValue(new DataValueModel(4L, findSql));
if (tableValue.getCode() != 200){
throw new TaskException(tableValue.getMsg());
}
List<DataModel> data = tableValue.getData();
addSql = NodeUtils.nodeDispExportation(dispList, data);
}
}
log.info("任务执行完成,新增sql为: [{}]", addSql);
Result resp = remoteDataSourceService.addTableValue(new DataValueModel(4L, addSql));
if (resp.getCode() != 200){
throw new TaskException(resp.getMsg());
}
return "执行成功";
}
@ -212,5 +169,72 @@ public class TaskServiceImpl extends ServiceImpl<TaskMapper, Task>
return "测试成功,无异常";
}
/**
* : map
*/
private HashMap<String, List<Node>> nodeCheckNorm(List<Node> nodeListAll) {
// 查询节点类型与其校验相关配置
List<NodeType> nodeTypeList = selectNodeTypeList();
/* 节点组成校验 */
NodeUtils.nodeCheckMakeUp(nodeListAll, nodeTypeList);
/* 节点连接规范校验 */
NodeUtils.nodeCheckNorm(nodeListAll, nodeTypeList);
return NodeUtils.nodeInit(nodeListAll);
}
/**
* : SQL
*/
private String getFindSql(HashMap<String, List<Node>> nodeMap) {
String findSql = "";
List<Node> uniteNodes = nodeMap.get("unite");
if (uniteNodes.isEmpty()){
Node tableNode = nodeMap.get("table").get(0);
List<NodeDisposition> dispList = getNodeDisp(tableNode);
findSql = NodeUtils.tableNode(dispList);
}else{
Node uniteNode = uniteNodes.get(0);
List<NodeDisposition> dispList = getNodeDisp(uniteNode);
findSql = NodeUtils.nodeDispUnite(dispList);
}
return findSql;
}
/**
* SQL
*/
private String getAddSql(HashMap<String, List<Node>> nodeMap, String findSql) {
Result<List<DataModel>> tableValue = remoteDataSourceService.findTableValue(new DataValueModel(4L, findSql));
if (tableValue.getCode() != 200){
throw new TaskException(tableValue.getMsg());
}
List<DataModel> data = tableValue.getData();
List<NodeDisposition> dispList = getNodeDisp(nodeMap.get("exportation").get(0));
return NodeUtils.nodeDispExportation(dispList, data);
}
/**
* SQL
*/
private void implAddSql(String addSql) {
Result addResult = remoteDataSourceService.addTableValue(new DataValueModel(4L, addSql));
if (addResult.getCode() != 200){
throw new TaskException(addResult.getMsg());
}
}
// 查询节点配置信息
public List<NodeDisposition> getNodeDisp(Node node){
List<NodeDisposition> dispList = dispositionService
.selectNodeDispositionList(new NodeDisposition().buildNodeCode(node.getNodeCode()));
if (dispList.isEmpty()){
throw new TaskException("节点 "+node+" 配置为空");
}
return dispList;
}
}