diff --git a/cloud-task-server/src/main/java/com/muyu/service/impl/TaskServiceImpl.java b/cloud-task-server/src/main/java/com/muyu/service/impl/TaskServiceImpl.java index ddd0a50..c7ec4a4 100644 --- a/cloud-task-server/src/main/java/com/muyu/service/impl/TaskServiceImpl.java +++ b/cloud-task-server/src/main/java/com/muyu/service/impl/TaskServiceImpl.java @@ -165,12 +165,7 @@ public class TaskServiceImpl extends ServiceImpl implement leftJoin += taskInputInfo.getTableName() + " " + taskInputInfo.getTableAsName(); } - String sqlCount = "select count(1) from "+leftJoin; - //查询出总条数 - Integer count = dataValueClient.findCount(Long.valueOf(taskOutputInfo.getBasicId()), sqlCount); - log.info("查询到的条数为{}",count); - //查询数据 - sql = sql + leftJoin; + @@ -182,43 +177,54 @@ public class TaskServiceImpl extends ServiceImpl implement Long basicId = Long.valueOf(taskOutputInfo.getBasicId()); Long tableId = Long.valueOf(taskOutputInfo.getTableId()); - - if(taskInfo.getWeigh() == 4){ - log.info("执行紧急任务"); + String sqlCount = "select count(1) from "+leftJoin; + //查询出总条数 + Integer count = dataValueClient.findCount(Long.valueOf(taskOutputInfo.getBasicId()), sqlCount); + log.info("查询到的条数为{}",count); + int pageSize = 10000; + int totalSegments = (int) Math.ceil((double) count / pageSize); + log.info("总共页码为{}", totalSegments); + //查询数据 + sql = sql + leftJoin; + for (int i = 0; i < totalSegments; i++) { + log.info("当前页为{}", i); + int pageNum = i * totalSegments; + long finalFirstArray = Math.min(pageSize, count - pageNum); + String limit = " limit " + i * pageSize + "," + pageSize; + String limitSelect = sql + limit; Long finalDatabaseId = databaseId; String finalSql = sql; - log.info("sql为{}",finalSql); - executeUrgently(() -> { - selectAndAdd(count, finalDatabaseId, basicId, finalSql, tableId,newAndOldMap,two); - }); + log.info("执行查询语句为{}", limitSelect); + if(taskInfo.getWeigh() == 4){ + log.info("执行紧急任务"); + log.info("sql为{}",finalSql); + executeUrgently(() -> { + selectAndAdd(finalDatabaseId, basicId, finalSql, tableId,newAndOldMap, finalFirstArray,two); + }); + } + + if(taskInfo.getWeigh() == 3){ + log.info("执行高级任务"); + executeHigh(() -> { + selectAndAdd(finalDatabaseId, basicId, finalSql, tableId,newAndOldMap, finalFirstArray,two); + }); + } + + if(taskInfo.getWeigh() == 2){ + log.info("执行中级任务"); + executeMedium(() -> { + selectAndAdd(finalDatabaseId, basicId, finalSql, tableId,newAndOldMap, finalFirstArray,two); + }); + } + + if(taskInfo.getWeigh() == 1){ + log.info("执行低级任务"); + executeLow(() -> { + selectAndAdd(finalDatabaseId, basicId, finalSql, tableId,newAndOldMap, finalFirstArray,two); + }); + } } - if(taskInfo.getWeigh() == 3){ - log.info("执行高级任务"); - Long finalDatabaseId = databaseId; - String finalSql = sql; - executeHigh(() -> { - selectAndAdd(count, finalDatabaseId, basicId, finalSql, tableId,newAndOldMap,two); - }); - } - - if(taskInfo.getWeigh() == 2){ - log.info("执行中级任务"); - Long finalDatabaseId = databaseId; - String finalSql = sql; - executeMedium(() -> { - selectAndAdd(count, finalDatabaseId, basicId, finalSql, tableId,newAndOldMap,two); - }); - } - - if(taskInfo.getWeigh() == 1){ - log.info("执行低级任务"); - Long finalDatabaseId = databaseId; - String finalSql = sql; - executeLow(() -> { - selectAndAdd(count, finalDatabaseId, basicId, finalSql, tableId,newAndOldMap,two); - }); - } // Result>> tableValue = dataValueFeign.findTableValue(Long.valueOf(taskOutputInfo.getBasicId()), sql); @@ -226,22 +232,9 @@ public class TaskServiceImpl extends ServiceImpl implement return "success"; } - private void selectAndAdd(Integer count,Long databaseId,Long basicId,String sql,Long tableId, - HashMap newAndOldMap,Integer two) { - int pageSize = 700000; - long firstArray = 0L; - int totalSegments = (int) Math.ceil((double) count / pageSize); - for (int i = 0; i < totalSegments; i++) { - log.info("当前页为{}", i); - String limit = " limit " + i * pageSize + "," + pageSize; - firstArray = Math.min(pageSize, count - i * pageSize); - if (firstArray >= pageSize) { - firstArray = pageSize; - } - String limitSelect = sql + limit; - log.info("执行查询语句为{}", limitSelect); - log.info(databaseId); - DataValue[][] tableValue = dataValueClient.findTableValueToArray(databaseId, limitSelect, firstArray,two); + private void selectAndAdd(Long databaseId,Long basicId,String sql,Long tableId, + HashMap newAndOldMap,Long firstArray ,Integer two) { + DataValue[][] tableValue = dataValueClient.findTableValueToArray(databaseId, sql, firstArray,two); log.info("远程调用完毕,调用数量{}",tableValue.length); for (DataValue[] dataValues : tableValue) { for (DataValue dataValue : dataValues) { @@ -253,7 +246,36 @@ public class TaskServiceImpl extends ServiceImpl implement Result result = dataValueClient.addProduct(basicId, tableId, tableValue); log.info("添加结果为{}", result); log.info("添加到queue里成功"); - } + +// private void selectAndAdd(Integer count,Long databaseId,Long basicId,String sql,Long tableId, +// HashMap newAndOldMap,Integer two) { +// int pageSize = 700000; +// long firstArray = 0L; +// int totalSegments = (int) Math.ceil((double) count / pageSize); +// log.info("总共页码为{}", totalSegments); +// for (int i = 0; i < totalSegments; i++) { +// log.info("当前页为{}", i); +// String limit = " limit " + i * pageSize + "," + pageSize; +// firstArray = Math.min(pageSize, count - i * pageSize); +// if (firstArray >= pageSize) { +// firstArray = pageSize; +// } +// String limitSelect = sql + limit; +// log.info("执行查询语句为{}", limitSelect); +// log.info(databaseId); +// DataValue[][] tableValue = dataValueClient.findTableValueToArray(databaseId, limitSelect, firstArray,two); +// log.info("远程调用完毕,调用数量{}",tableValue.length); +// for (DataValue[] dataValues : tableValue) { +// for (DataValue dataValue : dataValues) { +// String key = dataValue.getKey(); +// String newKey = newAndOldMap.get(key); +// dataValue.setKey(newKey); +// } +// } +// Result result = dataValueClient.addProduct(basicId, tableId, tableValue); +// log.info("添加结果为{}", result); +// log.info("添加到queue里成功"); +// }