添加联查信息方法

master
陈思豪 2024-09-06 15:59:46 +08:00
parent 4f07aa08ad
commit 09bfc67cbf
7 changed files with 298 additions and 15 deletions

View File

@ -23,6 +23,13 @@
<groupId>com.muyu</groupId>
<artifactId>cloud-common-core</artifactId>
</dependency>
<dependency>
<groupId>com.muyu</groupId>
<artifactId>cloud-etl-common</artifactId>
<version>1.0.0</version>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,29 @@
package com.muyu.domain;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
*
* @PackageName:com.muyu.domain
* @ClassName:DataValue
* @Description:
* @author:
* @date: 2024/9/5 20:04
*/
@AllArgsConstructor
@NoArgsConstructor
@Data
@Builder
public class DataValue {
private String key;
private String label;
private String type;
private Object value;
}

View File

@ -0,0 +1,49 @@
package com.muyu.domain.taskenum;
import lombok.extern.log4j.Log4j2;
/**
* @PackageName:com.muyu.domain.taskenum
* @ClassName:Priority
* @Description:
* @author:
* @date: 2024/9/5 19:59
*/
@Log4j2
public enum Weight {
//紧急
URGENT(4),
//高
HIGH(3),
//中
MEDIUM(2),
//低
LOW(1);
private final int weight;
Weight(int weight) {
this.weight = weight;
}
public int getWeight() {
return weight;
}
public static Weight getWeight(int weight) {
switch (weight){
case 4:
return URGENT;
case 3:
return HIGH;
case 2:
return MEDIUM;
case 1:
return LOW;
default:
log.info("{} is illegal, weight",weight);
return null;
}
}
}

View File

@ -1,10 +1,14 @@
package com.muyu.task.feign;
import com.muyu.common.core.domain.Result;
import com.muyu.domain.DataValue;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestParam;
import java.util.List;
/**
* @PackageName:com.muyu.task.feign
* @ClassName:dataValue
@ -13,9 +17,28 @@ import org.springframework.web.bind.annotation.RequestParam;
* @date: 2024/9/4 15:13
*/
//,fallback = TaskFeignFallback.class
@FeignClient(name = "cloud-source")
@FeignClient(name = "cloud-source" )
public interface DataValueClient {
/**
* sql
* @param basicId
* @param sql
* @return
*/
@PostMapping("/DataValue/findTableValue")
public Result findTableValue(@RequestParam("basicId") Long basicId,@RequestParam("sql") String sql);
/**
* sql
* @param basicId
* @param sql
* @return
*/
@PostMapping("/DataValue/findCount")
public Integer findCount(@RequestParam("basicId") Long basicId,@RequestParam("sql") String sql);
//添加
@PostMapping("/DataValue/addTable")
public Result addTableDataValue(@RequestParam("basicId") Long basicId,@RequestParam("tableId") Long tableId, @RequestBody List<List<DataValue>> dataValue);
}

View File

@ -2,17 +2,18 @@ package com.muyu.service.impl;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.muyu.common.core.domain.Result;
import com.muyu.common.core.utils.StringUtils;
import com.muyu.domain.TaskInfo;
import com.muyu.domain.TaskInputInfo;
import com.muyu.domain.TaskJoinInfo;
import com.muyu.domain.TaskOutputInfo;
import com.muyu.domain.req.TaskInfoReq;
import com.muyu.domain.taskenum.Weight;
import com.muyu.mapper.TaskMapper;
import com.muyu.service.TaskInputService;
import com.muyu.service.TaskJoinService;
import com.muyu.service.TaskOutputService;
import com.muyu.task.SegmentTask;
import com.muyu.service.TaskService;
import com.muyu.task.feign.DataValueClient;
import lombok.extern.log4j.Log4j2;
@ -21,6 +22,10 @@ import org.springframework.stereotype.Service;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.PriorityBlockingQueue;
/**
* @PackageName:com.muyu.service.impl
@ -104,30 +109,34 @@ public class TaskServiceImpl extends ServiceImpl<TaskMapper, TaskInfo> implement
return "表节点没有进行相对应的选择,无法继续执行";
}
LambdaQueryWrapper<TaskOutputInfo> outputInfo = new LambdaQueryWrapper<>();
outputInfo.eq(TaskOutputInfo::getTaskId, taskId);
TaskOutputInfo taskOutputInfo = taskOutputService.getOne(outputInfo);
String[] outPutFileName = taskOutputInfo.getLastFileName().split(",");
String[] newFileName = taskOutputInfo.getNewFileName().split(",");
HashMap<String, String> fieldAsNameMap = new HashMap<>();
HashMap<String, String> newAndOldMap = new HashMap<>();
String sql = "";
for (int i = 0; i < taskInputList.size(); i++) {
String[] tableFieldList = taskInputList.get(i).getTableField().split(",");
String[] tableAsFieldList = taskInputList.get(i).getTableAsField().split(",");
for (int j = 0; j < tableAsFieldList.length; j++) {
for (int o = 0; o < outPutFileName.length; o++) {
if(tableAsFieldList[j].equals(outPutFileName[o])){
for (int o = 0; o < newFileName.length; o++) {
newAndOldMap.put(outPutFileName[j], newFileName[j]);
if(tableAsFieldList[j].equals(newFileName[o])){
sql += ","+taskInputList.get(i).getTableAsName() + "." + tableFieldList[j] +" " +
tableAsFieldList[j] + " ";
}
}
fieldAsNameMap.put(tableAsFieldList[j], tableFieldList[j]);
// 规则
// tableNameMap.put(tableFieldList[j], tableAsFieId[j]);
}
}
sql = sql.substring(1);
sql = "select " + sql + " from ";
String leftJoin = "";
LambdaQueryWrapper<TaskJoinInfo> lambdaQueryWrapper = new LambdaQueryWrapper<>();
lambdaQueryWrapper.eq(TaskJoinInfo::getTaskId, taskId);
List<TaskJoinInfo> taskJoinInfos = taskJoinService.list(lambdaQueryWrapper);
@ -139,7 +148,7 @@ public class TaskServiceImpl extends ServiceImpl<TaskMapper, TaskInfo> implement
//拿到右表id
String rightId = taskJoinInfo.getRightId();
TaskInputInfo rightInput = taskInputService.findByNodeId(rightId);
sql += leftInput.getTableName() + " " + leftInput.getTableAsName()+" " +
leftJoin += leftInput.getTableName() + " " + leftInput.getTableAsName()+" " +
taskJoinInfo.getJoinType() + " " + rightInput.getTableName() + " " +
rightInput.getTableAsName() + " on " + leftInput.getTableAsName() + "." +
fieldAsNameMap.get(taskJoinInfo.getLeftJoinField()) + "=" + rightInput.getTableAsName() + "." +
@ -149,21 +158,93 @@ public class TaskServiceImpl extends ServiceImpl<TaskMapper, TaskInfo> implement
LambdaQueryWrapper<TaskInputInfo> selectOne = new LambdaQueryWrapper<>();
queryWrapper.eq(TaskInputInfo::getTaskId, taskId);
TaskInputInfo taskInputInfo = taskInputService.getOne(selectOne);
sql += taskInputInfo.getTableName() + " " + taskInputInfo.getTableAsName();
leftJoin += taskInputInfo.getTableName() + " " + taskInputInfo.getTableAsName();
}
String sqlCount = "select count(1) from "+leftJoin;
//查询出总条数
Integer count = dataValueFeign.findCount(Long.valueOf(taskOutputInfo.getBasicId()), sqlCount);
//查询数据
sql = sql + leftJoin;
int pageSize = 1000;
int totalSegments = (int) Math.ceil((double) count / pageSize);
LambdaQueryWrapper<TaskInfo> taskInfoQueryWrapper = new LambdaQueryWrapper<>();
taskInfoQueryWrapper.eq(TaskInfo::getId, taskId);
TaskInfo taskInfo = taskMapper.selectById(taskId);
Long basicId = Long.valueOf(taskOutputInfo.getBasicId());
Long tableId = Long.valueOf(taskOutputInfo.getTableId());
Integer weigh = taskInfo.getWeigh();
long firstArray = 0L;
PriorityBlockingQueue<SegmentTask> queue = new PriorityBlockingQueue<>(totalSegments);
// 创建线程池
ExecutorService executor = Executors.newFixedThreadPool(10);
for (int i = 0; i < totalSegments; i++) {
log.info("调用第一次");
String limit = " limit "+ i*pageSize + ","+pageSize;
firstArray = count - (long) i *pageSize;
if (firstArray>=pageSize){
firstArray=pageSize;
}
String limitSelect = sql + limit;
SegmentTask segmentTask = new SegmentTask(limitSelect, tableId, basicId,newAndOldMap, Weight.getWeight(weigh));
queue.add(segmentTask);
}
// 启动线程池执行任务
for (int i = 0; i < 10; i++) { // 可以根据需要调整线程池大小
executor.submit(new Worker(queue));
}
executor.shutdown();
try {
// 等待所有任务完成
executor.awaitTermination(Long.MAX_VALUE, java.util.concurrent.TimeUnit.NANOSECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info(sql);
log.info(sql);
log.info(sql);
Result tableValue = dataValueFeign.findTableValue(Long.valueOf(taskOutputInfo.getBasicId()), sql);
System.out.println(tableValue);
System.out.println(tableValue);
System.out.println(tableValue);
// Result<List<List<DataValue>>> tableValue = dataValueFeign.findTableValue(Long.valueOf(taskOutputInfo.getBasicId()), sql);
return "success";
}
static class Worker implements Runnable {
private final PriorityBlockingQueue<SegmentTask> queue;
public Worker(PriorityBlockingQueue<SegmentTask> queue) {
this.queue = queue;
}
@Override
public void run() {
while (!Thread.currentThread().isInterrupted()) {
try {
// 从队列中取出一个任务
SegmentTask task = queue.take();
// 执行任务
task.run();
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // 设置中断状态
break;
}
}
}
}
public void selectAndAdd(){
}
}

View File

@ -0,0 +1,23 @@
package com.muyu.task;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @PackageName:com.muyu.task
* @ClassName:PriorityQueryExecutor
* @Description:
* @author:
* @date: 2024/9/6 10:31
*/
public class PriorityQueryExecutor {
private static final ExecutorService executor = Executors.newFixedThreadPool(50);
public void execute(SegmentTask task) {
}
}

View File

@ -0,0 +1,71 @@
package com.muyu.task;
/**
* @PackageName:com.muyu.task
* @ClassName:SementTask
* @Description:
* @author:
* @date: 2024/9/5 19:57
*/
import com.muyu.common.core.domain.Result;
import com.muyu.domain.DataValue;
import com.muyu.domain.taskenum.Weight;
import com.muyu.task.feign.DataValueClient;
import lombok.extern.log4j.Log4j2;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.HashMap;
import java.util.List;
@Log4j2
public class SegmentTask implements Runnable {
@Autowired
private DataValueClient dataValueClient;
private final String sql;
private final Long basicId;
private final Long tableId;
private final Weight weight;
HashMap<String, String> map = new HashMap<>();
public SegmentTask(String sql, Long basicId, Long tableId, HashMap<String, String> map, Weight weight) {
this.sql = sql;
this.basicId = basicId;
this.tableId = tableId;
this.map=map;
this.weight = weight;
}
@Override
public void run() {
// 查询数据
Result<List<List<DataValue>>> tableValue = dataValueClient.findTableValue(Long.valueOf(basicId), sql);
List<List<DataValue>> data = tableValue.getData();
log.info("查询结果:" + data);
for (List<DataValue> datum : data) {
for (DataValue dataValue : datum) {
String key = dataValue.getKey();
String newKey = map.get(key);
dataValue.setKey(newKey);
}
}
log.info("开始添加:{}",data);
Result result = dataValueClient.addTableDataValue(basicId, tableId, data);
log.info("添加完毕字段:{}",result);
// 处理结果
// dataValueList.accept(tableValue.getData());
}
public Weight geWeight() {
return weight;
}
}