From 89aeb4444d9b4c1e836b7df0ea9efcf71191c324 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B0=91=E5=B9=B4=E6=A2=A6=E4=B8=8E=E7=A0=96?= <2847127106@qq.com> Date: Thu, 5 Sep 2024 21:15:12 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E8=B0=83=E6=95=B4=E4=BB=BB?= =?UTF-8?q?=E5=8A=A1=E6=89=A7=E8=A1=8C=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/muyu/quest/manager/TaskManager.java | 144 +++++++++++++++++- .../quest/service/impl/TaskServiceImpl.java | 126 ++++++++------- 2 files changed, 217 insertions(+), 53 deletions(-) diff --git a/muyu-quest-server/src/main/java/com/muyu/quest/manager/TaskManager.java b/muyu-quest-server/src/main/java/com/muyu/quest/manager/TaskManager.java index 78fbf7e..4f40c3a 100644 --- a/muyu-quest-server/src/main/java/com/muyu/quest/manager/TaskManager.java +++ b/muyu-quest-server/src/main/java/com/muyu/quest/manager/TaskManager.java @@ -9,13 +9,153 @@ package com.muyu.quest.manager; */ +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; + /** * @Author: 胡杨 * @Name: TaskManager - * @Description: 任务管理器 + * @Description: 任务线程管理器 * @CreatedDate: 2024/9/4 下午7:44 * @FilePath: com.muyu.quest.manager */ -public class TaskManager { +public final class TaskManager { + // 线程池中默认线程的个数为5 + private static int workerNum = 10; + // 工作线程 + private final WorkThread[] workThrads; + // 未处理的任务 + private static volatile int finishedTask = 0; + // 任务队列,作为一个缓冲,List线程不安全 + private final List taskQueue = new LinkedList(); + private static TaskManager taskManager; + + // 创建具有默认线程个数的线程池 + private TaskManager() { + this(5); + } + + // 创建线程池,workerNum为线程池中工作线程的个数 + private TaskManager(int workerNum) { + taskManager.workerNum = workerNum; + workThrads = new WorkThread[workerNum]; + for (int i = 0; i < workerNum; i++) { + workThrads[i] = new WorkThread(); + workThrads[i].start();// 开启线程池中的线程 + } + } + + // 单态模式,获得一个默认线程个数的线程池 + public static TaskManager getTaskManager() { + return getTaskManager(TaskManager.workerNum); + } + + // 单态模式,获得一个指定线程个数的线程池,workerNum(>0)为线程池中工作线程的个数 + // workerNum<=0创建默认的工作线程个数 + public static TaskManager getTaskManager(int workerNum1) { + if (workerNum1 <= 0) { + workerNum1 = TaskManager.workerNum; + } + if (taskManager == null) { + taskManager = new TaskManager(workerNum1); + } + return taskManager; + } + + // 批量执行任务,其实只是把任务加入任务队列,什么时候执行由线程池管理器决定 + public void execute(List task) { + execute(task.toArray(new Runnable[0])); + } + + // 批量执行任务,其实只是把任务加入任务队列,什么时候执行由线程池管理器决定 + public void execute(Runnable... task) { + synchronized (taskQueue) { + Collections.addAll(taskQueue, task); + taskQueue.notify(); + } + } + + + + // 销毁线程池,该方法保证在所有任务都完成的情况下才销毁所有线程,否则等待任务完成才销毁 + public void destroy() { + while (!taskQueue.isEmpty()) {// 如果还有任务没执行完成,就先睡会吧 + try { + Thread.sleep(500); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + // 工作线程停止工作,且置为null + for (int i = 0; i < workerNum; i++) { + workThrads[i].stopWorker(); + workThrads[i] = null; + } + taskManager=null; + taskQueue.clear();// 清空任务队列 + } + + // 返回工作线程的个数 + public int getWorkThreadNumber() { + return workerNum; + } + + // 返回已完成任务的个数,这里的已完成是只出了任务队列的任务个数,可能该任务并没有实际执行完成 + public int getFinishedTasknumber() { + return finishedTask; + } + + // 返回任务队列的长度,即还没处理的任务个数 + public int getWaitTasknumber() { + return taskQueue.size(); + } + + // 覆盖toString方法,返回线程池信息:工作线程个数和已完成任务个数 + @Override + public String toString() { + return "工作任务数:" + workerNum + ",已完成任务数:" + + finishedTask + ",等待任务数:" + getWaitTasknumber(); + } + + /** + * 内部类,工作线程 + */ + private class WorkThread extends Thread { + // 该工作线程是否有效,用于结束该工作线程 + private boolean isRunning = true; + + /* + * 关键所在,如果任务队列不空,则取出任务执行,若任务队列空,则等待 + */ + @Override + public void run() { + Runnable r = null; + while (isRunning) {// 注意,若线程无效则自然结束run方法,该线程就没用了 + synchronized (taskQueue) { + while (isRunning && taskQueue.isEmpty()) {// 队列为空 + try { + taskQueue.wait(1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + if (!taskQueue.isEmpty()) { + r = taskQueue.remove(0);// 取出任务 + } + } + if (r != null) { + r.run();// 执行任务 + } + finishedTask++; + r = null; + } + } + + // 停止工作,让该线程自然执行完run方法,自然结束 + public void stopWorker() { + isRunning = false; + } + } } diff --git a/muyu-quest-server/src/main/java/com/muyu/quest/service/impl/TaskServiceImpl.java b/muyu-quest-server/src/main/java/com/muyu/quest/service/impl/TaskServiceImpl.java index b2187e3..1308527 100644 --- a/muyu-quest-server/src/main/java/com/muyu/quest/service/impl/TaskServiceImpl.java +++ b/muyu-quest-server/src/main/java/com/muyu/quest/service/impl/TaskServiceImpl.java @@ -25,6 +25,7 @@ import com.muyu.quest.mapper.TaskMapper; import com.muyu.quest.domain.Task; import com.muyu.quest.service.TaskService; import org.springframework.util.Assert; +import org.springframework.util.CollectionUtils; import javax.annotation.Resource; @@ -141,61 +142,17 @@ public class TaskServiceImpl extends ServiceImpl @Override public String execute(String taskCode) { log.info("任务编码 {} 开始执行......",taskCode); - // 查询SQL语句 - String findSql = ""; - // 新增SQL语句 - String addSql = ""; - // 查询任务所有节点 List nodeListAll = nodeService.selectNodeList(new NodeReq().buildTaskCode(taskCode)); // 节点初始化 - HashMap> nodeMap = NodeUtils.nodeInit(nodeListAll); + HashMap> nodeMap = nodeCheckNorm(nodeListAll); + // 获取查询SQL + String findSql = getFindSql(nodeMap); + // 获取新增SQL + String addSql = getAddSql(nodeMap,findSql); + // 执行新增SQL + implAddSql(addSql); - List nodeTypeList = selectNodeTypeList(); - /* 节点组成校验 */ - NodeUtils.nodeCheckMakeUp(nodeListAll, nodeTypeList); - /* 节点连接规范校验 */ - NodeUtils.nodeCheckNorm(nodeListAll, nodeTypeList); - - // 开始节点处理 - Node thisNode = nodeMap.get("start").get(0); - // 开始节点处理 - while (true){ - thisNode = NodeUtils.nodeCheckNorm(thisNode, nodeListAll); - if (StringUtils.equals(thisNode.getNodeType(), "end")){ - break; - } - // 查询当前节点所有配置信息 - List dispList = dispositionService - .selectNodeDispositionList(new NodeDisposition().buildNodeCode(thisNode.getNodeCode())); - if (dispList.isEmpty()){ - throw new TaskException("节点 "+thisNode+" 配置为空"); - } - // 根据情况拼接查询sql与新增sql - if(StringUtils.equals(thisNode.getNodeType(), "table")) { - // 如果表节点下一级为数据输出节点,即表示为单表查询,将 当前表结构处理为查询语句 - if (StringUtils.equals(NodeUtils.nodeCheckNorm(thisNode, nodeListAll).getNodeType(), "exportation")){ - findSql = NodeUtils.tableNode(dispList); - } - }else if (StringUtils.equals(thisNode.getNodeType(), "unite")){ - findSql = NodeUtils.nodeDispUnite(dispList); - }else if (StringUtils.equals(thisNode.getNodeType(), "exportation")){ - // 执行查询语句 - log.info("任务执行查询阶段结束,查询sql为: [{}]", findSql); - Result> tableValue = remoteDataSourceService.findTableValue(new DataValueModel(4L, findSql)); - if (tableValue.getCode() != 200){ - throw new TaskException(tableValue.getMsg()); - } - List data = tableValue.getData(); - addSql = NodeUtils.nodeDispExportation(dispList, data); - } - } - - log.info("任务执行完成,新增sql为: [{}]", addSql); - Result resp = remoteDataSourceService.addTableValue(new DataValueModel(4L, addSql)); - if (resp.getCode() != 200){ - throw new TaskException(resp.getMsg()); - } return "执行成功"; } @@ -212,5 +169,72 @@ public class TaskServiceImpl extends ServiceImpl return "测试成功,无异常"; } + /** + * 执行任务第一步: 校验节点规范 并返回初始化节点map + */ + private HashMap> nodeCheckNorm(List nodeListAll) { + // 查询节点类型与其校验相关配置 + List nodeTypeList = selectNodeTypeList(); + /* 节点组成校验 */ + NodeUtils.nodeCheckMakeUp(nodeListAll, nodeTypeList); + /* 节点连接规范校验 */ + NodeUtils.nodeCheckNorm(nodeListAll, nodeTypeList); + + return NodeUtils.nodeInit(nodeListAll); + } + + /** + * 执行任务第二步: 拼接查询SQL + */ + private String getFindSql(HashMap> nodeMap) { + String findSql = ""; + List uniteNodes = nodeMap.get("unite"); + if (uniteNodes.isEmpty()){ + Node tableNode = nodeMap.get("table").get(0); + List dispList = getNodeDisp(tableNode); + findSql = NodeUtils.tableNode(dispList); + }else{ + Node uniteNode = uniteNodes.get(0); + List dispList = getNodeDisp(uniteNode); + findSql = NodeUtils.nodeDispUnite(dispList); + } + return findSql; + } + + /** + * 获取新增SQL + */ + private String getAddSql(HashMap> nodeMap, String findSql) { + Result> tableValue = remoteDataSourceService.findTableValue(new DataValueModel(4L, findSql)); + if (tableValue.getCode() != 200){ + throw new TaskException(tableValue.getMsg()); + } + List data = tableValue.getData(); + + List dispList = getNodeDisp(nodeMap.get("exportation").get(0)); + return NodeUtils.nodeDispExportation(dispList, data); + + } + + /** + * 执行新增SQL + */ + private void implAddSql(String addSql) { + Result addResult = remoteDataSourceService.addTableValue(new DataValueModel(4L, addSql)); + if (addResult.getCode() != 200){ + throw new TaskException(addResult.getMsg()); + } + } + + // 查询节点配置信息 + public List getNodeDisp(Node node){ + List dispList = dispositionService + .selectNodeDispositionList(new NodeDisposition().buildNodeCode(node.getNodeCode())); + if (dispList.isEmpty()){ + throw new TaskException("节点 "+node+" 配置为空"); + } + return dispList; + } + }