cloud-etl-task/muyu-quest-server/src/main/java/com/muyu/quest/utils/NodeUtils.java

344 lines
13 KiB
Java
Raw Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

package com.muyu.quest.utils;
/**
* @Author: 胡杨
* @Name: NodeUtils
* @Description: 节点处理工具
* @CreatedDate: 2024/9/1 下午4:27
* @FilePath: com.muyu.quest.utils
*/
import com.muyu.common.core.domain.Result;
import com.muyu.common.core.utils.StringUtils;
import com.muyu.etl.domain.DataStructure;
import com.muyu.etl.rule.remote.RemoteRuleVersion;
import com.muyu.quest.domain.Node;
import com.muyu.quest.domain.NodeDisposition;
import com.muyu.quest.domain.NodeType;
import com.muyu.quest.exception.TaskException;
import com.muyu.quest.model.DataModel;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.*;
import java.util.stream.Collectors;
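/**
 * Static helpers for task-flow nodes: grouping nodes by type, validating the
 * node composition and linkage of a task, and assembling SELECT / INSERT SQL
 * text from node configuration entries.
 *
 * @Author: 胡杨
 * @Name: NodeUtils
 * @Description: Node processing utilities
 * @CreatedDate: 2024/9/1 16:27
 * @FilePath: com.muyu.quest.utils
 */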
@Component
public class NodeUtils {
private static HashMap<String, String> sqlMap = new HashMap<>();
/**
 * Groups the given nodes by their node type.
 *
 * @param nodes nodes to classify
 * @return map from node type to the nodes of that type, each list in input order
 * @throws TaskException if {@code nodes} is null or empty
 */
public static HashMap<String, List<Node>> nodeInit(List<Node> nodes){
    if (nodes == null || nodes.isEmpty()) {
        throw new TaskException("节点列表为空");
    }
    HashMap<String, List<Node>> grouped = new HashMap<>();
    for (Node current : nodes) {
        // Create the bucket for this type on first sight, then append to it.
        grouped.computeIfAbsent(current.getNodeType(), type -> new ArrayList<>()).add(current);
    }
    return grouped;
}
/**
 * Finds the upper-level (predecessor) node of the given node.
 *
 * <p>The predecessor is the first node in {@code nodes} whose node code equals
 * the pre-code of {@code node}.
 *
 * @param node  current node
 * @param nodes all nodes of the task
 * @return the predecessor node
 * @throws TaskException if {@code nodes} is null or empty, if {@code node} is null,
 *                       or if no node in {@code nodes} matches the pre-code
 */
public static Node getPreNode(Node node, List<Node> nodes){
    if (nodes == null || nodes.isEmpty()) {
        throw new TaskException("节点列表为空");
    }
    if (node == null) {
        throw new TaskException("当前节点为空");
    }
    return nodes.stream()
            .filter(candidate -> StringUtils.equals(node.getNodePreCode(), candidate.getNodeCode()))
            .findFirst()
            // Report a missing predecessor as a task error instead of letting an
            // IndexOutOfBoundsException escape from an empty result list.
            .orElseThrow(() -> new TaskException("节点 " + node + " 未找到上级节点"));
}
/**
 * Collects the lower-level (successor) nodes of the given node.
 *
 * @param node  current node
 * @param nodes all nodes of the task
 * @return every node in {@code nodes} that is linked as a successor of {@code node},
 *         in input order; empty when there is none
 * @throws TaskException if {@code nodes} is null or empty, or if {@code node} is null
 */
public static List<Node> getNextNode(Node node, List<Node> nodes) {
    if (nodes == null || nodes.isEmpty()) {
        throw new TaskException("节点列表为空");
    }
    if (node == null) {
        throw new TaskException("当前节点为空");
    }
    return nodes.stream()
            .filter(candidate -> isDirectSuccessor(node, candidate))
            .toList();
}

/**
 * Tells whether {@code candidate} follows {@code current}: either the candidate
 * names the current node as its predecessor, or the current node names the
 * candidate as its next node.
 */
private static boolean isDirectSuccessor(Node current, Node candidate) {
    return StringUtils.equals(current.getNodeCode(), candidate.getNodePreCode())
            || StringUtils.equals(current.getNodeNextCode(), candidate.getNodeCode());
}
/**
 * Checks that the task contains an acceptable number of nodes of every type.
 *
 * <p>For each node type the number of nodes of that type must be at least the
 * type's minimum. When at least one such node exists and the type's maximum is
 * not {@code -1}, the number must also not exceed that maximum.
 *
 * @param nodeListAll  all nodes of the task
 * @param nodeTypeList all node types with their minimum / maximum counts
 * @throws TaskException if the node list is null or empty, or a count is out of range
 */
public static void nodeCheckMakeUp(List<Node> nodeListAll, List<NodeType> nodeTypeList) {
    HashMap<String, List<Node>> nodesByType = nodeInit(nodeListAll);
    for (NodeType nodeType : nodeTypeList) {
        List<Node> typedNodes = nodesByType.get(nodeType.getNodeTypeCode());
        Integer maxNum = nodeType.getNodeMaxNum();
        Integer minNum = nodeType.getNodeMinNum();
        // A type without any node counts as zero occurrences.
        int count = (typedNodes == null) ? 0 : typedNodes.size();
        if (count < minNum) {
            throw new TaskException("节点 " + nodeType.getNodeTypeName() + " 数量不足,至少需要 " + minNum + " 个");
        }
        // The upper bound is only enforced when nodes exist; -1 disables it.
        if (count > 0 && maxNum != -1 && count > maxNum) {
            throw new TaskException("节点 " + nodeType.getNodeTypeName() + " 数量超出范围,最多允许 " + maxNum + " 个");
        }
    }
}
/**
 * Validates that the links between nodes follow the node-type rules.
 *
 * <p>For every node of every listed type:
 * <ul>
 *   <li>an {@code exportation} node must have a predecessor of type {@code table}
 *       or {@code unite};</li>
 *   <li>each successor's type must be accepted by {@code StringUtils.matches}
 *       against the type's next-node-type list;</li>
 *   <li>a node without successors must be of type {@code end}.</li>
 * </ul>
 *
 * @param nodeListAll  all nodes of the task
 * @param nodeTypeList node type rules
 * @throws TaskException if the node list is null or empty, or any rule is violated
 */
public static void nodeCheckNorm(List<Node> nodeListAll, List<NodeType> nodeTypeList) {
    HashMap<String, List<Node>> nodesByType = nodeInit(nodeListAll);
    for (NodeType nodeType : nodeTypeList) {
        List<Node> typedNodes = nodesByType.get(nodeType.getNodeTypeCode());
        if (typedNodes == null) {
            continue;
        }
        for (Node node : typedNodes) {
            if (StringUtils.equals(node.getNodeType(), "exportation")) {
                Node preNode = getPreNode(node, nodeListAll);
                boolean allowedPredecessor = StringUtils.equals(preNode.getNodeType(), "table")
                        || StringUtils.equals(preNode.getNodeType(), "unite");
                if (!allowedPredecessor) {
                    throw new TaskException("数据输出节点必须紧跟在数据输入/操作节点之后");
                }
            }
            List<Node> successors = getNextNode(node, nodeListAll);
            if (successors == null || successors.isEmpty()) {
                // Only an end node may terminate the flow.
                if (!StringUtils.equals(node.getNodeType(), "end")) {
                    throw new TaskException("节点 "+node+" 无下级节点.");
                }
                continue;
            }
            for (Node successor : successors) {
                if (!StringUtils.matches(successor.getNodeType(), nodeType.getNextNodeTypeCodeList())) {
                    throw new TaskException("节点 " + node + " 连接不符合规范,该节点可有下级节点为: [" + nodeType.getNextNodeTypeCodeList() + "]");
                }
            }
        }
    }
}
/**
 * Resolves the next node to process after {@code node}.
 *
 * <p>All successors must share one node type and one next-node code; the first
 * successor is returned.
 *
 * @param node  current node
 * @param nodes all nodes of the task
 * @return the first successor of {@code node}
 * @throws TaskException if there is no successor, or the successors differ in
 *                       type or in next-node code
 */
public static Node nodeCheckNorm(Node node, List<Node> nodes) {
    List<Node> nextNode = getNextNode(node, nodes);
    if (nextNode.isEmpty()) {
        throw new TaskException("任务执行失败,节点 "+node+" 无后续节点");
    }
    long typeCount = nextNode.stream().map(Node::getNodeType).distinct().count();
    if (typeCount > 1) {
        throw new TaskException("同级节点 "+nextNode+" 类型不同");
    }
    long nextCodeCount = nextNode.stream().map(Node::getNodeNextCode).distinct().count();
    if (nextCodeCount > 1) {
        throw new TaskException("同级节点 "+nextNode+" 下级节点不同");
    }
    return nextNode.get(0);
}
/**
 * Builds the query SQL for a join ("unite") node from its raw configuration list.
 *
 * <p>The list is first arranged by {@code DispUtils.getDispMap} and then handed
 * to the map-based overload.
 *
 * @param dispList all configuration entries of the node
 * @return query SQL
 */
public static String nodeDispUnite(List<NodeDisposition> dispList) {
    Map<String, List<NodeDisposition>> dispMap = DispUtils.getDispMap(dispList);
    return nodeDispUnite(dispMap);
}
/**
 * Builds the query SQL for a join ("unite") node from its arranged configuration.
 *
 * <p>Configuration keys read: {@code db}, {@code table}, {@code fields},
 * {@code join}, {@code joinDataForm}, {@code joinDataTo}. The result has the shape
 * {@code SELECT <fields> FROM <table0> <join> <table1> ON <a>.<col> = <b>.<col>},
 * where every occurrence of {@code "<db>."} is rewritten to a backtick-quoted
 * database name.
 *
 * <p>The map must contain all six keys, at least two {@code table} entries and
 * at least one entry for each of the three join keys; otherwise a
 * {@code NullPointerException} or an index exception is raised.
 *
 * @param dispMap node configuration grouped by configuration key
 * @return query SQL
 */
public static String nodeDispUnite(Map<String,List<NodeDisposition>> dispMap) {
    List<NodeDisposition> dbList = dispMap.get("db");
    List<NodeDisposition> tableList = dispMap.get("table");
    List<NodeDisposition> fieldList = dispMap.get("fields");
    NodeDisposition join = dispMap.get("join").get(0);
    // "joinDataForm" is the configuration key as stored; do not "correct" its spelling here.
    NodeDisposition joinDataForm = dispMap.get("joinDataForm").get(0);
    NodeDisposition joinDataTo = dispMap.get("joinDataTo").get(0);
    // Joined table expression.
    Object[] tables = tableList.stream().map(NodeDisposition::getDispDesc).toArray();
    StringBuilder table = new StringBuilder(tables[0].toString())
            .append(" ")
            .append(join.getDispValue())
            .append(" ")
            .append(tables[1].toString())
            .append(" ON ")
            .append(joinDataForm.getDispDesc())
            .append(".")
            .append(joinDataForm.getDispValue())
            .append(" = ")
            .append(joinDataTo.getDispDesc())
            .append(".")
            .append(joinDataTo.getDispValue());
    // Selected columns, each rendered as "<desc>.<value>".
    String field = StringUtils.join(fieldList
                    .stream()
                    .map(fields -> fields.getDispDesc() + "." + fields.getDispValue())
                    .toArray(),
            ",");
    // Backtick-quote every distinct database name wherever it is used as a prefix.
    // NOTE(review): plain text replacement; presumably database names never occur
    // as a suffix of another identifier - confirm against real configurations.
    Set<Object> dbNameSet = dbList.stream().map(NodeDisposition::getDispValue).collect(Collectors.toSet());
    String dbTable = table.toString();
    for (Object dbName : dbNameSet) {
        String prefix = dbName.toString() + ".";
        String quotedPrefix = "`" + dbName.toString() + "`.";
        dbTable = StringUtils.replace(dbTable, prefix, quotedPrefix);
        field = StringUtils.replace(field, prefix, quotedPrefix);
    }
    // Fixed: the keyword was previously emitted as "FORM", producing invalid SQL.
    return "SELECT " + field + " FROM " + dbTable;
}
/**
 * Builds the query SQL for a table node from its raw configuration list.
 *
 * <p>The list is first arranged by {@code DispUtils.getDispMap} and then handed
 * to the map-based overload.
 *
 * @param dispList all configuration entries of the node
 * @return query SQL
 */
public static String tableNode(List<NodeDisposition> dispList) {
    Map<String, List<NodeDisposition>> dispMap = DispUtils.getDispMap(dispList);
    return tableNode(dispMap);
}
/**
 * Builds the query SQL for a table node from its arranged configuration.
 *
 * <p>Reads the first {@code db} and {@code table} entries plus all {@code fields}
 * entries and produces
 * {@code SELECT `db`.table.f1,`db`.table.f2 FROM `db`.table}.
 *
 * @param dispMap node configuration grouped by configuration key
 * @return query SQL
 */
private static String tableNode(Map<String, List<NodeDisposition>> dispMap) {
    NodeDisposition dbDisp = dispMap.get("db").get(0);
    NodeDisposition tableDisp = dispMap.get("table").get(0);
    List<NodeDisposition> fieldList = dispMap.get("fields");
    // Fully qualified table name with the database part backtick-quoted.
    String qualifiedTable = "`" + dbDisp.getDispValue() + "`." + tableDisp.getDispValue();
    Object[] columns = fieldList.stream()
            .map(field -> qualifiedTable + "." + field.getDispValue())
            .toArray();
    return "SELECT " + StringUtils.join(columns, ",") + " FROM " + qualifiedTable;
}
/**
 * Assembles a plain {@code SELECT ... FROM ...} statement.
 *
 * @param table  table expression placed after {@code FROM}
 * @param fields column list placed after {@code SELECT}
 * @return the SQL statement text
 */
private static String findSqlSplice(String table, String fields) {
    // String.join renders a null element as "null", exactly like string concatenation.
    return String.join(" ", "SELECT", fields, "FROM", table);
}
/**
 * Builds the INSERT SQL for an output ("exportation") node from its raw
 * configuration list and the queried data.
 *
 * <p>The list is first arranged by {@code DispUtils.getDispMap} and then handed
 * to the map-based overload.
 *
 * @param dispList all configuration entries of the node
 * @param data     queried rows; each inner array is one row of key/value cells
 * @return INSERT SQL
 * @throws TaskException if {@code data} is null or contains no rows
 */
public static String nodeDispExportation(List<NodeDisposition> dispList, DataStructure[][] data) {
    // Zero rows would yield an INSERT statement without any VALUES tuple, which is
    // not executable, so it is rejected the same way as missing data.
    if (data == null || data.length == 0) {
        throw new TaskException("查询数据为空");
    }
    return nodeDispExportation(DispUtils.getDispMap(dispList), data);
}
/**
 * Builds the INSERT SQL for an output ("exportation") node from its arranged
 * configuration and the queried data.
 *
 * <p>Reads the first {@code toDb} entry (description = database, value = table)
 * and all {@code toFields} entries (value = target column, description = key of
 * the source cell). The result has the shape
 * {@code INSERT INTO `db`.table( c1,c2 ) VALUES ( 'v1','v2' ),( 'v3','v4' )}.
 *
 * <p>Single quotes, backticks, backslashes and spaces are stripped from every
 * value. A value that is null, or whose key is absent from the row, is written
 * as SQL {@code NULL}.
 *
 * <p>NOTE(review): values are concatenated into the SQL text; stripping a few
 * characters is not a substitute for parameter binding. Prefer a
 * {@code PreparedStatement} at the execution site if the data is untrusted.
 *
 * @param dispMap node configuration grouped by configuration key
 * @param data    queried rows; each inner array is one row of key/value cells
 * @return INSERT SQL
 */
private static String nodeDispExportation(Map<String, List<NodeDisposition>> dispMap, DataStructure[][] data) {
    // Target table and column list.
    NodeDisposition db = dispMap.get("toDb").get(0);
    List<NodeDisposition> fieldList = dispMap.get("toFields");
    String columns = StringUtils.join(fieldList.stream().map(NodeDisposition::getDispValue).toArray(), ",");
    StringBuilder insSql = new StringBuilder("INSERT INTO ");
    insSql.append("`").append(db.getDispDesc()).append("`.").append(db.getDispValue());
    insSql.append("( ").append(columns).append(" ) VALUES ");

    // One "( v1,v2,... )" tuple per data row.
    List<String> tuples = new ArrayList<>(data.length);
    for (DataStructure[] datum : data) {
        // Index the row's cells by key so columns can be looked up by source key.
        Map<String, String> rowValues = new HashMap<>();
        for (DataStructure cell : datum) {
            String value = cell.getValue() != null ? cell.getValue().toString() : null;
            if (value != null) {
                // Strip characters that would break the quoted literal.
                value = StringUtils.replace(value, "'", "");
                value = StringUtils.replace(value, "`", "");
                value = StringUtils.replace(value, "\\", "");
                value = StringUtils.replace(value, " ", "");
            }
            rowValues.put(cell.getKey(), value);
        }
        String tuple = fieldList.stream()
                .map(field -> rowValues.get(field.getDispDesc()))
                // Fixed: a missing value used to be emitted as the string 'null'.
                .map(value -> value == null ? "NULL" : "'" + value + "'")
                .collect(Collectors.joining(","));
        tuples.add("( " + tuple + " )");
    }
    // Joining avoids the former trailing-comma removal, which chopped the
    // statement prefix when there were no rows.
    return insSql.append(String.join(",", tuples)).toString();
}
}