master
面包骑士 2024-09-06 15:00:12 +08:00
parent b43749bb2c
commit 8757da2522
6 changed files with 75 additions and 29 deletions

View File

@ -22,13 +22,21 @@ import java.util.List;
*/
@FeignClient(contextId = "RemoteDataSourceService",
value = ServiceNameConstants.SOURCE_SERVICE,
url = "http://10.0.2.1:19652/",
fallbackFactory = RemoteDataSourceFactory.class)
public interface RemoteDataSourceService {
@PostMapping("/value/findTableValue")
public Result<List<DataModel>> findTableValue(@RequestBody DataValueModel dataValueModel);
/**
* IDSQL
*
* @param dataValueModel IDsql
* @return DataValue{kltv}
*/
@PostMapping("/value/getTableValueTotal")
public Result getTableValueTotal(@RequestBody DataValueModel dataValueModel);
/**
* IDSQL
*

View File

@ -22,6 +22,11 @@ public class RemoteDataSourceFactory implements FallbackFactory<RemoteDataSource
return Result.error("数据接入模块连接失败,网络异常...");
}
@Override
public Result getTableValueTotal(DataValueModel dataValueModel) {
return Result.error("数据接入模块连接失败,网络异常...");
}
@Override
public Result addTableValue(DataValueModel dataValueModel) {
return Result.error("数据接入模块连接失败,网络异常...");

View File

@ -136,4 +136,12 @@ public class TaskController extends BaseController
return success();
}
/**
*
*/
@PostMapping("/test")
public Result test() {
return success(taskService.test());
}
}

View File

@ -23,18 +23,19 @@ import java.util.List;
public final class TaskManager {
// 线程池中默认线程的个数为5
private static int workerNum = 10;
private static int workerNum = 5;
// 工作线程
private final WorkThread[] workThrads;
// 未处理的任务
private static volatile int finishedTask = 0;
// 任务队列,作为一个缓冲,List线程不安全
// 任务队列
private final List<Runnable> taskQueue = new LinkedList<Runnable>();
private static TaskManager taskManager;
// 创建具有默认线程个数的线程池
private TaskManager() {
this(5);
this(10);
}
// 创建线程池,workerNum为线程池中工作线程的个数
@ -64,12 +65,12 @@ public final class TaskManager {
return taskManager;
}
// 批量执行任务,其实只是把任务加入任务队列,什么时候执行由线程池管理器决定
// 把任务加入任务队列
public void execute(List<Runnable> task) {
execute(task.toArray(new Runnable[0]));
}
// 批量执行任务,其实只是把任务加入任务队列,什么时候执行由线程池管理器决定
// 把任务加入任务队列
public void execute(Runnable... task) {
synchronized (taskQueue) {
Collections.addAll(taskQueue, task);
@ -132,7 +133,7 @@ public final class TaskManager {
@Override
public void run() {
Runnable r = null;
while (isRunning) {// 注意,若线程无效则自然结束run方法该线程就没用了
while (isRunning) {// 若线程无效则自然结束run方法该线程就没用了
synchronized (taskQueue) {
while (isRunning && taskQueue.isEmpty()) {// 队列为空
try {

View File

@ -48,4 +48,6 @@ public interface TaskService extends IService<Task> {
String execute(String taskCode);
String testExecute(List<Node> nodeList);
String test();
}

View File

@ -6,6 +6,7 @@ import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.muyu.common.core.domain.Result;
import com.muyu.common.core.utils.StringUtils;
import com.muyu.quest.manager.TaskManager;
import com.muyu.quest.model.DataModel;
import com.muyu.quest.model.DataValueModel;
import com.muyu.quest.domain.Node;
@ -49,6 +50,8 @@ public class TaskServiceImpl extends ServiceImpl<TaskMapper, Task>
@Resource
private RemoteDataSourceService remoteDataSourceService;
private final static TaskManager taskManager = TaskManager.getTaskManager();
/**
*
*
@ -150,11 +153,26 @@ public class TaskServiceImpl extends ServiceImpl<TaskMapper, Task>
String findSql = getFindSql(nodeMap);
// 获取查询条数
int count = getFindCount(findSql, nodeMap);
// 获取新增SQL
String addSql = getAddSql(nodeMap,findSql);
// 执行新增SQL
implAddSql(addSql);
// 划分线程 每次查询1000条
int threadNum = count / 1000 + 1;
log.info("任务 {} 总共需要 {} 条数据, 划分为线程{}条",taskCode,count,threadNum);
Runnable[] tasks = new Runnable[threadNum];
for (int i = 0; i < threadNum; i++) {
int index = i+1;
tasks[i] = () -> {
// 获取新SQL 并执行
String sql = findSql + " LIMIT 1000 OFFSET "+(index-1)*1000;
log.info("任务 {} 开始执行第 {} 线程,查询SQL: {}",taskCode,index, sql);
String addSql = getAddSql(nodeMap, sql);
log.info("任务 {} 开始执行第 {} 线程,新增SQL: {}",taskCode,index, addSql);
Result addResult = remoteDataSourceService.addTableValue(new DataValueModel(4L, addSql));
if (addResult.getCode() != 200){
throw new TaskException(addResult.getMsg());
}
};
}
// 添加进入任务队列
taskManager.execute(tasks);
return "执行成功";
}
@ -173,6 +191,14 @@ public class TaskServiceImpl extends ServiceImpl<TaskMapper, Task>
return "测试成功,无异常";
}
@Override
public String test() {
String sql = "SELECT `cloud-ecology`.task_source.task_code,`cloud-ecology`.task_source.task_name,`cloud-ecology`.node_info.node_code,`cloud-ecology`.node_info.node_type,`cloud-ecology`.node_info.node_name FROM `cloud-ecology`.task_source RIGHT JOIN `cloud-ecology`.node_info ON `cloud-ecology`.task_source.task_code = `cloud-ecology`.node_info.task_code LIMIT 10 OFFSET 0";
Result<List<DataModel>> tableValue = remoteDataSourceService.findTableValue(new DataValueModel(4L, sql));
System.out.println(tableValue);
return "";
}
/**
* : map
*/
@ -212,19 +238,24 @@ public class TaskServiceImpl extends ServiceImpl<TaskMapper, Task>
System.out.println(findSql);
String findCountSql = "";
List<Node> uniteNodes = nodeMap.get("unite");
findSql = findSql.replace(" "," ");
String[] s1 = findSql.split(" ");
if (uniteNodes!=null && !uniteNodes.isEmpty()){
String[] split = findSql.split(" ");
split[1] = "COUNT(" + split[1].split(",")[0]+ ")";
findCountSql = StringUtils.join(split," ");
s1[1] = "COUNT(" + s1[1].split(",")[0]+ ")";
findCountSql = StringUtils.join(s1," ");
}else {
String[] split = findSql.split("\\.");
String[] split = s1[s1.length-1].split("\\.");
findCountSql = "SELECT TABLE_ROWS " +
"FROM INFORMATION_SCHEMA.TABLES " +
"WHERE TABLE_SCHEMA = '"+split[split.length-2]+"' AND TABLE_NAME = '"+split[split.length-1]+"';";
"WHERE TABLE_SCHEMA = "+split[0]+" AND TABLE_NAME = '"+split[1]+"';";
findCountSql = findCountSql.replace("`","'");
}
Result<List<DataModel>> tableValue = remoteDataSourceService.findTableValue(new DataValueModel(4L, findCountSql));
System.out.println(findCountSql);
Result tableValue = remoteDataSourceService.getTableValueTotal(new DataValueModel(4L, findCountSql));
System.out.println(tableValue);
return 1;
return (int) tableValue.getData();
}
/**
@ -237,21 +268,12 @@ public class TaskServiceImpl extends ServiceImpl<TaskMapper, Task>
}
List<DataModel> data = tableValue.getData();
List<NodeDisposition> dispList = getNodeDisp(nodeMap.get("exportation").get(0));
return NodeUtils.nodeDispExportation(dispList, data);
}
/**
* SQL
*/
private void implAddSql(String addSql) {
Result addResult = remoteDataSourceService.addTableValue(new DataValueModel(4L, addSql));
if (addResult.getCode() != 200){
throw new TaskException(addResult.getMsg());
}
}
// 查询节点配置信息
public List<NodeDisposition> getNodeDisp(Node node){
List<NodeDisposition> dispList = dispositionService