优化SQL拼接,节省内存,设置线程池长度为8
parent
b328617025
commit
19e80ad9b6
|
@ -29,7 +29,7 @@ import static java.lang.Thread.sleep;
|
|||
@Slf4j
|
||||
public final class TaskManager {
|
||||
// 线程池中默认线程的个数为5
|
||||
private static int workerNum = 5;
|
||||
private static int workerNum = 8;
|
||||
// 工作线程
|
||||
private final WorkThread[] workThrads;
|
||||
// 未处理的任务
|
||||
|
@ -44,7 +44,7 @@ public final class TaskManager {
|
|||
|
||||
// 创建具有默认线程个数的线程池
|
||||
private TaskManager() {
|
||||
this(5);
|
||||
this(8);
|
||||
}
|
||||
|
||||
// 创建线程池,workerNum为线程池中工作线程的个数
|
||||
|
|
|
@ -180,10 +180,13 @@ public class TaskServiceImpl extends ServiceImpl<TaskMapper, Task>
|
|||
taskManager.execute(() -> {
|
||||
String exportCode = UUID.randomUUID().toString().replace("-","");
|
||||
// 获取新SQL 并执行
|
||||
String sql = findSql + " LIMIT "+limitNum+" OFFSET "+(index-1)*limitNum;
|
||||
String addSql = getAddSql(nodeMap, sql);
|
||||
int addSqlMaxLength = Math.min(addSql.length(), 30000);
|
||||
TaskExport entity = new TaskExport(taskCode,exportCode, addSql.substring(0,addSqlMaxLength), 0, "");
|
||||
StringBuilder newAddSql = new StringBuilder(findSql);
|
||||
newAddSql.append(" LIMIT ")
|
||||
.append(limitNum)
|
||||
.append(" OFFSET ")
|
||||
.append((index-1)*limitNum);
|
||||
String addSql = getAddSql(nodeMap, newAddSql.toString());
|
||||
TaskExport entity = new TaskExport(taskCode,exportCode, newAddSql.toString(), 0, "");
|
||||
taskExportService.save(entity);
|
||||
Result addResult = remoteDataSourceService.addTableValue(new DataValueModel(4L, addSql));
|
||||
log.info("任务 {} 第 {} 线程执行结果 {}",taskCode,index,addResult.getMsg());
|
||||
|
@ -197,9 +200,11 @@ public class TaskServiceImpl extends ServiceImpl<TaskMapper, Task>
|
|||
taskExportService.updateByExportCode(entity);
|
||||
});
|
||||
}
|
||||
new Thread(() -> {
|
||||
taskManager.closed();
|
||||
taskManager = null;
|
||||
return "执行成功";
|
||||
}).start();
|
||||
return "任务启动成功!";
|
||||
}
|
||||
|
||||
|
||||
|
@ -256,6 +261,7 @@ public class TaskServiceImpl extends ServiceImpl<TaskMapper, Task>
|
|||
*/
|
||||
private int getFindCount(String findSql, HashMap<String, List<Node>> nodeMap) {
|
||||
System.out.println(findSql);
|
||||
StringBuilder stringBuilder = new StringBuilder();
|
||||
String findCountSql = "";
|
||||
List<Node> uniteNodes = nodeMap.get("unite");
|
||||
findSql = findSql.replace(" "," ");
|
||||
|
@ -266,14 +272,17 @@ public class TaskServiceImpl extends ServiceImpl<TaskMapper, Task>
|
|||
}else {
|
||||
|
||||
String[] split = s1[s1.length-1].split("\\.");
|
||||
findCountSql = "SELECT TABLE_ROWS " +
|
||||
"FROM INFORMATION_SCHEMA.TABLES " +
|
||||
"WHERE TABLE_SCHEMA = "+split[0]+" AND TABLE_NAME = '"+split[1]+"';";
|
||||
findCountSql = findCountSql.replace("`","'");
|
||||
stringBuilder.append("SELECT TABLE_ROWS ")
|
||||
.append("FROM INFORMATION_SCHEMA.TABLES ")
|
||||
.append("WHERE TABLE_SCHEMA = ")
|
||||
.append(split[0])
|
||||
.append(" AND TABLE_NAME = '")
|
||||
.append(split[1])
|
||||
.append("';");
|
||||
findCountSql = stringBuilder.toString().replace("`","'");
|
||||
}
|
||||
System.out.println(findCountSql);
|
||||
Result tableValue = remoteDataSourceService.getTableValueTotal(new DataValueModel(4L, findCountSql));
|
||||
System.out.println(tableValue);
|
||||
log.info("表条数查询SQL: {},结果: {}",findCountSql, tableValue);
|
||||
|
||||
return (int) tableValue.getData();
|
||||
}
|
||||
|
|
|
@ -197,19 +197,20 @@ public class NodeUtils {
|
|||
NodeDisposition joinDataTo = dispMap.get("joinDataTo").get(0);
|
||||
// 查询表
|
||||
Object[] array = tableList.stream().map(NodeDisposition::getDispDesc).toArray();
|
||||
String table = array[0].toString() +
|
||||
" " +
|
||||
join.getDispValue() +
|
||||
" " +
|
||||
array[1].toString() +
|
||||
" ON " +
|
||||
joinDataForm.getDispDesc() +
|
||||
"." +
|
||||
joinDataForm.getDispValue() +
|
||||
" = " +
|
||||
joinDataTo.getDispDesc() +
|
||||
"." +
|
||||
joinDataTo.getDispValue();
|
||||
StringBuilder table = new StringBuilder(array[0].toString())
|
||||
.append(" ")
|
||||
.append(join.getDispValue())
|
||||
.append(" ")
|
||||
.append(array[1].toString())
|
||||
.append(" ON ")
|
||||
.append(joinDataForm.getDispDesc())
|
||||
.append(".")
|
||||
.append(joinDataForm.getDispValue())
|
||||
.append(" = ")
|
||||
.append(joinDataTo.getDispDesc())
|
||||
.append(".")
|
||||
.append(joinDataTo.getDispValue());
|
||||
|
||||
// 查询列
|
||||
String field = StringUtils.join(fieldList
|
||||
.stream()
|
||||
|
@ -217,14 +218,12 @@ public class NodeUtils {
|
|||
.toArray(),
|
||||
",");
|
||||
Set<Object> dbNameSet = dbList.stream().map(NodeDisposition::getDispValue).collect(Collectors.toSet());
|
||||
String dbTable = table.toString();
|
||||
for (Object o : dbNameSet) {
|
||||
table = StringUtils.replace(table, o.toString()+".", "`" + o.toString() + "`.");
|
||||
dbTable = StringUtils.replace(dbTable, o.toString()+".", "`" + o.toString() + "`.");
|
||||
field = StringUtils.replace(field, o.toString()+".", "`" + o.toString() + "`.");
|
||||
}
|
||||
sqlMap = new HashMap<>();
|
||||
sqlMap.put("dbTable",table);
|
||||
sqlMap.put("fields",field);
|
||||
return findSqlSplice(table, field);
|
||||
return new StringBuilder("SELECT ").append(field).append(" FORM ").append(dbTable).toString();
|
||||
}
|
||||
|
||||
|
||||
|
@ -247,27 +246,20 @@ public class NodeUtils {
|
|||
NodeDisposition db = dispMap.get("db").get(0);
|
||||
NodeDisposition table = dispMap.get("table").get(0);
|
||||
List<NodeDisposition> fieldList = dispMap.get("fields");
|
||||
String dbTable = "`"+db.getDispValue()+"`."+table.getDispValue();
|
||||
String fields = StringUtils.join(fieldList.stream().
|
||||
map(field -> dbTable + "." + field.getDispValue()).
|
||||
StringBuilder findSql = new StringBuilder("SELECT ");
|
||||
StringBuilder stringBuilder = new StringBuilder("`");
|
||||
stringBuilder.append(db.getDispValue())
|
||||
.append("`.")
|
||||
.append(table.getDispValue());
|
||||
findSql.append(StringUtils.join(fieldList.stream().
|
||||
map(field -> stringBuilder.toString() + "." + field.getDispValue()).
|
||||
toArray(),
|
||||
",");
|
||||
|
||||
sqlMap = new HashMap<>();
|
||||
sqlMap.put("dbTable",dbTable);
|
||||
sqlMap.put("fields",fields);
|
||||
return findSqlSplice(dbTable, fields);
|
||||
","))
|
||||
.append(" FROM ")
|
||||
.append(stringBuilder);
|
||||
return findSql.toString();
|
||||
}
|
||||
|
||||
/**
|
||||
* 查询sql拼接
|
||||
* @param table 表名
|
||||
* @param fields 字段
|
||||
* @return sql语句
|
||||
*/
|
||||
private static String findSqlSplice(String table, String fields) {
|
||||
return "SELECT " + fields + " FROM " + table;
|
||||
}
|
||||
|
||||
/**
|
||||
* 输出节点 节点处理
|
||||
|
@ -294,12 +286,13 @@ public class NodeUtils {
|
|||
private static String nodeDispExportation(Map<String, List<NodeDisposition>> dispMap, List<List<DataModel>> data) {
|
||||
// 拼接新增表
|
||||
NodeDisposition db = dispMap.get("toDb").get(0);
|
||||
String dbTable = "`" + db.getDispDesc() + "`." + db.getDispValue();
|
||||
StringBuilder insSql = new StringBuilder("INSERT INTO ");
|
||||
insSql.append("`").append(db.getDispDesc()).append("`.").append(db.getDispValue());
|
||||
// 根据表结构拼接新增字段
|
||||
List<NodeDisposition> fieldList = dispMap.get("toFields");
|
||||
// 拼接新增语句的表与字段
|
||||
String join = StringUtils.join(fieldList.stream().map(NodeDisposition::getDispValue).toArray(), ",");
|
||||
StringBuilder insSql = new StringBuilder("INSERT INTO " + dbTable + "( " + join + " ) VALUES ");
|
||||
insSql.append("( ").append(join).append(" ) VALUES ");
|
||||
// 整理需新增数据
|
||||
List<HashMap<String, String>> dataList1 = new ArrayList<>();
|
||||
for (List<DataModel> datum : data) {
|
||||
|
|
Loading…
Reference in New Issue