寻找报错
parent
0def068b03
commit
5c50d194be
|
@ -28,9 +28,7 @@ import org.springframework.stereotype.Service;
|
|||
import org.springframework.util.CollectionUtils;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static com.muyu.task.server.thread.OptimizedPrioritizedThreadPool.*;
|
||||
|
@ -165,13 +163,106 @@ public class TaskInfoServiceImpl extends ServiceImpl<TaskInfoMapper, TaskInfo> i
|
|||
String weight = taskInfo.getWeight();
|
||||
QueryWrapper<TaskInput> wrapper = new QueryWrapper<>();
|
||||
wrapper.eq("task_id", taskId);
|
||||
HashSet<Long> longs = new HashSet<>();
|
||||
List<TaskInput> list = taskInputService.list(wrapper);
|
||||
if (CollectionUtils.isEmpty(list)) {
|
||||
return "没有选择表";
|
||||
}
|
||||
HashMap<String, String> tableNameMap = new HashMap<>();
|
||||
String fieName = "";
|
||||
|
||||
Map<String, Object> fie = getFieAsNameAndFieEngineId(list);
|
||||
Map<String, String> fieAsName = (Map<String, String>) fie.get("FieAsName");
|
||||
Map<String, String> fieEngineId = (Map<String, String>) fie.get("FieEngineId");
|
||||
Set<Long> longs = (Set<Long>) fie.get("longs");
|
||||
if (longs.size() > 1) {
|
||||
return "你选择的不是同一个数据库";
|
||||
}
|
||||
|
||||
String joint = getJoint(taskId);
|
||||
Long basicId = taskInputService.selectByBasicId(taskId);
|
||||
QueryWrapper<TaskOutput> queryWrapper = new QueryWrapper<>();
|
||||
queryWrapper.eq("task_id", taskId);
|
||||
TaskOutput one = taskOutputService.getOne(queryWrapper);
|
||||
if (one == null) {
|
||||
return "没有选择输出";
|
||||
}
|
||||
Long tableId = one.getTableId();
|
||||
Long newBasicId = one.getBasicId();
|
||||
HashMap<String, String> map = new HashMap<>();
|
||||
String[] newFieName = one.getNewFieName().split(",");
|
||||
String[] lastFieName = one.getLastFieName().split(",");
|
||||
Integer num = lastFieName.length;
|
||||
for (int i = 0; i < newFieName.length; i++) {
|
||||
map.put(lastFieName[i], newFieName[i]);
|
||||
fieName += "," + fieAsName.get(lastFieName[i]);
|
||||
}
|
||||
fieName = fieName.substring(1);
|
||||
//查询表的数量
|
||||
Long count = getCount(joint, basicId);
|
||||
//查询和添加
|
||||
extracted(count, weight,fieName,joint, basicId, newBasicId, tableId, map, num);
|
||||
long end = System.currentTimeMillis();
|
||||
//log.info("执行时间:{}",end-start);
|
||||
return null;
|
||||
}
|
||||
|
||||
private Long getCount(String joint, Long basicId) {
|
||||
String sql = " SELECT count(1) FROM " + joint;
|
||||
Result<Long> countResult = datasourceFeign.findCount(basicId, sql);
|
||||
Long data = countResult.getData();
|
||||
return data;
|
||||
}
|
||||
|
||||
private void extracted(Long data, String weight,String finalFieName,String finalJoint, Long basicId, Long newBasicId, Long tableId, HashMap<String, String> map, Integer num) {
|
||||
long count = data / PAGE_SIZE + (data % PAGE_SIZE > 0 ? 1 : 0);
|
||||
for (long i = 1; i <= count; i++) {
|
||||
long pageNum = (i - 1) * PAGE_SIZE;
|
||||
long pageSize = Math.min(PAGE_SIZE, data - pageNum);
|
||||
if (Weight.high.getValue().equals(weight)) {
|
||||
log.info("执行高级任务");
|
||||
submitHighPriorityTask(() -> {
|
||||
getString(pageNum, finalFieName, finalJoint, basicId, newBasicId, tableId, map, pageSize, num);
|
||||
});
|
||||
|
||||
}
|
||||
if (Weight.centre.getValue().equals(weight)) {
|
||||
submitMediumPriorityTask(() -> {
|
||||
getString(pageNum, finalFieName, finalJoint, basicId, newBasicId, tableId, map, pageSize, num);
|
||||
});
|
||||
}
|
||||
if (Weight.low.getValue().equals(weight)) {
|
||||
|
||||
submitLowPriorityTask(() -> {
|
||||
getString(pageNum, finalFieName, finalJoint, basicId, newBasicId, tableId, map, pageSize, num);
|
||||
});
|
||||
|
||||
}
|
||||
if (Weight.urgency.getValue().equals(weight)) {
|
||||
log.info("执行紧急任务");
|
||||
// 调整线程分配以适应紧急任务
|
||||
OptimizedPrioritizedThreadPool.activeEmergencyTasks.set(0);
|
||||
OptimizedPrioritizedThreadPool.remainingTasks.set(0);
|
||||
submitEmergencyTask(() -> {
|
||||
try {
|
||||
getString(pageNum, finalFieName, finalJoint, basicId, newBasicId, tableId, map, pageSize, num);
|
||||
} finally {
|
||||
// 减少剩余任务计数
|
||||
if (OptimizedPrioritizedThreadPool.remainingTasks.decrementAndGet() == 0) {
|
||||
System.out.println("All emergency tasks have completed.");
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
@NotNull
|
||||
private Map<String, Object> getFieAsNameAndFieEngineId(List<TaskInput> list) {
|
||||
HashMap<String, String> tableNameMap = new HashMap<>();
|
||||
HashSet<Long> longs = new HashSet<>();
|
||||
HashMap<String, String> hashMap = new HashMap<>();
|
||||
for (int i = 0; i < list.size(); i++) {
|
||||
Long databaseId = list.get(i).getDatabaseId();
|
||||
|
@ -183,16 +274,21 @@ public class TaskInfoServiceImpl extends ServiceImpl<TaskInfoMapper, TaskInfo> i
|
|||
hashMap.put(tableFieId[j], fieIdAsEngineId[j]);
|
||||
tableNameMap.put(tableFieId[j], tableAsFieId[j]);
|
||||
}
|
||||
|
||||
}
|
||||
if (longs.size() > 1) {
|
||||
return "你选择的不是同一个数据库";
|
||||
}
|
||||
HashMap<String, Object> map = new HashMap<>();
|
||||
map.put("FieAsName", tableNameMap);
|
||||
map.put("FieEngineId", hashMap);
|
||||
map.put("longs", longs);
|
||||
return map;
|
||||
}
|
||||
|
||||
|
||||
@NotNull
|
||||
private String getJoint(Long taskId) {
|
||||
String joint = "";
|
||||
QueryWrapper<NodeJoint> jointQueryWrapper = new QueryWrapper<>();
|
||||
jointQueryWrapper.eq("task_id", taskId);
|
||||
List<NodeJoint> jointList = nodeJointService.list(jointQueryWrapper);
|
||||
String joint = "";
|
||||
if (!CollectionUtils.isEmpty(jointList)) {
|
||||
for (NodeJoint nodeJoint : jointList) {
|
||||
String oneNodeId = nodeJoint.getOneNodeId();
|
||||
|
@ -214,113 +310,14 @@ public class TaskInfoServiceImpl extends ServiceImpl<TaskInfoMapper, TaskInfo> i
|
|||
|
||||
}
|
||||
} else {
|
||||
QueryWrapper<TaskInput> wrapper = new QueryWrapper<>();
|
||||
wrapper.eq("task_id", taskId);
|
||||
TaskInput taskInput = taskInputService.getOne(wrapper);
|
||||
String tableName = taskInput.getTableName();
|
||||
String tableAsName = taskInput.getTableAsName();
|
||||
joint = " " + tableName + " " + tableAsName;
|
||||
}
|
||||
Long basicId = taskInputService.selectByBasicId(taskId);
|
||||
|
||||
|
||||
QueryWrapper<TaskOutput> queryWrapper = new QueryWrapper<>();
|
||||
queryWrapper.eq("task_id", taskId);
|
||||
TaskOutput one = taskOutputService.getOne(queryWrapper);
|
||||
if (one==null){
|
||||
return "没有选择输出";
|
||||
}
|
||||
Long tableId = one.getTableId();
|
||||
Long newBasicId = one.getBasicId();
|
||||
HashMap<String, String> map = new HashMap<>();
|
||||
|
||||
String[] newFieName = one.getNewFieName().split(",");
|
||||
String[] lastFieName = one.getLastFieName().split(",");
|
||||
Integer num = lastFieName.length;
|
||||
for (int i = 0; i < newFieName.length; i++) {
|
||||
map.put(lastFieName[i], newFieName[i]);
|
||||
fieName += "," + tableNameMap.get(lastFieName[i]);
|
||||
}
|
||||
|
||||
fieName = fieName.substring(1);
|
||||
String sql = " SELECT count(1) FROM " + joint;
|
||||
Result<Long> countResult = datasourceFeign.findCount(basicId, sql);
|
||||
Long data = countResult.getData();
|
||||
String finalFieName = fieName;
|
||||
String finalJoint = joint;
|
||||
long count = data/PAGE_SIZE==0?1:data/PAGE_SIZE+1;
|
||||
long pageSize;
|
||||
if (Weight.high.getValue().equals(weight)){
|
||||
log.info("执行高级任务");
|
||||
for (long i = 1; i <= count; i++) {
|
||||
long pageNum = (i - 1) * PAGE_SIZE;
|
||||
pageSize = data - pageNum;
|
||||
if (pageSize>=PAGE_SIZE){
|
||||
pageSize=PAGE_SIZE;
|
||||
}
|
||||
long finalPageSize = pageSize;
|
||||
submitHighPriorityTask(()->{
|
||||
getString(pageNum, finalFieName, finalJoint,basicId,newBasicId,tableId,map, finalPageSize,num);
|
||||
});
|
||||
|
||||
}
|
||||
}
|
||||
if (Weight.centre.getValue().equals(weight)){
|
||||
log.info("执行中级任务");
|
||||
for (long i = 1; i <= count; i++) {
|
||||
long pageNum = (i - 1) * PAGE_SIZE;
|
||||
pageSize = data - pageNum;
|
||||
if (pageSize>=PAGE_SIZE){
|
||||
pageSize=PAGE_SIZE;
|
||||
}
|
||||
long finalPageSize1 = pageSize;
|
||||
submitMediumPriorityTask(()->{
|
||||
getString(pageNum, finalFieName, finalJoint,basicId,newBasicId,tableId,map, finalPageSize1,num);
|
||||
});
|
||||
}
|
||||
}
|
||||
if (Weight.low.getValue().equals(weight)){
|
||||
log.info("执行低级任务");
|
||||
for (long i = 1; i <= count; i++) {
|
||||
long pageNum = (i - 1) * PAGE_SIZE;
|
||||
pageSize = data - pageNum;
|
||||
if (pageSize>=PAGE_SIZE){
|
||||
pageSize=PAGE_SIZE;
|
||||
}
|
||||
long finalPageSize2 = pageSize;
|
||||
submitLowPriorityTask(()->{
|
||||
getString(pageNum, finalFieName, finalJoint,basicId,newBasicId,tableId,map, finalPageSize2,num);
|
||||
});
|
||||
}
|
||||
}
|
||||
if (Weight.urgency.getValue().equals(weight)){
|
||||
log.info("执行紧急任务");
|
||||
// 调整线程分配以适应紧急任务
|
||||
OptimizedPrioritizedThreadPool.activeEmergencyTasks.set(0);
|
||||
OptimizedPrioritizedThreadPool.remainingTasks.set(0);
|
||||
for (long i = 1; i <= count; i++) {
|
||||
long pageNum = (i - 1) * PAGE_SIZE;
|
||||
pageSize = data - pageNum;
|
||||
if (pageSize>=PAGE_SIZE){
|
||||
pageSize=PAGE_SIZE;
|
||||
}
|
||||
long finalPageSize3 = pageSize;
|
||||
System.out.println(finalPageSize3);
|
||||
submitEmergencyTask(()->{
|
||||
try {
|
||||
getString(pageNum, finalFieName, finalJoint, basicId, newBasicId, tableId, map, finalPageSize3, num);
|
||||
} finally {
|
||||
// 减少剩余任务计数
|
||||
if (OptimizedPrioritizedThreadPool.remainingTasks.decrementAndGet() == 0) {
|
||||
System.out.println("All emergency tasks have completed.");
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
long end = System.currentTimeMillis();
|
||||
//log.info("执行时间:{}",end-start);
|
||||
return null;
|
||||
return joint;
|
||||
}
|
||||
|
||||
|
||||
|
@ -331,24 +328,17 @@ public class TaskInfoServiceImpl extends ServiceImpl<TaskInfoMapper, TaskInfo> i
|
|||
Long basicId,
|
||||
Long newBasicId,
|
||||
Long tableId,
|
||||
HashMap<String, String> map ,
|
||||
HashMap<String, String> map,
|
||||
Long one,
|
||||
Integer two) {
|
||||
String sqlSelect = " SELECT " + fieName + " FROM " + joint +" limit "+pageNum +","+PAGE_SIZE;
|
||||
String sqlSelect = " SELECT " + fieName + " FROM " + joint + " LIMIT "+ PAGE_SIZE +" OFFSET" + pageNum ;
|
||||
log.info(sqlSelect);
|
||||
|
||||
//log.info("执行{}查询的方法",sqlSelect);
|
||||
Result<DataValue[][]> tableValueResult = datasourceFeign.findTableValueToArray(basicId, sqlSelect,one,two);
|
||||
Result<DataValue[][]> tableValueResult = datasourceFeign.findTableValueToArray(basicId, sqlSelect, one, two);
|
||||
log.info(tableValueResult);
|
||||
DataValue[][] data = tableValueResult.getData();
|
||||
log.info("执行{}查询的方法结束",sqlSelect);
|
||||
// if (pageNum==160000){
|
||||
// for (DataValue[] datum : data) {
|
||||
// for (DataValue dataValue : datum) {
|
||||
// System.out.println(dataValue);
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
log.info("执行{}查询的方法结束", sqlSelect);
|
||||
for (DataValue[] datum : data) {
|
||||
for (DataValue dataValue : datum) {
|
||||
String key = dataValue.getKey();
|
||||
|
@ -356,10 +346,10 @@ public class TaskInfoServiceImpl extends ServiceImpl<TaskInfoMapper, TaskInfo> i
|
|||
dataValue.setKey(newKey);
|
||||
}
|
||||
}
|
||||
log.info("{}查询结束",sqlSelect);
|
||||
log.info("执行{}添加的方法",sqlSelect);
|
||||
log.info("{}查询结束", sqlSelect);
|
||||
log.info("执行{}添加的方法", sqlSelect);
|
||||
Result result = datasourceFeign.addProduct(newBasicId, tableId, data);
|
||||
log.info("{}添加结束",result);
|
||||
log.info("{}添加结束", result);
|
||||
// for (List<DataValue> dataValues : tableValue) {
|
||||
// for (DataValue dataValue : dataValues) {
|
||||
// String key = dataValue.getKey();
|
||||
|
|
Loading…
Reference in New Issue