From 40875894c4e5ae535df38c3ed5e0725c8bc916b2 Mon Sep 17 00:00:00 2001 From: Cui YongXing <2835316714@qq.com> Date: Thu, 5 Sep 2024 18:59:11 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=94=B9bug?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../muyu/remote/feign/DatasourceFeign.java | 18 +++++--- .../feign/Factory/DatasourceFeignFactory.java | 15 ++++++- .../service/impl/TaskInfoServiceImpl.java | 45 ++++++++++++------- 3 files changed, 55 insertions(+), 23 deletions(-) diff --git a/cloud-task-remote/src/main/java/com/muyu/remote/feign/DatasourceFeign.java b/cloud-task-remote/src/main/java/com/muyu/remote/feign/DatasourceFeign.java index 7fea1bb..2eeec67 100644 --- a/cloud-task-remote/src/main/java/com/muyu/remote/feign/DatasourceFeign.java +++ b/cloud-task-remote/src/main/java/com/muyu/remote/feign/DatasourceFeign.java @@ -16,15 +16,23 @@ import java.util.List; * @author Administrator */ -@FeignClient(name = "cloud-etl-datasources",fallbackFactory = DatasourceFeignFactory.class ) +@FeignClient(name = "cloud-etl-datasources", fallbackFactory = DatasourceFeignFactory.class) public interface DatasourceFeign { - @PostMapping("/dataValue/findTableValue") - public Result>> findTableValue(@RequestParam("basicId") Long basicId, @RequestParam("sql") String sql); + @PostMapping("/findTableValueToArray") + public Result findTableValueToArray(@RequestParam("basicId") Long basicId,@RequestParam("sql") String sql); + @PostMapping("/dataValue/findTableValue") + public Result>> findTableValue(@RequestParam("basicId") Long basicId, + @RequestParam("sql") String sql); @PostMapping("/product/addProduct") - public Result addProduct(@RequestParam("basicId") Long basicId, @RequestParam("tableId") Long tableId, @RequestBody List> listList); + public Result addProduct(@RequestParam("basicId") Long basicId, + @RequestParam("tableId") Long tableId, + @RequestBody List> listList); + @PostMapping("/addProduct") + public Result addProduct(@RequestParam("basicId") Long basicId, @RequestParam("tableId") Long tableId, @RequestBody DataValue[][] listList); @PostMapping("/dataValue/findCount") - public Result findCount(@RequestParam("basicId") Long basicId,@RequestParam("sql") String sql); + public Result findCount(@RequestParam("basicId") Long basicId, + @RequestParam("sql") String sql); } diff --git a/cloud-task-remote/src/main/java/com/muyu/remote/feign/Factory/DatasourceFeignFactory.java b/cloud-task-remote/src/main/java/com/muyu/remote/feign/Factory/DatasourceFeignFactory.java index a869331..bd2e313 100644 --- a/cloud-task-remote/src/main/java/com/muyu/remote/feign/Factory/DatasourceFeignFactory.java +++ b/cloud-task-remote/src/main/java/com/muyu/remote/feign/Factory/DatasourceFeignFactory.java @@ -15,8 +15,16 @@ public class DatasourceFeignFactory implements FallbackFactory @Override public DatasourceFeign create(Throwable e) { return new DatasourceFeign() { + + @Override - public Result findTableValue(Long basicId, String sql) { + public Result findTableValueToArray(Long basicId, String sql) { + log.info(e); + return Result.error("网络开小差......"); + } + + @Override + public Result>> findTableValue(Long basicId, String sql) { log.info(e); return Result.error("网络开小差......"); } @@ -27,6 +35,11 @@ public class DatasourceFeignFactory implements FallbackFactory return Result.error("网络开小差......"); } + @Override + public Result addProduct(Long basicId, Long tableId, DataValue[][] listList) { + return null; + } + @Override public Result findCount(Long basicId, String sql) { log.info(e); diff --git a/cloud-task-server/src/main/java/com/muyu/task/server/service/impl/TaskInfoServiceImpl.java b/cloud-task-server/src/main/java/com/muyu/task/server/service/impl/TaskInfoServiceImpl.java index c36969b..0a6a14f 100644 --- a/cloud-task-server/src/main/java/com/muyu/task/server/service/impl/TaskInfoServiceImpl.java +++ b/cloud-task-server/src/main/java/com/muyu/task/server/service/impl/TaskInfoServiceImpl.java @@ -41,6 +41,8 @@ import static com.muyu.task.server.thread.OptimizedPrioritizedThreadPool.*; @Service public class TaskInfoServiceImpl extends ServiceImpl implements TaskInfoService { + private final Integer PAGE_SIZE = 5000; + @Resource private TaskInfoMapper taskInfoMapper; @Autowired @@ -76,7 +78,7 @@ public class TaskInfoServiceImpl extends ServiceImpl i respPage.setPages(pages); return respPage; } -// + // @Override // public String findByFieName(Long taskId) { // QueryWrapper wrapper = new QueryWrapper<>(); @@ -228,8 +230,10 @@ public class TaskInfoServiceImpl extends ServiceImpl i Long tableId = one.getTableId(); Long newBasicId = one.getBasicId(); HashMap map = new HashMap<>(); + String[] newFieName = one.getNewFieName().split(","); String[] lastFieName = one.getLastFieName().split(","); + Integer num = lastFieName.length; for (int i = 0; i < newFieName.length; i++) { map.put(lastFieName[i], newFieName[i]); fieName += "," + tableNameMap.get(lastFieName[i]); @@ -241,32 +245,32 @@ public class TaskInfoServiceImpl extends ServiceImpl i Long data = countResult.getData(); String finalFieName = fieName; String finalJoint = joint; - long count = data/5000==0?1:data/5000+1; + long count = data/PAGE_SIZE==0?1:data/PAGE_SIZE+1; if (Weight.high.getValue().equals(weight)){ log.info("执行高级任务"); for (long i = 1; i <= count; i++) { - long pageNum = (i - 1) * 5000; + long pageNum = (i - 1) * PAGE_SIZE; submitHighPriorityTask(()->{ - getString(pageNum, finalFieName, finalJoint,basicId,newBasicId,tableId,map); + getString(pageNum, finalFieName, finalJoint,basicId,newBasicId,tableId,map,num); }); } } if (Weight.centre.getValue().equals(weight)){ log.info("执行中级任务"); for (long i = 1; i <= count; i++) { - long pageNum = (i - 1) * 5000; + long pageNum = (i - 1) * PAGE_SIZE; System.out.println(pageNum); submitMediumPriorityTask(()->{ - getString(pageNum, finalFieName, finalJoint,basicId,newBasicId,tableId,map); + getString(pageNum, finalFieName, finalJoint,basicId,newBasicId,tableId,map,num); }); } } if (Weight.low.getValue().equals(weight)){ log.info("执行低级任务"); for (long i = 1; i <= count; i++) { - long pageNum = (i - 1) * 5000; + long pageNum = (i - 1) * PAGE_SIZE; submitLowPriorityTask(()->{ - getString(pageNum, finalFieName, finalJoint,basicId,newBasicId,tableId,map); + getString(pageNum, finalFieName, finalJoint,basicId,newBasicId,tableId,map,num); }); } } @@ -275,10 +279,10 @@ public class TaskInfoServiceImpl extends ServiceImpl i // 调整线程分配以适应紧急任务 adjustForEmergency(); for (long i = 1; i <= count; i++) { - long pageNum = (i - 1) * 5000; + long pageNum = (i - 1) * PAGE_SIZE; submitEmergencyTask(()->{ - getString(pageNum, finalFieName, finalJoint,basicId,newBasicId,tableId,map); + getString(pageNum, finalFieName, finalJoint,basicId,newBasicId,tableId,map,num); }); } // 任务完成后恢复默认线程分配 @@ -291,19 +295,26 @@ public class TaskInfoServiceImpl extends ServiceImpl i @NotNull - private void getString(Long pageNum,String fieName,String joint,Long basicId,Long newBasicId,Long tableId,HashMap map ) { - String sqlSelect = " SELECT " + fieName + " FROM " + joint +" limit "+pageNum +",5000 "; + private void getString(Long pageNum,String fieName,String joint,Long basicId,Long newBasicId,Long tableId,HashMap map ,Integer num) { + String sqlSelect = " SELECT " + fieName + " FROM " + joint +" limit "+pageNum +","+PAGE_SIZE; log.info(sqlSelect); - Result>> tableValueResult = datasourceFeign.findTableValue(basicId, sqlSelect); - List> tableValue = tableValueResult.getData(); - for (List dataValues : tableValue) { - for (DataValue dataValue : dataValues) { + Result tableValueResult = datasourceFeign.findTableValueToArray(basicId, sqlSelect); + DataValue[][] data = tableValueResult.getData(); + for (DataValue[] datum : data) { + for (DataValue dataValue : datum) { String key = dataValue.getKey(); String newKey = map.get(key); dataValue.setKey(newKey); } } - Result result = datasourceFeign.addProduct(newBasicId, tableId, tableValue); +// for (List dataValues : tableValue) { +// for (DataValue dataValue : dataValues) { +// String key = dataValue.getKey(); +// String newKey = map.get(key); +// dataValue.setKey(newKey); +// } +// } + Result result = datasourceFeign.addProduct(newBasicId, tableId, data); log.info(result); }