329 lines
12 KiB
Java
329 lines
12 KiB
Java
package com.muyu.quest.utils;
|
|
|
|
/**
 * @Author: 胡杨
 * @Name: NodeUtils
 * @Description: Node processing utilities
 * @CreatedDate: 2024/9/1 4:27 PM
 * @FilePath: com.muyu.quest.utils
 */
|
|
|
|
|
|
import com.muyu.common.core.utils.StringUtils;
|
|
import com.muyu.quest.domain.Node;
|
|
import com.muyu.quest.domain.NodeDisposition;
|
|
import com.muyu.quest.domain.NodeType;
|
|
import com.muyu.quest.exception.TaskException;
|
|
import com.muyu.quest.model.DataModel;
|
|
|
|
import java.util.*;
|
|
import java.util.stream.Collectors;
|
|
|
|
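/**
 * Node processing utilities: validation of task-flow nodes and assembly of the
 * SELECT / INSERT statements described by node configuration.
 *
 * @Author: 胡杨
 * @Name: NodeUtils
 * @Description: Node processing utilities
 * @CreatedDate: 2024/9/1 4:27 PM
 * @FilePath: com.muyu.quest.utils
 */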
|
|
|
|
public class NodeUtils {
|
|
private static HashMap<String, String> sqlMap = new HashMap<>();
|
|
/**
|
|
* 节点初始化
|
|
* 将所有节点按节点类型分类
|
|
* 并判断任务流程是否完整
|
|
* @param nodes 节点列表
|
|
* @return 节点Map
|
|
*/
|
|
public static HashMap<String, List<Node>> nodeInit(List<Node> nodes){
|
|
if (nodes == null || nodes.isEmpty()){
|
|
throw new TaskException("节点列表为空");
|
|
}
|
|
HashMap<String, List<Node>> nodeMap = new HashMap<>();
|
|
// 整理所有节点
|
|
nodes.forEach(node -> {
|
|
List<Node> nodeList = nodeMap.get(node.getNodeType());
|
|
if (nodeList == null || nodeList.isEmpty()) {
|
|
nodeList = new ArrayList<>();
|
|
}
|
|
nodeList.add(node);
|
|
nodeMap.put(node.getNodeType(), nodeList);
|
|
});
|
|
return nodeMap;
|
|
}
|
|
|
|
/**
|
|
* 获取上级节点
|
|
* @param node 当前节点
|
|
* @param nodes 所有节点
|
|
* @return 下级节点列表
|
|
*/
|
|
public static Node getPreNode(Node node, List<Node> nodes){
|
|
if (nodes == null || nodes.isEmpty()){
|
|
throw new TaskException("节点列表为空");
|
|
}else if (node == null){
|
|
throw new TaskException("当前节点为空");
|
|
}
|
|
return nodes.stream()
|
|
.filter(nodeIndex -> StringUtils.equals(node.getNodePreCode() ,nodeIndex.getNodeCode()))
|
|
.toList().get(0);
|
|
}
|
|
|
|
/**
|
|
* 获取下级节点
|
|
* @param node 当前节点
|
|
* @param nodes 所有节点
|
|
* @return 下级节点列表
|
|
*/
|
|
public static List<Node> getNextNode(Node node, List<Node> nodes) {
|
|
if (nodes == null || nodes.isEmpty()) {
|
|
throw new TaskException("节点列表为空");
|
|
} else if (node == null) {
|
|
throw new TaskException("当前节点为空");
|
|
}
|
|
return nodes.stream()
|
|
.filter(nodeIndex ->
|
|
StringUtils.equals(node.getNodeCode(), nodeIndex.getNodePreCode()) ||
|
|
StringUtils.equals(node.getNodeNextCode(), nodeIndex.getNodeCode()))
|
|
.toList();
|
|
}
|
|
|
|
/**
|
|
* 任务节点组成检查
|
|
*
|
|
* @param nodeListAll 所有节点
|
|
* @param nodeTypeList 所有节点类型
|
|
*/
|
|
public static void nodeCheckMakeUp(List<Node> nodeListAll, List<NodeType> nodeTypeList) {
|
|
HashMap<String, List<Node>> nodeMapAll = nodeInit(nodeListAll);
|
|
nodeTypeList.forEach(nodeType -> {
|
|
// 根据节点类型查询对应类型节点
|
|
List<Node> nodes = nodeMapAll.get(nodeType.getNodeTypeCode());
|
|
|
|
Integer maxNum = nodeType.getNodeMaxNum();
|
|
Integer minNum = nodeType.getNodeMinNum();
|
|
if (nodes != null && !nodes.isEmpty()){
|
|
int num = nodes.size();
|
|
if (num < minNum){
|
|
throw new TaskException("节点 " + nodeType.getNodeTypeName() + " 数量不足,至少需要 " + minNum + " 个");
|
|
}else if (maxNum != -1 && num > maxNum){
|
|
throw new TaskException("节点 " + nodeType.getNodeTypeName() + " 数量超出范围,最多允许 " + maxNum + " 个");
|
|
}
|
|
}else if (minNum > 0){
|
|
throw new TaskException("节点 " + nodeType.getNodeTypeName() + " 数量不足,至少需要 " + minNum + " 个");
|
|
}
|
|
|
|
});
|
|
}
|
|
|
|
|
|
/**
|
|
* 校验任务流程是否符合节点规范
|
|
*
|
|
* @param nodeListAll 所有节点
|
|
* @param nodeTypeList 节点类型规范
|
|
*/
|
|
public static void nodeCheckNorm(List<Node> nodeListAll, List<NodeType> nodeTypeList) {
|
|
HashMap<String, List<Node>> nodeMapAll = nodeInit(nodeListAll);
|
|
nodeTypeList.forEach(nodeType -> {
|
|
// 根据节点类型查询对应类型节点
|
|
List<Node> nodes = nodeMapAll.get(nodeType.getNodeTypeCode());
|
|
if (nodes != null) {
|
|
nodes.forEach(node -> {
|
|
if (StringUtils.equals(node.getNodeType(), "exportation")){
|
|
Node nextNode = getPreNode(node, nodeListAll);
|
|
if (!StringUtils.equals(nextNode.getNodeType(), "table") &&
|
|
!StringUtils.equals(nextNode.getNodeType(), "unite")) {
|
|
throw new TaskException("数据输出节点必须紧跟在数据输入/操作节点之后");
|
|
}
|
|
}
|
|
// 获取该节点的下级节点
|
|
List<Node> nextNodeList = getNextNode(node, nodeListAll);
|
|
if (nextNodeList != null && !nextNodeList.isEmpty()){
|
|
for (Node nextNode : nextNodeList) {
|
|
// 判断其是否是允许的下级节点
|
|
if (!StringUtils.matches(nextNode.getNodeType(), nodeType.getNextNodeTypeCodeList())){
|
|
throw new TaskException("节点 " + node + " 连接不符合规范,该节点可有下级节点为: [" + nodeType.getNextNodeTypeCodeList() + "]");
|
|
}
|
|
}
|
|
}else if (!StringUtils.equals(node.getNodeType(), "end")){
|
|
throw new TaskException("节点 "+node+" 无下级节点.");
|
|
}
|
|
});
|
|
}
|
|
});
|
|
|
|
}
|
|
|
|
/**
|
|
* 查询节点的下一级节点
|
|
*/
|
|
public static Node nodeCheckNorm(Node node, List<Node> nodes) {
|
|
List<Node> nextNode = NodeUtils.getNextNode(node, nodes);
|
|
Set<String> nextNodeTypes = nextNode.stream().map(Node::getNodeType).collect(Collectors.toSet());
|
|
if (nextNode.isEmpty()){
|
|
throw new TaskException("任务执行失败,节点 "+node+" 无后续节点");
|
|
}else if (nextNodeTypes.size()>1){
|
|
throw new TaskException("同级节点 "+nextNode+" 类型不同");
|
|
}else if (nextNode.stream().map(Node::getNodeNextCode).collect(Collectors.toSet()).size()>1){
|
|
throw new TaskException("同级节点 "+nextNode+" 下级节点不同");
|
|
}
|
|
return nextNode.get(0);
|
|
}
|
|
|
|
|
|
/**
|
|
* 联合查询节点处理
|
|
* @param dispList 节点全部配置信息
|
|
* @return 查询sql
|
|
*/
|
|
public static String nodeDispUnite(List<NodeDisposition> dispList) {
|
|
return nodeDispUnite(DispUtils.getDispMap(dispList));
|
|
}
|
|
|
|
|
|
/**
|
|
* 联合查询节点处理
|
|
* @param dispMap 节点整理后的Map配置信息
|
|
* @return 查询sql
|
|
*/
|
|
public static String nodeDispUnite(Map<String,List<NodeDisposition>> dispMap) {
|
|
List<NodeDisposition> dbList = dispMap.get("db");
|
|
List<NodeDisposition> tableList = dispMap.get("table");
|
|
List<NodeDisposition> fieldList = dispMap.get("fields");
|
|
NodeDisposition join = dispMap.get("join").get(0);
|
|
NodeDisposition joinDataForm = dispMap.get("joinDataForm").get(0);
|
|
NodeDisposition joinDataTo = dispMap.get("joinDataTo").get(0);
|
|
// 查询表
|
|
Object[] array = tableList.stream().map(NodeDisposition::getDispDesc).toArray();
|
|
String table = array[0].toString() +
|
|
" " +
|
|
join.getDispValue() +
|
|
" " +
|
|
array[1].toString() +
|
|
" ON " +
|
|
joinDataForm.getDispDesc() +
|
|
"." +
|
|
joinDataForm.getDispValue() +
|
|
" = " +
|
|
joinDataTo.getDispDesc() +
|
|
"." +
|
|
joinDataTo.getDispValue();
|
|
// 查询列
|
|
String field = StringUtils.join(fieldList
|
|
.stream()
|
|
.map(fields -> fields.getDispDesc()+"."+fields.getDispValue())
|
|
.toArray(),
|
|
",");
|
|
Set<Object> dbNameSet = dbList.stream().map(NodeDisposition::getDispValue).collect(Collectors.toSet());
|
|
for (Object o : dbNameSet) {
|
|
table = StringUtils.replace(table, o.toString()+".", "`" + o.toString() + "`.");
|
|
field = StringUtils.replace(field, o.toString()+".", "`" + o.toString() + "`.");
|
|
}
|
|
sqlMap = new HashMap<>();
|
|
sqlMap.put("dbTable",table);
|
|
sqlMap.put("fields",field);
|
|
return findSqlSplice(table, field);
|
|
}
|
|
|
|
|
|
/**
|
|
* 表结构节点处理
|
|
* @param dispList 节点全部配置信息
|
|
* @return 查询sql
|
|
*/
|
|
public static String tableNode(List<NodeDisposition> dispList) {
|
|
return tableNode(DispUtils.getDispMap(dispList));
|
|
}
|
|
|
|
|
|
/**
|
|
* 表结构节点处理
|
|
* @param dispMap 节点整理后的Map配置信息
|
|
* @return 查询sql
|
|
*/
|
|
private static String tableNode(Map<String, List<NodeDisposition>> dispMap) {
|
|
NodeDisposition db = dispMap.get("db").get(0);
|
|
NodeDisposition table = dispMap.get("table").get(0);
|
|
List<NodeDisposition> fieldList = dispMap.get("fields");
|
|
String dbTable = "`"+db.getDispValue()+"`."+table.getDispValue();
|
|
String fields = StringUtils.join(fieldList.stream().
|
|
map(field -> dbTable + "." + field.getDispValue()).
|
|
toArray(),
|
|
",");
|
|
|
|
sqlMap = new HashMap<>();
|
|
sqlMap.put("dbTable",dbTable);
|
|
sqlMap.put("fields",fields);
|
|
return findSqlSplice(dbTable, fields);
|
|
}
|
|
|
|
/**
|
|
* 查询sql拼接
|
|
* @param table 表名
|
|
* @param fields 字段
|
|
* @return sql语句
|
|
*/
|
|
private static String findSqlSplice(String table, String fields) {
|
|
return "SELECT " + fields + " FROM " + table;
|
|
}
|
|
|
|
/**
|
|
* 输出节点 节点处理
|
|
*
|
|
* @param dispList 节点全部配置信息
|
|
* @param data 查询到的数据
|
|
* @return 新增sql
|
|
*/
|
|
public static String nodeDispExportation(List<NodeDisposition> dispList, List<List<DataModel>> data) {
|
|
if (data == null){
|
|
throw new TaskException("查询数据为空");
|
|
}
|
|
return nodeDispExportation(DispUtils.getDispMap(dispList), data);
|
|
}
|
|
|
|
|
|
/**
|
|
* 输出节点 节点处理
|
|
*
|
|
* @param dispMap 节点整理后的Map配置信息
|
|
* @param data 查询到的数据
|
|
* @return 新增sql
|
|
*/
|
|
private static String nodeDispExportation(Map<String, List<NodeDisposition>> dispMap, List<List<DataModel>> data) {
|
|
// 拼接新增表
|
|
NodeDisposition db = dispMap.get("toDb").get(0);
|
|
String dbTable = "`" + db.getDispDesc() + "`." + db.getDispValue();
|
|
// 根据表结构拼接新增字段
|
|
List<NodeDisposition> fieldList = dispMap.get("toFields");
|
|
// 拼接新增语句的表与字段
|
|
String join = StringUtils.join(fieldList.stream().map(NodeDisposition::getDispValue).toArray(), ",");
|
|
StringBuilder insSql = new StringBuilder("INSERT INTO " + dbTable + "( " + join + " ) VALUES ");
|
|
// 整理需新增数据
|
|
List<HashMap<String, String>> dataList1 = new ArrayList<>();
|
|
for (List<DataModel> datum : data) {
|
|
HashMap<String, String> dataMap = new HashMap<>();
|
|
datum.forEach(dataModel -> {
|
|
// 检查 getValue 是否为空
|
|
String value = dataModel.getValue() != null ? dataModel.getValue().toString() : null;
|
|
dataMap.put(dataModel.getKey(), value);
|
|
});
|
|
dataList1.add(dataMap);
|
|
}
|
|
System.out.println(dataList1);
|
|
// 拼接新增语句的值
|
|
dataList1.forEach(map -> insSql.append("( ").
|
|
append(StringUtils.join(
|
|
fieldList.
|
|
stream().
|
|
map(field -> map.get(field.getDispDesc())).
|
|
map(field -> "'" + field + "'").
|
|
toArray(),
|
|
",")).append(" ),"));
|
|
return insSql.deleteCharAt(insSql.length() - 1).toString();
|
|
}
|
|
|
|
|
|
}
|