package com.muyu.service.impl; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.muyu.common.core.domain.Result; import com.muyu.common.core.utils.StringUtils; import com.muyu.domain.*; import com.muyu.domain.req.TaskInfoReq; import com.muyu.mapper.TaskMapper; import com.muyu.service.*; import com.muyu.task.feign.DataValueClient; import com.muyu.task.feign.RuleFeign; import lombok.extern.log4j.Log4j2; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.util.HashMap; import java.util.HashSet; import java.util.List; import static com.muyu.task.PriorityThreadPool.*; /** * @PackageName:com.muyu.service.impl * @ClassName:TaskServiceImpl * @Description: * @author: ¥陈思豪¥ * @date: 2024/8/22 17:15 */ @Service @Log4j2 public class TaskServiceImpl extends ServiceImpl implements TaskService { @Autowired private TaskInputService taskInputService; @Autowired private TaskMapper taskMapper; @Autowired private TaskJoinService taskJoinService; @Autowired private TaskOutputService taskOutputService; @Autowired private DataValueClient dataValueClient; @Override public List selectList(TaskInfoReq taskInfoReq) { LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper<>(); queryWrapper.like( StringUtils.isNotEmpty(taskInfoReq.getName()), TaskInfo::getName, taskInfoReq.getName() ); if(taskInfoReq.getStatus()!=null && taskInfoReq.getStatus()!=0){ queryWrapper.eq( TaskInfo::getStatus,taskInfoReq.getStatus() ); } if(taskInfoReq.getWeigh()!=null && taskInfoReq.getWeigh()!=0 ){ queryWrapper.eq( TaskInfo::getWeigh,taskInfoReq.getWeigh() ); } return this.list(queryWrapper); } @Override public String addTask(TaskInfo taskInfo) { boolean save = this.save(taskInfo); if(save == false){ throw new RuntimeException("err"); } return "success"; } @Override public String updById(TaskInfo taskInfo) { this.updateById(taskInfo); return "success"; } @Override public String deleteById(Integer id) { TaskMapper taskMapper = this.baseMapper; int i = taskMapper.deleteById(id); if(i<=0){ throw new RuntimeException("err"); } return "success"; } @Override public String executeTask(Integer taskId) { LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper<>(); queryWrapper.eq(TaskInputInfo::getTaskId, taskId); List taskInputList = taskInputService.list(queryWrapper); if(taskInputList.isEmpty()){ return "表节点没有进行相对应的选择,无法继续执行"; } LambdaQueryWrapper outputInfo = new LambdaQueryWrapper<>(); outputInfo.eq(TaskOutputInfo::getTaskId, taskId); TaskOutputInfo taskOutputInfo = taskOutputService.getOne(outputInfo); String[] outPutFileName = taskOutputInfo.getLastFileName().split(","); String[] newFileName = taskOutputInfo.getNewFileName().split(","); Integer two = newFileName.length; HashMap fieldAsNameMap = new HashMap<>(); HashMap newAndOldMap = new HashMap<>(); HashSet basicList = new HashSet<>(); String sql = ""; Long databaseId = 0L; for (int i = 0; i < taskInputList.size(); i++) { basicList.add(taskInputList.get(i).getDatabaseId()); databaseId = Long.parseLong(taskInputList.get(i).getDatabaseId()); if(basicList.size()>1){ throw new RuntimeException("数据库选择的不同,无法执行"); } String[] tableFieldList = taskInputList.get(i).getTableField().split(","); String[] tableAsFieldList = taskInputList.get(i).getTableAsField().split(","); for (int j = 0; j < tableAsFieldList.length; j++) { fieldAsNameMap.put(tableAsFieldList[j], tableFieldList[j]); fieldAsNameMap.put(tableAsFieldList[j], tableFieldList[j]); for (int o = 0; o < newFileName.length; o++) { newAndOldMap.put(fieldAsNameMap.get(outPutFileName[o]), newFileName[o]); if(tableAsFieldList[j].equals(outPutFileName[o])){ sql += ","+taskInputList.get(i).getTableAsName() + "." + tableFieldList[j] +" " + tableAsFieldList[j] + " "; } } } } sql = sql.substring(1); sql = "select " + sql + " from "; String leftJoin = ""; LambdaQueryWrapper lambdaQueryWrapper = new LambdaQueryWrapper<>(); lambdaQueryWrapper.eq(TaskJoinInfo::getTaskId, taskId); List taskJoinInfos = taskJoinService.list(lambdaQueryWrapper); if(!taskJoinInfos.isEmpty()){ for (TaskJoinInfo taskJoinInfo : taskJoinInfos) { //拿到左表id String leftId = taskJoinInfo.getLeftId(); TaskInputInfo leftInput = taskInputService.findByNodeId(leftId); //拿到右表id String rightId = taskJoinInfo.getRightId(); TaskInputInfo rightInput = taskInputService.findByNodeId(rightId); leftJoin += leftInput.getTableName() + " " + leftInput.getTableAsName()+" " + taskJoinInfo.getJoinType() + " " + rightInput.getTableName() + " " + rightInput.getTableAsName() + " on " + leftInput.getTableAsName() + "." + fieldAsNameMap.get(taskJoinInfo.getLeftJoinField()) + "=" + rightInput.getTableAsName() + "." + fieldAsNameMap.get(taskJoinInfo.getRightJoinField()) + " "; } }else{ LambdaQueryWrapper selectOne = new LambdaQueryWrapper<>(); selectOne.eq(TaskInputInfo::getTaskId, taskId); TaskInputInfo taskInputInfo = taskInputService.getOne(selectOne); leftJoin += taskInputInfo.getTableName() + " " + taskInputInfo.getTableAsName(); } LambdaQueryWrapper taskInfoQueryWrapper = new LambdaQueryWrapper<>(); taskInfoQueryWrapper.eq(TaskInfo::getId, taskId); TaskInfo taskInfo = taskMapper.selectById(taskId); Long basicId = Long.valueOf(taskOutputInfo.getBasicId()); Long tableId = Long.valueOf(taskOutputInfo.getTableId()); String sqlCount = "select count(1) from "+leftJoin; //查询出总条数 Integer count = dataValueClient.findCount(Long.valueOf(taskOutputInfo.getBasicId()), sqlCount); log.info("查询到的条数为{}",count); int pageSize = 10000; int totalSegments = (int) Math.ceil((double) count / pageSize); log.info("总共页码为{}", totalSegments); //查询数据 sql = sql + leftJoin; for (int i = 0; i < totalSegments; i++) { log.info("当前页为{}", i); int pageNum = i * totalSegments; long finalFirstArray = Math.min(pageSize, count - pageNum); String limit = " limit " + i * pageSize + "," + pageSize; String limitSelect = sql + limit; Long finalDatabaseId = databaseId; String finalSql = sql; log.info("执行查询语句为{}", limitSelect); if(taskInfo.getWeigh() == 4){ log.info("执行紧急任务"); log.info("sql为{}",finalSql); executeUrgently(() -> { selectAndAdd(finalDatabaseId, basicId, finalSql, tableId,newAndOldMap, finalFirstArray,two); }); } if(taskInfo.getWeigh() == 3){ log.info("执行高级任务"); executeHigh(() -> { selectAndAdd(finalDatabaseId, basicId, finalSql, tableId,newAndOldMap, finalFirstArray,two); }); } if(taskInfo.getWeigh() == 2){ log.info("执行中级任务"); executeMedium(() -> { selectAndAdd(finalDatabaseId, basicId, finalSql, tableId,newAndOldMap, finalFirstArray,two); }); } if(taskInfo.getWeigh() == 1){ log.info("执行低级任务"); executeLow(() -> { selectAndAdd(finalDatabaseId, basicId, finalSql, tableId,newAndOldMap, finalFirstArray,two); }); } } // Result>> tableValue = dataValueFeign.findTableValue(Long.valueOf(taskOutputInfo.getBasicId()), sql); return "success"; } private void selectAndAdd(Long databaseId,Long basicId,String sql,Long tableId, HashMap newAndOldMap,Long firstArray ,Integer two) { DataValue[][] tableValue = dataValueClient.findTableValueToArray(databaseId, sql, firstArray,two); log.info("远程调用完毕,调用数量{}",tableValue.length); for (DataValue[] dataValues : tableValue) { for (DataValue dataValue : dataValues) { String key = dataValue.getKey(); String newKey = newAndOldMap.get(key); dataValue.setKey(newKey); } } Result result = dataValueClient.addProduct(basicId, tableId, tableValue); log.info("添加结果为{}", result); log.info("添加到queue里成功"); // private void selectAndAdd(Integer count,Long databaseId,Long basicId,String sql,Long tableId, // HashMap newAndOldMap,Integer two) { // int pageSize = 700000; // long firstArray = 0L; // int totalSegments = (int) Math.ceil((double) count / pageSize); // log.info("总共页码为{}", totalSegments); // for (int i = 0; i < totalSegments; i++) { // log.info("当前页为{}", i); // String limit = " limit " + i * pageSize + "," + pageSize; // firstArray = Math.min(pageSize, count - i * pageSize); // if (firstArray >= pageSize) { // firstArray = pageSize; // } // String limitSelect = sql + limit; // log.info("执行查询语句为{}", limitSelect); // log.info(databaseId); // DataValue[][] tableValue = dataValueClient.findTableValueToArray(databaseId, limitSelect, firstArray,two); // log.info("远程调用完毕,调用数量{}",tableValue.length); // for (DataValue[] dataValues : tableValue) { // for (DataValue dataValue : dataValues) { // String key = dataValue.getKey(); // String newKey = newAndOldMap.get(key); // dataValue.setKey(newKey); // } // } // Result result = dataValueClient.addProduct(basicId, tableId, tableValue); // log.info("添加结果为{}", result); // log.info("添加到queue里成功"); // } // static class Worker implements Runnable { // private final PriorityBlockingQueue queue; // // public Worker(PriorityBlockingQueue queue) { // this.queue = queue; // } // // @Override // public void run() { // while (!Thread.currentThread().isInterrupted()) { // try { // // 从队列中取出一个任务 // SegmentTask task = queue.take(); // // 执行任务 // task.run(); // } catch (InterruptedException e) { // Thread.currentThread().interrupt(); // 设置中断状态 // break; // } // } // } // // } // static class SegmentTask implements Runnable, Comparable { // // // @Resource // private DataValueClient dataValueClient; // // private final String sql; // private final Long basicId; // private final Long tableId; // private final Weight weight; // HashMap map = new HashMap<>(); // // public SegmentTask(String sql, Long basicId, Long tableId, HashMap map, Weight weight) { // this.sql = sql; // this.basicId = basicId; // this.tableId = tableId; // this.map = map; // this.weight = weight; // } // // @Override // public void run() { // // log.info("开始执行任务:" + basicId + " " + sql + " " + weight + " " + tableId, tableId); // // 查询数据 // Result tableValue = dataValueClient.findTableValue(basicId, sql); // log.info("远程调用完毕,调用的值为{}", tableValue.getData()); // List> data = (List>) tableValue.getData(); // log.info("查询结果:{}", data); // for (List datum : data) { // for (DataValue dataValue : datum) { // String key = dataValue.getKey(); // String newKey = map.get(key); // dataValue.setKey(newKey); // } // } // log.info("开始添加:{}", data); // // Result result = dataValueClient.addTableDataValue(basicId, tableId, data); // log.info("添加完毕字段:{}", result); // // 处理结果 //// dataValueList.accept(tableValue.getData()); // // // } // // public Weight geWeight() { // return weight; // } // // // @Override // public int compareTo(@NotNull SegmentTask o) { // return Integer.compare(o.geWeight().getWeight(), this.weight.getWeight()); // } // } } // @Autowired // private NodeRuleService nodeRuleService; // // @Autowired // private RuleFeign ruleFeign; // public static Map> engineMap = new ConcurrentHashMap<>(); // public static Map> engineRowMap = new ConcurrentHashMap<>(); // public static Map> engineDataSetMap = new ConcurrentHashMap<>(); }