diff --git a/muyu-quest-remote/src/main/java/com/muyu/quest/remote/RemoteDataSourceService.java b/muyu-quest-remote/src/main/java/com/muyu/quest/remote/RemoteDataSourceService.java index a930a69..2a7a66e 100644 --- a/muyu-quest-remote/src/main/java/com/muyu/quest/remote/RemoteDataSourceService.java +++ b/muyu-quest-remote/src/main/java/com/muyu/quest/remote/RemoteDataSourceService.java @@ -22,13 +22,21 @@ import java.util.List; */ @FeignClient(contextId = "RemoteDataSourceService", value = ServiceNameConstants.SOURCE_SERVICE, - url = "http://10.0.2.1:19652/", fallbackFactory = RemoteDataSourceFactory.class) public interface RemoteDataSourceService { @PostMapping("/value/findTableValue") public Result> findTableValue(@RequestBody DataValueModel dataValueModel); + /** + * 根据基础表ID和SQL语句查询条数 + * + * @param dataValueModel 基础表ID和sql语句 + * @return DataValue{kltv} + */ + @PostMapping("/value/getTableValueTotal") + public Result getTableValueTotal(@RequestBody DataValueModel dataValueModel); + /** * 根据基础表ID和SQL语句查询数据 * diff --git a/muyu-quest-remote/src/main/java/com/muyu/quest/remote/factory/RemoteDataSourceFactory.java b/muyu-quest-remote/src/main/java/com/muyu/quest/remote/factory/RemoteDataSourceFactory.java index f6b6a04..fcd3ecf 100644 --- a/muyu-quest-remote/src/main/java/com/muyu/quest/remote/factory/RemoteDataSourceFactory.java +++ b/muyu-quest-remote/src/main/java/com/muyu/quest/remote/factory/RemoteDataSourceFactory.java @@ -22,6 +22,11 @@ public class RemoteDataSourceFactory implements FallbackFactory taskQueue = new LinkedList(); private static TaskManager taskManager; // 创建具有默认线程个数的线程池 private TaskManager() { - this(5); + this(10); } // 创建线程池,workerNum为线程池中工作线程的个数 @@ -64,12 +65,12 @@ public final class TaskManager { return taskManager; } - // 批量执行任务,其实只是把任务加入任务队列,什么时候执行由线程池管理器决定 + // 把任务加入任务队列 public void execute(List task) { execute(task.toArray(new Runnable[0])); } - // 批量执行任务,其实只是把任务加入任务队列,什么时候执行由线程池管理器决定 + // 把任务加入任务队列 public void execute(Runnable... task) { synchronized (taskQueue) { Collections.addAll(taskQueue, task); @@ -132,7 +133,7 @@ public final class TaskManager { @Override public void run() { Runnable r = null; - while (isRunning) {// 注意,若线程无效则自然结束run方法,该线程就没用了 + while (isRunning) {// 若线程无效则自然结束run方法,该线程就没用了 synchronized (taskQueue) { while (isRunning && taskQueue.isEmpty()) {// 队列为空 try { diff --git a/muyu-quest-server/src/main/java/com/muyu/quest/service/TaskService.java b/muyu-quest-server/src/main/java/com/muyu/quest/service/TaskService.java index f7823a4..a2809a7 100644 --- a/muyu-quest-server/src/main/java/com/muyu/quest/service/TaskService.java +++ b/muyu-quest-server/src/main/java/com/muyu/quest/service/TaskService.java @@ -48,4 +48,6 @@ public interface TaskService extends IService { String execute(String taskCode); String testExecute(List nodeList); + + String test(); } diff --git a/muyu-quest-server/src/main/java/com/muyu/quest/service/impl/TaskServiceImpl.java b/muyu-quest-server/src/main/java/com/muyu/quest/service/impl/TaskServiceImpl.java index 2db8085..f35e6dc 100644 --- a/muyu-quest-server/src/main/java/com/muyu/quest/service/impl/TaskServiceImpl.java +++ b/muyu-quest-server/src/main/java/com/muyu/quest/service/impl/TaskServiceImpl.java @@ -6,6 +6,7 @@ import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.muyu.common.core.domain.Result; import com.muyu.common.core.utils.StringUtils; +import com.muyu.quest.manager.TaskManager; import com.muyu.quest.model.DataModel; import com.muyu.quest.model.DataValueModel; import com.muyu.quest.domain.Node; @@ -49,6 +50,8 @@ public class TaskServiceImpl extends ServiceImpl @Resource private RemoteDataSourceService remoteDataSourceService; + private final static TaskManager taskManager = TaskManager.getTaskManager(); + /** * 查询任务 * @@ -150,11 +153,26 @@ public class TaskServiceImpl extends ServiceImpl String findSql = getFindSql(nodeMap); // 获取查询条数 int count = getFindCount(findSql, nodeMap); - // 获取新增SQL - String addSql = getAddSql(nodeMap,findSql); - // 执行新增SQL - implAddSql(addSql); - + // 划分线程 每次查询1000条 + int threadNum = count / 1000 + 1; + log.info("任务 {} 总共需要 {} 条数据, 划分为线程{}条",taskCode,count,threadNum); + Runnable[] tasks = new Runnable[threadNum]; + for (int i = 0; i < threadNum; i++) { + int index = i+1; + tasks[i] = () -> { + // 获取新SQL 并执行 + String sql = findSql + " LIMIT 1000 OFFSET "+(index-1)*1000; + log.info("任务 {} 开始执行第 {} 线程,查询SQL: {}",taskCode,index, sql); + String addSql = getAddSql(nodeMap, sql); + log.info("任务 {} 开始执行第 {} 线程,新增SQL: {}",taskCode,index, addSql); + Result addResult = remoteDataSourceService.addTableValue(new DataValueModel(4L, addSql)); + if (addResult.getCode() != 200){ + throw new TaskException(addResult.getMsg()); + } + }; + } + // 添加进入任务队列 + taskManager.execute(tasks); return "执行成功"; } @@ -173,6 +191,14 @@ public class TaskServiceImpl extends ServiceImpl return "测试成功,无异常"; } + @Override + public String test() { + String sql = "SELECT `cloud-ecology`.task_source.task_code,`cloud-ecology`.task_source.task_name,`cloud-ecology`.node_info.node_code,`cloud-ecology`.node_info.node_type,`cloud-ecology`.node_info.node_name FROM `cloud-ecology`.task_source RIGHT JOIN `cloud-ecology`.node_info ON `cloud-ecology`.task_source.task_code = `cloud-ecology`.node_info.task_code LIMIT 10 OFFSET 0"; + Result> tableValue = remoteDataSourceService.findTableValue(new DataValueModel(4L, sql)); + System.out.println(tableValue); + return ""; + } + /** * 执行任务第一步: 校验节点规范 并返回初始化节点map */ @@ -212,19 +238,24 @@ public class TaskServiceImpl extends ServiceImpl System.out.println(findSql); String findCountSql = ""; List uniteNodes = nodeMap.get("unite"); + findSql = findSql.replace(" "," "); + String[] s1 = findSql.split(" "); if (uniteNodes!=null && !uniteNodes.isEmpty()){ - String[] split = findSql.split(" "); - split[1] = "COUNT(" + split[1].split(",")[0]+ ")"; - findCountSql = StringUtils.join(split," "); + s1[1] = "COUNT(" + s1[1].split(",")[0]+ ")"; + findCountSql = StringUtils.join(s1," "); }else { - String[] split = findSql.split("\\."); + + String[] split = s1[s1.length-1].split("\\."); findCountSql = "SELECT TABLE_ROWS " + "FROM INFORMATION_SCHEMA.TABLES " + - "WHERE TABLE_SCHEMA = '"+split[split.length-2]+"' AND TABLE_NAME = '"+split[split.length-1]+"';"; + "WHERE TABLE_SCHEMA = "+split[0]+" AND TABLE_NAME = '"+split[1]+"';"; + findCountSql = findCountSql.replace("`","'"); } - Result> tableValue = remoteDataSourceService.findTableValue(new DataValueModel(4L, findCountSql)); + System.out.println(findCountSql); + Result tableValue = remoteDataSourceService.getTableValueTotal(new DataValueModel(4L, findCountSql)); System.out.println(tableValue); - return 1; + + return (int) tableValue.getData(); } /** @@ -237,21 +268,12 @@ public class TaskServiceImpl extends ServiceImpl } List data = tableValue.getData(); + List dispList = getNodeDisp(nodeMap.get("exportation").get(0)); return NodeUtils.nodeDispExportation(dispList, data); } - /** - * 执行新增SQL - */ - private void implAddSql(String addSql) { - Result addResult = remoteDataSourceService.addTableValue(new DataValueModel(4L, addSql)); - if (addResult.getCode() != 200){ - throw new TaskException(addResult.getMsg()); - } - } - // 查询节点配置信息 public List getNodeDisp(Node node){ List dispList = dispositionService