package com.muyu.service.impl;

import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.muyu.common.core.domain.Result;
import com.muyu.common.core.utils.StringUtils;
import com.muyu.domain.TaskInfo;
import com.muyu.domain.TaskInputInfo;
import com.muyu.domain.TaskJoinInfo;
import com.muyu.domain.TaskOutputInfo;
import com.muyu.domain.req.TaskInfoReq;
import com.muyu.domain.taskenum.Weight;
import com.muyu.mapper.TaskMapper;
import com.muyu.service.TaskInputService;
import com.muyu.service.TaskJoinService;
import com.muyu.service.TaskOutputService;
import com.muyu.task.SegmentTask;
import com.muyu.service.TaskService;
import com.muyu.task.feign.DataValueClient;
import lombok.extern.log4j.Log4j2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;

/**
 * Service implementation for task CRUD and task execution.
 *
 * <p>{@link #executeTask(Integer)} assembles a {@code select ... from ...} statement from the
 * task's input, join and output nodes, asks {@link DataValueClient} for the row count, splits
 * the query into pages of {@value #PAGE_SIZE} rows, and runs one {@link SegmentTask} per page
 * on a fixed-size thread pool.
 *
 * @author ¥陈思豪¥
 * @date 2024/8/22 17:15
 */
@Service
@Log4j2
public class TaskServiceImpl extends ServiceImpl<TaskMapper, TaskInfo> implements TaskService {

    /** Number of rows fetched by a single {@link SegmentTask}. */
    private static final int PAGE_SIZE = 1000;

    /** Upper bound on the number of worker threads draining the segment queue. */
    private static final int WORKER_COUNT = 10;

    @Autowired
    private TaskInputService taskInputService;

    @Autowired
    private TaskMapper taskMapper;

    @Autowired
    private TaskJoinService taskJoinService;

    @Autowired
    private TaskOutputService taskOutputService;

    @Autowired
    private DataValueClient dataValueClient;

    /**
     * Lists tasks matching the optional filters in the request.
     *
     * <p>The name filter is a {@code LIKE} match applied only when the name is non-empty;
     * status and weight are equality filters applied only when non-null and non-zero.
     *
     * @param taskInfoReq filter values
     * @return the matching tasks
     */
    @Override
    public List<TaskInfo> selectList(TaskInfoReq taskInfoReq) {
        LambdaQueryWrapper<TaskInfo> queryWrapper = new LambdaQueryWrapper<>();
        queryWrapper.like(
                StringUtils.isNotEmpty(taskInfoReq.getName()),
                TaskInfo::getName,
                taskInfoReq.getName());
        queryWrapper.eq(
                taskInfoReq.getStatus() != null && taskInfoReq.getStatus() != 0,
                TaskInfo::getStatus,
                taskInfoReq.getStatus());
        queryWrapper.eq(
                taskInfoReq.getWeigh() != null && taskInfoReq.getWeigh() != 0,
                TaskInfo::getWeigh,
                taskInfoReq.getWeigh());
        return this.list(queryWrapper);
    }

    /**
     * Persists a new task.
     *
     * @param taskInfo the task to save
     * @return {@code "success"} when the row was saved
     * @throws RuntimeException if the save reports failure
     */
    @Override
    public String addTask(TaskInfo taskInfo) {
        if (!this.save(taskInfo)) {
            throw new RuntimeException("err");
        }
        return "success";
    }

    /**
     * Updates a task by its primary key.
     *
     * @param taskInfo the task carrying the id and the new values
     * @return always {@code "success"}; the update result is not inspected
     */
    @Override
    public String updById(TaskInfo taskInfo) {
        this.updateById(taskInfo);
        return "success";
    }

    /**
     * Deletes a task by its primary key.
     *
     * @param id the task id
     * @return {@code "success"} when at least one row was deleted
     * @throws RuntimeException if no row was deleted
     */
    @Override
    public String deleteById(Integer id) {
        int deletedRows = this.baseMapper.deleteById(id);
        if (deletedRows <= 0) {
            throw new RuntimeException("err");
        }
        return "success";
    }

    /**
     * Executes a task: builds its query, pages through the result set and runs one
     * {@link SegmentTask} per page.
     *
     * @param taskId id of the task to execute
     * @return a message when the task has no input nodes, otherwise {@code "success"}
     *         (also returned when the source query yields zero rows)
     * @throws IllegalStateException if the task configuration is incomplete (no output node,
     *         no selectable column, missing join input, missing task row), if the remote count
     *         is unavailable, or if the calling thread is interrupted while waiting
     */
    @Override
    public String executeTask(Integer taskId) {
        LambdaQueryWrapper<TaskInputInfo> inputQuery = new LambdaQueryWrapper<>();
        inputQuery.eq(TaskInputInfo::getTaskId, taskId);
        List<TaskInputInfo> taskInputList = taskInputService.list(inputQuery);
        if (taskInputList.isEmpty()) {
            return "表节点没有进行相对应的选择,无法继续执行";
        }

        LambdaQueryWrapper<TaskOutputInfo> outputQuery = new LambdaQueryWrapper<>();
        outputQuery.eq(TaskOutputInfo::getTaskId, taskId);
        TaskOutputInfo taskOutputInfo = taskOutputService.getOne(outputQuery);
        if (taskOutputInfo == null) {
            throw new IllegalStateException("No output node configured for task " + taskId);
        }

        String[] outputFieldNames = taskOutputInfo.getLastFileName().split(",");
        String[] newFieldNames = taskOutputInfo.getNewFileName().split(",");
        // The two arrays are parallel; only pair up positions that exist in both.
        int pairedFields = Math.min(outputFieldNames.length, newFieldNames.length);

        // Output field name -> renamed field name, handed to every SegmentTask.
        HashMap<String, String> newAndOldMap = new HashMap<>();
        for (int o = 0; o < pairedFields; o++) {
            newAndOldMap.put(outputFieldNames[o], newFieldNames[o]);
        }

        // Alias field -> real column name, needed to resolve join columns.
        Map<String, String> fieldAsNameMap = new HashMap<>();
        String selectColumns =
                buildSelectColumns(taskInputList, outputFieldNames, pairedFields, fieldAsNameMap);
        log.debug("select columns: {}", selectColumns);
        if (selectColumns.isEmpty()) {
            throw new IllegalStateException(
                    "No input field matches the output fields of task " + taskId);
        }

        String fromClause = buildFromClause(taskId, fieldAsNameMap);
        // Drop the leading comma produced by buildSelectColumns.
        String selectSql = "select " + selectColumns.substring(1) + " from " + fromClause;
        String countSql = "select count(1) from " + fromClause;

        Long basicId = Long.valueOf(taskOutputInfo.getBasicId());
        Long tableId = Long.valueOf(taskOutputInfo.getTableId());

        // Total number of rows the query will produce.
        Integer count = dataValueClient.findCount(basicId, countSql);
        log.info("查询到的条数为{}", count);
        if (count == null) {
            throw new IllegalStateException("Row count unavailable for task " + taskId);
        }
        if (count <= 0) {
            // Nothing to transfer; a zero-capacity PriorityBlockingQueue would be rejected.
            return "success";
        }

        TaskInfo taskInfo = taskMapper.selectById(taskId);
        if (taskInfo == null) {
            throw new IllegalStateException("Task " + taskId + " does not exist");
        }
        Integer weigh = taskInfo.getWeigh();

        int totalSegments = (int) ((count.longValue() + PAGE_SIZE - 1) / PAGE_SIZE);
        PriorityBlockingQueue<SegmentTask> queue = new PriorityBlockingQueue<>(totalSegments);
        for (int i = 0; i < totalSegments; i++) {
            log.info("调用第{}次", i);
            // Offset computed in long so very large tables do not overflow int.
            String limitSelect = selectSql + " limit " + (long) i * PAGE_SIZE + "," + PAGE_SIZE;
            log.info("创建任务添加到队列当中");
            queue.add(new SegmentTask(
                    limitSelect, basicId, tableId, newAndOldMap, Weight.getWeight(weigh)));
            log.info("添加到queue里成功");
        }

        runSegments(queue, Math.min(WORKER_COUNT, totalSegments));
        return "success";
    }

    /**
     * Builds the comma-prefixed select list ({@code ",alias.column aliasField "} per match)
     * and records every alias-field to column mapping in {@code fieldAsNameMap}.
     */
    private String buildSelectColumns(List<TaskInputInfo> taskInputList,
                                      String[] outputFieldNames,
                                      int pairedFields,
                                      Map<String, String> fieldAsNameMap) {
        StringBuilder columns = new StringBuilder();
        for (TaskInputInfo input : taskInputList) {
            String[] tableFields = input.getTableField().split(",");
            String[] aliasFields = input.getTableAsField().split(",");
            for (int j = 0; j < aliasFields.length; j++) {
                for (int o = 0; o < pairedFields; o++) {
                    if (aliasFields[j].equals(outputFieldNames[o])) {
                        columns.append(",")
                                .append(input.getTableAsName())
                                .append(".")
                                .append(tableFields[j])
                                .append(" ")
                                .append(aliasFields[j])
                                .append(" ");
                    }
                }
                fieldAsNameMap.put(aliasFields[j], tableFields[j]);
            }
        }
        return columns.toString();
    }

    /**
     * Builds the text that follows {@code from}: the join expression when the task has join
     * nodes, otherwise the single input table with its alias.
     */
    private String buildFromClause(Integer taskId, Map<String, String> fieldAsNameMap) {
        LambdaQueryWrapper<TaskJoinInfo> joinQuery = new LambdaQueryWrapper<>();
        joinQuery.eq(TaskJoinInfo::getTaskId, taskId);
        List<TaskJoinInfo> taskJoinInfos = taskJoinService.list(joinQuery);

        if (taskJoinInfos.isEmpty()) {
            // The task-id filter must be on the wrapper that is actually queried.
            LambdaQueryWrapper<TaskInputInfo> singleInputQuery = new LambdaQueryWrapper<>();
            singleInputQuery.eq(TaskInputInfo::getTaskId, taskId);
            TaskInputInfo taskInputInfo = taskInputService.getOne(singleInputQuery);
            return taskInputInfo.getTableName() + " " + taskInputInfo.getTableAsName();
        }

        // NOTE(review): every join row emits "left joinType right on ..." in full, so a task
        // with more than one join repeats the left table; looks like only a single join is
        // supported here - confirm before relying on multi-join tasks.
        StringBuilder joins = new StringBuilder();
        for (TaskJoinInfo taskJoinInfo : taskJoinInfos) {
            // Left table of the join.
            String leftId = taskJoinInfo.getLeftId();
            TaskInputInfo leftInput = taskInputService.findByNodeId(leftId);
            // Right table of the join.
            String rightId = taskJoinInfo.getRightId();
            TaskInputInfo rightInput = taskInputService.findByNodeId(rightId);
            if (leftInput == null || rightInput == null) {
                throw new IllegalStateException(
                        "Join of task " + taskId + " references a missing input node ("
                                + leftId + " / " + rightId + ")");
            }
            joins.append(leftInput.getTableName()).append(" ")
                    .append(leftInput.getTableAsName()).append(" ")
                    .append(taskJoinInfo.getJoinType()).append(" ")
                    .append(rightInput.getTableName()).append(" ")
                    .append(rightInput.getTableAsName())
                    .append(" on ")
                    .append(leftInput.getTableAsName()).append(".")
                    .append(fieldAsNameMap.get(taskJoinInfo.getLeftJoinField()))
                    .append("=")
                    .append(rightInput.getTableAsName()).append(".")
                    .append(fieldAsNameMap.get(taskJoinInfo.getRightJoinField()))
                    .append(" ");
        }
        return joins.toString();
    }

    /**
     * Drains {@code queue} with {@code workers} threads and blocks until all of them finish.
     */
    private void runSegments(PriorityBlockingQueue<SegmentTask> queue, int workers) {
        ExecutorService executor = Executors.newFixedThreadPool(workers);
        try {
            for (int i = 0; i < workers; i++) {
                executor.submit(new Worker(queue));
            }
        } finally {
            executor.shutdown();
        }
        try {
            // Workers exit on their own once the queue is empty, so this wait terminates.
            executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
            log.error("Interrupted while waiting for segment tasks to finish", e);
            throw new IllegalStateException("Task execution was interrupted", e);
        }
    }

    /**
     * Runnable that repeatedly removes the head of a queue of {@link SegmentTask}s and runs it.
     *
     * <p>The worker stops when the queue is empty or its thread is interrupted; it does not
     * wait for tasks that are added later, so the queue must be filled before workers start.
     */
    static class Worker implements Runnable {

        private final PriorityBlockingQueue<SegmentTask> queue;

        public Worker(PriorityBlockingQueue<SegmentTask> queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            SegmentTask task;
            // poll() instead of take(): take() would block forever on an empty queue and
            // keep the executor from ever terminating.
            while (!Thread.currentThread().isInterrupted() && (task = queue.poll()) != null) {
                try {
                    task.run();
                } catch (RuntimeException e) {
                    // Keep draining: one failed segment must not strand the remaining ones.
                    log.error("Segment task failed", e);
                }
            }
        }
    }

    /** Placeholder with no behavior. */
    public void selectAndAdd() {
    }
}