diff --git a/cloud-task-common/pom.xml b/cloud-task-common/pom.xml
index 8c18460..7f73f9d 100644
--- a/cloud-task-common/pom.xml
+++ b/cloud-task-common/pom.xml
@@ -23,6 +23,13 @@
com.muyu
cloud-common-core
+
+
+ com.muyu
+ cloud-etl-common
+ 1.0.0
+
+
diff --git a/cloud-task-common/src/main/java/com/muyu/domain/DataValue.java b/cloud-task-common/src/main/java/com/muyu/domain/DataValue.java
new file mode 100644
index 0000000..0dd1773
--- /dev/null
+++ b/cloud-task-common/src/main/java/com/muyu/domain/DataValue.java
@@ -0,0 +1,29 @@
+package com.muyu.domain;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+*
+* @PackageName:com.muyu.domain
+* @ClassName:DataValue
+* @Description:
+* @author: ¥陈思豪¥
+* @date: 2024/9/5 20:04
+*/
+@AllArgsConstructor
+@NoArgsConstructor
+@Data
+@Builder
+public class DataValue {
+
+ private String key;
+
+ private String label;
+
+ private String type;
+
+ private Object value;
+}
diff --git a/cloud-task-common/src/main/java/com/muyu/domain/taskenum/Weight.java b/cloud-task-common/src/main/java/com/muyu/domain/taskenum/Weight.java
new file mode 100644
index 0000000..8e85376
--- /dev/null
+++ b/cloud-task-common/src/main/java/com/muyu/domain/taskenum/Weight.java
@@ -0,0 +1,49 @@
+package com.muyu.domain.taskenum;
+
+import lombok.extern.log4j.Log4j2;
+
+/**
+ * @PackageName:com.muyu.domain.taskenum
+ * @ClassName:Priority
+ * @Description:
+ * @author: ¥陈思豪¥
+ * @date: 2024/9/5 19:59
+ */
+@Log4j2
+public enum Weight {
+ //紧急
+ URGENT(4),
+ //高
+ HIGH(3),
+ //中
+ MEDIUM(2),
+ //低
+ LOW(1);
+
+ private final int weight;
+
+ Weight(int weight) {
+ this.weight = weight;
+ }
+
+ public int getWeight() {
+ return weight;
+ }
+
+ public static Weight getWeight(int weight) {
+ switch (weight){
+ case 4:
+ return URGENT;
+ case 3:
+ return HIGH;
+ case 2:
+ return MEDIUM;
+ case 1:
+ return LOW;
+ default:
+ log.info("{} is illegal, weight",weight);
+ return null;
+
+ }
+ }
+}
diff --git a/cloud-task-remote/src/main/java/com/muyu/task/feign/DataValueClient.java b/cloud-task-remote/src/main/java/com/muyu/task/feign/DataValueClient.java
index 00b7aad..f52b394 100644
--- a/cloud-task-remote/src/main/java/com/muyu/task/feign/DataValueClient.java
+++ b/cloud-task-remote/src/main/java/com/muyu/task/feign/DataValueClient.java
@@ -1,10 +1,14 @@
package com.muyu.task.feign;
import com.muyu.common.core.domain.Result;
+import com.muyu.domain.DataValue;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestParam;
+import java.util.List;
+
/**
* @PackageName:com.muyu.task.feign
* @ClassName:dataValue
@@ -13,9 +17,28 @@ import org.springframework.web.bind.annotation.RequestParam;
* @date: 2024/9/4 15:13
*/
//,fallback = TaskFeignFallback.class
-@FeignClient(name = "cloud-source")
+@FeignClient(name = "cloud-source" )
public interface DataValueClient {
+ /**
+ * 根据sql查询数据
+ * @param basicId
+ * @param sql
+ * @return
+ */
@PostMapping("/DataValue/findTableValue")
public Result findTableValue(@RequestParam("basicId") Long basicId,@RequestParam("sql") String sql);
+
+ /**
+ * 根据sql查询数据条数
+ * @param basicId
+ * @param sql
+ * @return
+ */
+ @PostMapping("/DataValue/findCount")
+ public Integer findCount(@RequestParam("basicId") Long basicId,@RequestParam("sql") String sql);
+
+ //添加
+ @PostMapping("/DataValue/addTable")
+ public Result addTableDataValue(@RequestParam("basicId") Long basicId,@RequestParam("tableId") Long tableId, @RequestBody List> dataValue);
}
diff --git a/cloud-task-server/src/main/java/com/muyu/service/impl/TaskServiceImpl.java b/cloud-task-server/src/main/java/com/muyu/service/impl/TaskServiceImpl.java
index e0e71c9..b50f859 100644
--- a/cloud-task-server/src/main/java/com/muyu/service/impl/TaskServiceImpl.java
+++ b/cloud-task-server/src/main/java/com/muyu/service/impl/TaskServiceImpl.java
@@ -2,17 +2,18 @@ package com.muyu.service.impl;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
-import com.muyu.common.core.domain.Result;
import com.muyu.common.core.utils.StringUtils;
import com.muyu.domain.TaskInfo;
import com.muyu.domain.TaskInputInfo;
import com.muyu.domain.TaskJoinInfo;
import com.muyu.domain.TaskOutputInfo;
import com.muyu.domain.req.TaskInfoReq;
+import com.muyu.domain.taskenum.Weight;
import com.muyu.mapper.TaskMapper;
import com.muyu.service.TaskInputService;
import com.muyu.service.TaskJoinService;
import com.muyu.service.TaskOutputService;
+import com.muyu.task.SegmentTask;
import com.muyu.service.TaskService;
import com.muyu.task.feign.DataValueClient;
import lombok.extern.log4j.Log4j2;
@@ -21,6 +22,10 @@ import org.springframework.stereotype.Service;
import java.util.HashMap;
import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.PriorityBlockingQueue;
+
/**
* @PackageName:com.muyu.service.impl
@@ -104,30 +109,34 @@ public class TaskServiceImpl extends ServiceImpl implement
return "表节点没有进行相对应的选择,无法继续执行";
}
+
+
LambdaQueryWrapper outputInfo = new LambdaQueryWrapper<>();
outputInfo.eq(TaskOutputInfo::getTaskId, taskId);
TaskOutputInfo taskOutputInfo = taskOutputService.getOne(outputInfo);
String[] outPutFileName = taskOutputInfo.getLastFileName().split(",");
+ String[] newFileName = taskOutputInfo.getNewFileName().split(",");
HashMap fieldAsNameMap = new HashMap<>();
+ HashMap newAndOldMap = new HashMap<>();
String sql = "";
for (int i = 0; i < taskInputList.size(); i++) {
String[] tableFieldList = taskInputList.get(i).getTableField().split(",");
String[] tableAsFieldList = taskInputList.get(i).getTableAsField().split(",");
for (int j = 0; j < tableAsFieldList.length; j++) {
- for (int o = 0; o < outPutFileName.length; o++) {
- if(tableAsFieldList[j].equals(outPutFileName[o])){
+ for (int o = 0; o < newFileName.length; o++) {
+ newAndOldMap.put(outPutFileName[j], newFileName[j]);
+ if(tableAsFieldList[j].equals(newFileName[o])){
sql += ","+taskInputList.get(i).getTableAsName() + "." + tableFieldList[j] +" " +
tableAsFieldList[j] + " ";
}
}
fieldAsNameMap.put(tableAsFieldList[j], tableFieldList[j]);
-// 规则
-// tableNameMap.put(tableFieldList[j], tableAsFieId[j]);
}
}
sql = sql.substring(1);
sql = "select " + sql + " from ";
+ String leftJoin = "";
LambdaQueryWrapper lambdaQueryWrapper = new LambdaQueryWrapper<>();
lambdaQueryWrapper.eq(TaskJoinInfo::getTaskId, taskId);
List taskJoinInfos = taskJoinService.list(lambdaQueryWrapper);
@@ -139,7 +148,7 @@ public class TaskServiceImpl extends ServiceImpl implement
//拿到右表id
String rightId = taskJoinInfo.getRightId();
TaskInputInfo rightInput = taskInputService.findByNodeId(rightId);
- sql += leftInput.getTableName() + " " + leftInput.getTableAsName()+" " +
+ leftJoin += leftInput.getTableName() + " " + leftInput.getTableAsName()+" " +
taskJoinInfo.getJoinType() + " " + rightInput.getTableName() + " " +
rightInput.getTableAsName() + " on " + leftInput.getTableAsName() + "." +
fieldAsNameMap.get(taskJoinInfo.getLeftJoinField()) + "=" + rightInput.getTableAsName() + "." +
@@ -149,21 +158,93 @@ public class TaskServiceImpl extends ServiceImpl implement
LambdaQueryWrapper selectOne = new LambdaQueryWrapper<>();
queryWrapper.eq(TaskInputInfo::getTaskId, taskId);
TaskInputInfo taskInputInfo = taskInputService.getOne(selectOne);
- sql += taskInputInfo.getTableName() + " " + taskInputInfo.getTableAsName();
+ leftJoin += taskInputInfo.getTableName() + " " + taskInputInfo.getTableAsName();
+ }
+
+ String sqlCount = "select count(1) from "+leftJoin;
+ //查询出总条数
+ Integer count = dataValueFeign.findCount(Long.valueOf(taskOutputInfo.getBasicId()), sqlCount);
+ //查询数据
+ sql = sql + leftJoin;
+
+ int pageSize = 1000;
+ int totalSegments = (int) Math.ceil((double) count / pageSize);
+
+
+ LambdaQueryWrapper taskInfoQueryWrapper = new LambdaQueryWrapper<>();
+ taskInfoQueryWrapper.eq(TaskInfo::getId, taskId);
+ TaskInfo taskInfo = taskMapper.selectById(taskId);
+
+ Long basicId = Long.valueOf(taskOutputInfo.getBasicId());
+ Long tableId = Long.valueOf(taskOutputInfo.getTableId());
+ Integer weigh = taskInfo.getWeigh();
+ long firstArray = 0L;
+
+ PriorityBlockingQueue queue = new PriorityBlockingQueue<>(totalSegments);
+
+ // 创建线程池
+ ExecutorService executor = Executors.newFixedThreadPool(10);
+
+
+ for (int i = 0; i < totalSegments; i++) {
+ log.info("调用第一次");
+ String limit = " limit "+ i*pageSize + ","+pageSize;
+ firstArray = count - (long) i *pageSize;
+ if (firstArray>=pageSize){
+ firstArray=pageSize;
+ }
+ String limitSelect = sql + limit;
+ SegmentTask segmentTask = new SegmentTask(limitSelect, tableId, basicId,newAndOldMap, Weight.getWeight(weigh));
+ queue.add(segmentTask);
+ }
+
+ // 启动线程池执行任务
+ for (int i = 0; i < 10; i++) { // 可以根据需要调整线程池大小
+ executor.submit(new Worker(queue));
+ }
+
+ executor.shutdown();
+ try {
+ // 等待所有任务完成
+ executor.awaitTermination(Long.MAX_VALUE, java.util.concurrent.TimeUnit.NANOSECONDS);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
}
- log.info(sql);
- log.info(sql);
- log.info(sql);
- Result tableValue = dataValueFeign.findTableValue(Long.valueOf(taskOutputInfo.getBasicId()), sql);
- System.out.println(tableValue);
- System.out.println(tableValue);
- System.out.println(tableValue);
+// Result>> tableValue = dataValueFeign.findTableValue(Long.valueOf(taskOutputInfo.getBasicId()), sql);
return "success";
}
+ static class Worker implements Runnable {
+ private final PriorityBlockingQueue queue;
+
+ public Worker(PriorityBlockingQueue queue) {
+ this.queue = queue;
+ }
+
+ @Override
+ public void run() {
+ while (!Thread.currentThread().isInterrupted()) {
+ try {
+ // 从队列中取出一个任务
+ SegmentTask task = queue.take();
+ // 执行任务
+ task.run();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt(); // 设置中断状态
+ break;
+ }
+ }
+ }
+
+ }
+
+ public void selectAndAdd(){
+
+ }
+
}
diff --git a/cloud-task-server/src/main/java/com/muyu/task/PriorityQueryExecutor.java b/cloud-task-server/src/main/java/com/muyu/task/PriorityQueryExecutor.java
new file mode 100644
index 0000000..661edd1
--- /dev/null
+++ b/cloud-task-server/src/main/java/com/muyu/task/PriorityQueryExecutor.java
@@ -0,0 +1,23 @@
+package com.muyu.task;
+
+
+import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * @PackageName:com.muyu.task
+ * @ClassName:PriorityQueryExecutor
+ * @Description:
+ * @author: ¥陈思豪¥
+ * @date: 2024/9/6 10:31
+ */
+public class PriorityQueryExecutor {
+
+ private static final ExecutorService executor = Executors.newFixedThreadPool(50);
+
+ public void execute(SegmentTask task) {
+
+ }
+
+}
diff --git a/cloud-task-server/src/main/java/com/muyu/task/SegmentTask.java b/cloud-task-server/src/main/java/com/muyu/task/SegmentTask.java
new file mode 100644
index 0000000..0e9d353
--- /dev/null
+++ b/cloud-task-server/src/main/java/com/muyu/task/SegmentTask.java
@@ -0,0 +1,71 @@
+package com.muyu.task;
+
+/**
+ * @PackageName:com.muyu.task
+ * @ClassName:SementTask
+ * @Description:
+ * @author: ¥陈思豪¥
+ * @date: 2024/9/5 19:57
+ */
+
+import com.muyu.common.core.domain.Result;
+import com.muyu.domain.DataValue;
+import com.muyu.domain.taskenum.Weight;
+import com.muyu.task.feign.DataValueClient;
+import lombok.extern.log4j.Log4j2;
+import org.springframework.beans.factory.annotation.Autowired;
+
+import java.util.HashMap;
+import java.util.List;
+
+@Log4j2
+public class SegmentTask implements Runnable {
+
+
+ @Autowired
+ private DataValueClient dataValueClient;
+
+ private final String sql;
+ private final Long basicId;
+ private final Long tableId;
+ private final Weight weight;
+ HashMap map = new HashMap<>();
+
+ public SegmentTask(String sql, Long basicId, Long tableId, HashMap map, Weight weight) {
+ this.sql = sql;
+ this.basicId = basicId;
+ this.tableId = tableId;
+ this.map=map;
+ this.weight = weight;
+ }
+
+ @Override
+ public void run() {
+ // 查询数据
+ Result>> tableValue = dataValueClient.findTableValue(Long.valueOf(basicId), sql);
+ List> data = tableValue.getData();
+ log.info("查询结果:" + data);
+ for (List datum : data) {
+ for (DataValue dataValue : datum) {
+ String key = dataValue.getKey();
+ String newKey = map.get(key);
+ dataValue.setKey(newKey);
+ }
+ }
+ log.info("开始添加:{}",data);
+
+ Result result = dataValueClient.addTableDataValue(basicId, tableId, data);
+ log.info("添加完毕字段:{}",result);
+ // 处理结果
+// dataValueList.accept(tableValue.getData());
+
+
+ }
+
+ public Weight geWeight() {
+ return weight;
+ }
+
+
+}
+