diff --git a/srt-cloud-data-governance/src/main/resources/bootstrap.yml b/srt-cloud-data-governance/src/main/resources/bootstrap.yml index 97a4fb9..13ac274 100644 --- a/srt-cloud-data-governance/src/main/resources/bootstrap.yml +++ b/srt-cloud-data-governance/src/main/resources/bootstrap.yml @@ -15,7 +15,7 @@ spring: discovery: server-addr: 101.34.77.101:8848 # 命名空间,默认:public - namespace: 7e34f104-f333-4828-b36a-02146e521c9a + namespace: 7e1e997d-5fa4-4f84-9f48-3e0adf830a37 service: ${spring.application.name} group: srt2.0 config: diff --git a/srt-cloud-data-integrate/src/main/resources/bootstrap.yml b/srt-cloud-data-integrate/src/main/resources/bootstrap.yml index a0f7c10..edd03e2 100644 --- a/srt-cloud-data-integrate/src/main/resources/bootstrap.yml +++ b/srt-cloud-data-integrate/src/main/resources/bootstrap.yml @@ -15,7 +15,7 @@ spring: discovery: server-addr: 101.34.77.101:8848 # 命名空间,默认:public - namespace: 7e34f104-f333-4828-b36a-02146e521c9a + namespace: 7e1e997d-5fa4-4f84-9f48-3e0adf830a37 service: ${spring.application.name} group: srt2.0 config: diff --git a/srt-cloud-framework/srt-cloud-flink/build/app/flink-app-1.14-2.0.0-jar-with-dependencies.jar b/srt-cloud-framework/srt-cloud-flink/build/app/flink-app-1.14-2.0.0-jar-with-dependencies.jar index 0595467..125a9ef 100644 Binary files a/srt-cloud-framework/srt-cloud-flink/build/app/flink-app-1.14-2.0.0-jar-with-dependencies.jar and b/srt-cloud-framework/srt-cloud-flink/build/app/flink-app-1.14-2.0.0-jar-with-dependencies.jar differ diff --git a/srt-cloud-framework/srt-cloud-flink/build/extends/flink-catalog-mysql-1.14-2.0.0.jar b/srt-cloud-framework/srt-cloud-flink/build/extends/flink-catalog-mysql-1.14-2.0.0.jar index e848764..8c6d86f 100644 Binary files a/srt-cloud-framework/srt-cloud-flink/build/extends/flink-catalog-mysql-1.14-2.0.0.jar and b/srt-cloud-framework/srt-cloud-flink/build/extends/flink-catalog-mysql-1.14-2.0.0.jar differ diff --git a/srt-cloud-framework/srt-cloud-flink/build/extends/flink-client-1.14-2.0.0.jar b/srt-cloud-framework/srt-cloud-flink/build/extends/flink-client-1.14-2.0.0.jar index e6aa6cd..f8ecd96 100644 Binary files a/srt-cloud-framework/srt-cloud-flink/build/extends/flink-client-1.14-2.0.0.jar and b/srt-cloud-framework/srt-cloud-flink/build/extends/flink-client-1.14-2.0.0.jar differ diff --git a/srt-cloud-framework/srt-cloud-flink/flink-core-all/pom.xml b/srt-cloud-framework/srt-cloud-flink/flink-core-all/pom.xml index 0154d40..2501a36 100644 --- a/srt-cloud-framework/srt-cloud-flink/flink-core-all/pom.xml +++ b/srt-cloud-framework/srt-cloud-flink/flink-core-all/pom.xml @@ -109,6 +109,9 @@ net.srt flink-function + + + diff --git a/srt-cloud-gateway/src/main/resources/bootstrap.yml b/srt-cloud-gateway/src/main/resources/bootstrap.yml index 79d8278..b53cfdb 100644 --- a/srt-cloud-gateway/src/main/resources/bootstrap.yml +++ b/srt-cloud-gateway/src/main/resources/bootstrap.yml @@ -84,18 +84,12 @@ spring: - Path=/srt-cloud-datax-service/** # Adjust the path as needed filters: - StripPrefix=1 - - id: srt-data-development # New Gateway - uri: lb://srt-data-development # Update with the correct URI for your new service - predicates: - - Path=/data-development/** # Adjust the path as needed - filters: - - StripPrefix=1 nacos: discovery: server-addr: 101.34.77.101:8848 # 命名空间,默认:public - namespace: 7e34f104-f333-4828-b36a-02146e521c9a + namespace: 7e1e997d-5fa4-4f84-9f48-3e0adf830a37 service: ${spring.application.name} group: srt2.0 diff --git 
a/srt-cloud-module/srt-cloud-datax-service/src/main/resources/bootstrap.yml b/srt-cloud-module/srt-cloud-datax-service/src/main/resources/bootstrap.yml index 3d3ff59..4d5aac0 100644 --- a/srt-cloud-module/srt-cloud-datax-service/src/main/resources/bootstrap.yml +++ b/srt-cloud-module/srt-cloud-datax-service/src/main/resources/bootstrap.yml @@ -11,7 +11,7 @@ spring: discovery: server-addr: 101.34.77.101:8848 # 命名空间,默认:public - namespace: 7e34f104-f333-4828-b36a-02146e521c9a + namespace: 7e1e997d-5fa4-4f84-9f48-3e0adf830a37 service: ${spring.application.name} group: srt2.0 config: diff --git a/srt-cloud-module/srt-cloud-datax/src/main/resources/bootstrap.yml b/srt-cloud-module/srt-cloud-datax/src/main/resources/bootstrap.yml index b80012b..3a1e9f5 100644 --- a/srt-cloud-module/srt-cloud-datax/src/main/resources/bootstrap.yml +++ b/srt-cloud-module/srt-cloud-datax/src/main/resources/bootstrap.yml @@ -11,7 +11,7 @@ spring: discovery: server-addr: 101.34.77.101:8848 # 命名空间,默认:public - namespace: 7e34f104-f333-4828-b36a-02146e521c9a + namespace: 7e1e997d-5fa4-4f84-9f48-3e0adf830a37 service: ${spring.application.name} group: srt2.0 config: diff --git a/srt-cloud-module/srt-cloud-message/src/main/resources/bootstrap.yml b/srt-cloud-module/srt-cloud-message/src/main/resources/bootstrap.yml index bea8430..af0321c 100644 --- a/srt-cloud-module/srt-cloud-message/src/main/resources/bootstrap.yml +++ b/srt-cloud-module/srt-cloud-message/src/main/resources/bootstrap.yml @@ -11,7 +11,7 @@ spring: discovery: server-addr: 101.34.77.101:8848 # 命名空间,默认:public - namespace: 7e34f104-f333-4828-b36a-02146e521c9a + namespace: 7e1e997d-5fa4-4f84-9f48-3e0adf830a37 service: ${spring.application.name} group: srt2.0 config: diff --git a/srt-cloud-module/srt-cloud-quartz/src/main/resources/bootstrap.yml b/srt-cloud-module/srt-cloud-quartz/src/main/resources/bootstrap.yml index 24f7217..7de59f1 100644 --- a/srt-cloud-module/srt-cloud-quartz/src/main/resources/bootstrap.yml +++ b/srt-cloud-module/srt-cloud-quartz/src/main/resources/bootstrap.yml @@ -11,7 +11,7 @@ spring: discovery: server-addr: 101.34.77.101:8848 # 命名空间,默认:public - namespace: 7e34f104-f333-4828-b36a-02146e521c9a + namespace: 7e1e997d-5fa4-4f84-9f48-3e0adf830a37 service: ${spring.application.name} group: srt2.0 config: diff --git a/srt-cloud-system/src/main/resources/bootstrap.yml b/srt-cloud-system/src/main/resources/bootstrap.yml index b6d5c86..aeb7ad8 100644 --- a/srt-cloud-system/src/main/resources/bootstrap.yml +++ b/srt-cloud-system/src/main/resources/bootstrap.yml @@ -14,7 +14,7 @@ spring: discovery: server-addr: 101.34.77.101:8848 # 命名空间,默认:public - namespace: 7e34f104-f333-4828-b36a-02146e521c9a + namespace: 7e1e997d-5fa4-4f84-9f48-3e0adf830a37 service: ${spring.application.name} group: srt2.0 config: diff --git a/srt-data-development/pom.xml b/srt-data-development/pom.xml index ddc5fb1..9aa3b83 100644 --- a/srt-data-development/pom.xml +++ b/srt-data-development/pom.xml @@ -79,6 +79,12 @@ io.minio minio + + + net.srt + flink-core-all + 2.0.0 + diff --git a/srt-data-development/src/main/java/net/srt/disposition/controller/DataProductionScheduleController.java b/srt-data-development/src/main/java/net/srt/disposition/controller/DataProductionScheduleController.java new file mode 100644 index 0000000..d5fe5e0 --- /dev/null +++ b/srt-data-development/src/main/java/net/srt/disposition/controller/DataProductionScheduleController.java @@ -0,0 +1,79 @@ +package net.srt.disposition.controller; + +import io.swagger.v3.oas.annotations.Operation; +import 
io.swagger.v3.oas.annotations.tags.Tag; +import lombok.AllArgsConstructor; +import net.srt.disposition.dto.Flow; +import net.srt.disposition.query.DataProductionsScheduleQuery; +import net.srt.disposition.service.DataProductionScheduleService; +import net.srt.disposition.vo.DataProductionScheduleVo; +import net.srt.framework.common.page.PageResult; +import net.srt.framework.common.utils.Result; +import org.springframework.security.access.prepost.PreAuthorize; +import org.springframework.security.core.parameters.P; +import org.springframework.web.bind.annotation.*; + +import javax.validation.Valid; +import java.util.List; + +/** + * @BelongsProject: srt_cloud + * @BelongsPackage: net.srt.disposition.controller + * @Author: jpz + * @CreateTime: 2023/12/28 8:49 + */ +@RestController +@RequestMapping("schedule") +@Tag(name = "数据生产-作业调度") +@AllArgsConstructor +public class DataProductionScheduleController { + private final DataProductionScheduleService dataProductionScheduleService; + + @GetMapping("page") + @Operation(summary = "分页") + @PreAuthorize("hasAuthority('data-development:schedule:page')") + public Result> page(@Valid DataProductionsScheduleQuery query){ + PageResult pageResult = dataProductionScheduleService.page(query); + return Result.ok(pageResult); + } + + @GetMapping("{id}") + @Operation(summary = "信息") + @PreAuthorize("hasAuthority('data-development:schedule:info')") + public Result get(@PathVariable("id") Long id){ + return Result.ok(dataProductionScheduleService.get(id)); + } + @PostMapping + @Operation(summary = "保存") + @PreAuthorize("hasAuthority('data-development:schedule:save')") + public Result save(@RequestBody Flow flow){ + dataProductionScheduleService.save(flow); + return Result.ok(); + } + + @PostMapping("/run/{id}") + @Operation(summary = "执行(返回log的id)") + @PreAuthorize("hasAuthority('data-development:schedule:run')") + public Result run(@PathVariable Integer id){ + return Result.ok(dataProductionScheduleService.run(id)); + } + + @DeleteMapping + @Operation(summary = "删除") + @PreAuthorize("hasAuthority('data-development:schedule:delete')") + public Result delete(@RequestBody List idList){ + dataProductionScheduleService.delete(idList); + return Result.ok(); + } + + @PostMapping("/release/{id}") + @Operation(summary = "发布") + @PreAuthorize("hasAuthority('data-development:schedule:release')") + public Result release(@PathVariable Integer id) { + dataProductionScheduleService.release(id); + return Result.ok(); + } + + + +} diff --git a/srt-data-development/src/main/java/net/srt/disposition/convert/DataProductionConvert.java b/srt-data-development/src/main/java/net/srt/disposition/convert/DataProductionConvert.java new file mode 100644 index 0000000..9239746 --- /dev/null +++ b/srt-data-development/src/main/java/net/srt/disposition/convert/DataProductionConvert.java @@ -0,0 +1,22 @@ +package net.srt.disposition.convert; + +import net.srt.disposition.entity.DataProductionScheduleEntity; +import net.srt.disposition.vo.DataProductionScheduleVo; +import org.mapstruct.Mapper; +import org.mapstruct.factory.Mappers; + +import java.util.List; + +/** + * @BelongsProject: srt_cloud + * @BelongsPackage: net.srt.disposition.convert + * @Author: jpz + * @CreateTime: 2023/12/28 9:56 + */ +@Mapper +public interface DataProductionConvert { + + DataProductionConvert INSTANCE = Mappers.getMapper(DataProductionConvert.class); + + List convertList(List list); +} diff --git a/srt-data-development/src/main/java/net/srt/disposition/dto/Flow.java 
b/srt-data-development/src/main/java/net/srt/disposition/dto/Flow.java new file mode 100644 index 0000000..6bea4c7 --- /dev/null +++ b/srt-data-development/src/main/java/net/srt/disposition/dto/Flow.java @@ -0,0 +1,31 @@ +package net.srt.disposition.dto; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.util.Date; +import java.util.List; + +/** + * @BelongsProject: srt_cloud + * @BelongsPackage: net.srt.disposition.dto + * @Author: jpz + * @CreateTime: 2023/12/28 14:04 + */ +@Data +@AllArgsConstructor +@NoArgsConstructor +public class Flow { + private Long id; + private Integer recordId; + private Integer ifCycle; + private String name; + private String cron; + private String note; + private Integer status; + private Date releaseTime; + private Integer releaseUserId; + private List nodes; + private List edges; +} diff --git a/srt-data-development/src/main/java/net/srt/disposition/dto/FlowEdge.java b/srt-data-development/src/main/java/net/srt/disposition/dto/FlowEdge.java new file mode 100644 index 0000000..2ddb1b9 --- /dev/null +++ b/srt-data-development/src/main/java/net/srt/disposition/dto/FlowEdge.java @@ -0,0 +1,28 @@ +package net.srt.disposition.dto; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.util.List; +import java.util.Map; + +/** + * @BelongsProject: srt_cloud + * @BelongsPackage: net.srt.disposition.dto + * @Author: jpz + * @CreateTime: 2023/12/28 14:04 + */ +@Data +@AllArgsConstructor +@NoArgsConstructor +public class FlowEdge { + private String id; + private String type; + private Map startPoint; + private Map endPoint; + private List> pointsList; + private String sourceNodeId; + private String targetNodeId; + private Map properties; +} diff --git a/srt-data-development/src/main/java/net/srt/disposition/dto/FlowNode.java b/srt-data-development/src/main/java/net/srt/disposition/dto/FlowNode.java new file mode 100644 index 0000000..eb9a407 --- /dev/null +++ b/srt-data-development/src/main/java/net/srt/disposition/dto/FlowNode.java @@ -0,0 +1,22 @@ +package net.srt.disposition.dto; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * @BelongsProject: srt_cloud + * @BelongsPackage: net.srt.disposition.dto + * @Author: jpz + * @CreateTime: 2023/12/28 14:04 + */ +@Data +@AllArgsConstructor +@NoArgsConstructor +public class FlowNode { + private String id; + private String type; + private Integer x; + private Integer y; + private FlowNodeProperties properties; +} diff --git a/srt-data-development/src/main/java/net/srt/disposition/dto/FlowNodeProperties.java b/srt-data-development/src/main/java/net/srt/disposition/dto/FlowNodeProperties.java new file mode 100644 index 0000000..8571057 --- /dev/null +++ b/srt-data-development/src/main/java/net/srt/disposition/dto/FlowNodeProperties.java @@ -0,0 +1,37 @@ +package net.srt.disposition.dto; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.util.HashMap; +import java.util.Map; + +/** + * @BelongsProject: srt_cloud + * @BelongsPackage: net.srt.disposition.dto + * @Author: jpz + * @CreateTime: 2023/12/28 14:05 + */ +@Data +@AllArgsConstructor +@NoArgsConstructor +public class FlowNodeProperties { + public static final Map SUCCESS_STYLE = new HashMap() { + { + put("border","3px solid #06c733"); + } + }; + private Long id; + private Integer nodeRecordId; + private String name; + private Integer taskId; + private Integer 
weight; + private Integer taskType; + private String taskTypeVal; + private String note; + private Integer failGoOn; + private Map style; + //CommonRunStatus + private Integer runStatus = 1; +} diff --git a/srt-data-development/src/main/java/net/srt/disposition/entity/DataProductionScheduleEntity.java b/srt-data-development/src/main/java/net/srt/disposition/entity/DataProductionScheduleEntity.java new file mode 100644 index 0000000..427edbd --- /dev/null +++ b/srt-data-development/src/main/java/net/srt/disposition/entity/DataProductionScheduleEntity.java @@ -0,0 +1,38 @@ +package net.srt.disposition.entity; + +import com.baomidou.mybatisplus.annotation.TableName; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.NoArgsConstructor; +import lombok.experimental.SuperBuilder; +import net.srt.framework.mybatis.entity.BaseEntity; + +import java.util.Date; + +/** + * @BelongsProject: srt_cloud + * @BelongsPackage: net.srt.disposition.entity + * @Author: jpz + * @CreateTime: 2023/12/28 9:35 + */ +@EqualsAndHashCode(callSuper = false) +@Data +@SuperBuilder +@AllArgsConstructor +@NoArgsConstructor +@TableName(value = "data_production_schedule",autoResultMap = true) +public class DataProductionScheduleEntity extends BaseEntity { + private Long projectId; + private String name; + private Integer ifCycle; + private String cron; + private String note; + private Integer status; + private Date releaseTime; + private Integer releaseStatus; + private String edges; + private Integer releaseUserId; + + +} diff --git a/srt-data-development/src/main/java/net/srt/disposition/entity/DataProductionScheduleNodeEntity.java b/srt-data-development/src/main/java/net/srt/disposition/entity/DataProductionScheduleNodeEntity.java new file mode 100644 index 0000000..8208015 --- /dev/null +++ b/srt-data-development/src/main/java/net/srt/disposition/entity/DataProductionScheduleNodeEntity.java @@ -0,0 +1,76 @@ +package net.srt.disposition.entity; + +import com.baomidou.mybatisplus.annotation.TableName; +import lombok.Data; +import net.srt.framework.mybatis.entity.BaseEntity; + +/** + * @BelongsProject: srt_cloud + * @BelongsPackage: net.srt.disposition.entity + * @Author: jpz + * @CreateTime: 2023/12/28 15:01 + */ +@Data +@TableName("data_production_schedule_node") +public class DataProductionScheduleNodeEntity extends BaseEntity { + private Long projectId; + /** + * 关联的调度id + */ + private Long taskScheduleId; + + /** + * 节点编号 + */ + private String no; + + /** + * 执行顺序 + */ + private Integer sort; + + /** + * 节点名称 + */ + private String name; + + /** + * 节点类型 + */ + private String type; + + /** + * 横坐标 + */ + private Integer x; + + /** + * 纵坐标 + */ + private Integer y; + + /** + * 节点描述 + */ + private String note; + + /** + * 关联的作业id + */ + private Integer taskId; + + /** + * 作业类型 + */ + private Integer taskType; + + /** + * 遇错是否继续 + */ + private Integer failGoOn; + + /** + * 权重 + */ + private Integer weight; +} diff --git a/srt-data-development/src/main/java/net/srt/disposition/entity/DataProductionScheduleNodeRecordEntity.java b/srt-data-development/src/main/java/net/srt/disposition/entity/DataProductionScheduleNodeRecordEntity.java new file mode 100644 index 0000000..9efd6d6 --- /dev/null +++ b/srt-data-development/src/main/java/net/srt/disposition/entity/DataProductionScheduleNodeRecordEntity.java @@ -0,0 +1,71 @@ +package net.srt.disposition.entity; + +import com.baomidou.mybatisplus.annotation.TableName; +import lombok.Data; +import lombok.EqualsAndHashCode; +import 
net.srt.framework.mybatis.entity.BaseEntity; + +import java.util.Date; + +/** + * @BelongsProject: srt_cloud + * @BelongsPackage: net.srt.disposition.entity + * @Author: jpz + * @CreateTime: 2023/12/28 19:06 + */ +@Data +@EqualsAndHashCode(callSuper = false) +@TableName("data_production_schedule_node_record") +public class DataProductionScheduleNodeRecordEntity extends BaseEntity { + public static final String SCHEDULE_NODE_RECORD = "SCHEDULE_NODE_RECORD"; + /** + * 调度id + */ + private Integer taskScheduleId; + + /** + * 调度节点id + */ + private Integer scheduleNodeId; + + /** + * 调度记录id + */ + private Integer scheduleRecordId; + + private String scheduleNodeNo; + + /** + * 作业id + */ + private Integer taskId; + + /** + * 项目(租户)id + */ + private Long projectId; + + /** + * 当前状态 字典 run_status + */ + private Integer runStatus; + + /** + * 开始时间 + */ + private Date startTime; + + /** + * 结束时间 + */ + private Date endTime; + + /** + * 运行日志 + */ + private String log; + + private Integer executeType; + + +} diff --git a/srt-data-development/src/main/java/net/srt/disposition/entity/DataProductionScheduleRecordEntity.java b/srt-data-development/src/main/java/net/srt/disposition/entity/DataProductionScheduleRecordEntity.java new file mode 100644 index 0000000..69beca6 --- /dev/null +++ b/srt-data-development/src/main/java/net/srt/disposition/entity/DataProductionScheduleRecordEntity.java @@ -0,0 +1,58 @@ +package net.srt.disposition.entity; + +import com.baomidou.mybatisplus.annotation.TableName; +import lombok.Data; +import lombok.EqualsAndHashCode; +import net.srt.framework.mybatis.entity.BaseEntity; + +import java.util.Date; + +/** + * @BelongsProject: srt_cloud + * @BelongsPackage: net.srt.disposition.entity + * @Author: jpz + * @CreateTime: 2023/12/28 17:26 + */ +@EqualsAndHashCode(callSuper = false) +@Data +@TableName("data_production_schedule_record") +public class DataProductionScheduleRecordEntity extends BaseEntity { + public static final String SCHEDULE_RECORD = "SCHEDULE_RECORD"; + //程序异常中断重新执行的情况 + public static final String SCHEDULE_RESTART_RECORD = "SCHEDULE_RESTART_RECORD"; + private String name; + /** + * 调度id + */ + private Integer taskScheduleId; + + /** + * 项目(租户)id + */ + private Long projectId; + + /** + * 当前状态 字典 run_status + */ + private Integer runStatus; + + /** + * 开始时间 + */ + private Date startTime; + + /** + * 结束时间 + */ + private Date endTime; + + /** + * 运行日志 + */ + private String log; + + private Integer executeType; + + private String configJson; + +} diff --git a/srt-data-development/src/main/java/net/srt/disposition/entity/DataProductionTaskEntity.java b/srt-data-development/src/main/java/net/srt/disposition/entity/DataProductionTaskEntity.java new file mode 100644 index 0000000..7f4c583 --- /dev/null +++ b/srt-data-development/src/main/java/net/srt/disposition/entity/DataProductionTaskEntity.java @@ -0,0 +1,227 @@ +package net.srt.disposition.entity; + +import com.baomidou.mybatisplus.annotation.FieldStrategy; +import com.baomidou.mybatisplus.annotation.TableField; +import com.baomidou.mybatisplus.annotation.TableName; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.Data; +import lombok.EqualsAndHashCode; +import net.srt.flink.common.assertion.Asserts; +import net.srt.flink.core.job.JobConfig; +import net.srt.flink.gateway.GatewayType; +import net.srt.framework.mybatis.entity.BaseEntity; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; 
+ +/** + * @BelongsProject: srt_cloud + * @BelongsPackage: net.srt.disposition.entity + * @Author: jpz + * @CreateTime: 2023/12/28 19:36 + */ +@EqualsAndHashCode(callSuper = false) +@Data +@TableName(value = "data_production_task", autoResultMap = true) +public class DataProductionTaskEntity extends BaseEntity { + /** + * 节点id + */ + private Long catalogueId; + + /** + * 任务名称 + */ + private String name; + + /** + * 项目(租户id) + */ + private Long projectId; + + /** + * 任务别名 + */ + private String alias; + + /** + * 任务类型 + */ + private Integer dialect; + + /** + * 任务运行类型 + */ + private Integer type; + + /** + * CheckPoint trigger seconds + */ + private Integer checkPoint; + + /** + * SavePoint strategy + */ + private Integer savePointStrategy; + + /** + * SavePointPath + */ + private String savePointPath; + + /** + * 并行度 + */ + private Integer parallelism; + + /** + * 全局变量 + */ + private Boolean fragment; + + /** + * insert 语句集 + */ + private Boolean statementSet; + + /** + * 批处理模式 + */ + private Boolean batchModel; + + /** + * flink集群实例id + */ + @TableField(updateStrategy = FieldStrategy.IGNORED) + private Long clusterId; + + /** + * 集群配置id + */ + @TableField(updateStrategy = FieldStrategy.IGNORED) + private Long clusterConfigurationId; + + /** + * 数据类型(1-数据库 2-中台库)(sql模式下) + */ + private Integer sqlDbType; + + /** + * 数据库id(sql模式下) + */ + @TableField(updateStrategy = FieldStrategy.IGNORED) + private Long databaseId; + + private Integer openTrans; + + /** + * Jar ID + */ + @TableField(updateStrategy = FieldStrategy.IGNORED) + private Long jarId; + + /** + * env id + */ + @TableField(updateStrategy = FieldStrategy.IGNORED) + private Long envId; + + /** + * alert group id + */ + private Long alertGroupId; + + /** + * configuration json + */ + private String configJson; + + /** + * Job Note + */ + private String note; + + /** + * Job lifecycle + */ + private Integer step; + + /** + * job instance id + */ + @TableField(updateStrategy = FieldStrategy.IGNORED) + private Long jobInstanceId; + + /** + * 自动停止 + */ + private Boolean useAutoCancel; + + /** + * 打印流 + */ + private Boolean useChangeLog; + + /** + * 预览结果 + */ + private Boolean useResult; + + /** + * 预览行数 + */ + private Integer pvdataNum; + + /** + * is enable + */ + private Boolean enabled; + + /** + * version id + */ + private Integer versionId; + + @TableField(exist = false) + private String statement; + + @TableField(exist = false) + private String clusterName; + + @TableField(exist = false) + private List> config = new ArrayList<>(); + + + public List> parseConfig() { + ObjectMapper objectMapper = new ObjectMapper(); + try { + if (Asserts.isNotNullString(configJson)) { + config = objectMapper.readValue(configJson, ArrayList.class); + } + } catch (JsonProcessingException e) { + e.printStackTrace(); + } + return config; + } + + public JobConfig buildSubmitConfig() { + boolean useRemote = true; + if (clusterId == null || clusterId == 0) { + useRemote = false; + } + Map map = new HashMap<>(); + for (Map item : config) { + if (Asserts.isNotNull(item)) { + map.put(item.get("key"), item.get("value")); + } + } + int jid = Asserts.isNull(jarId) ? 0 : jarId.intValue(); + boolean fg = Asserts.isNull(fragment) ? false : fragment; + boolean sts = Asserts.isNull(statementSet) ? false : statementSet; + return new JobConfig(GatewayType.getByCode(type.toString()).getLongValue(), step, false, false, useRemote, clusterId == null ? null : clusterId.intValue(), clusterConfigurationId == null ? 
null : clusterConfigurationId.intValue(), jid, getId().intValue(), + alias, fg, sts, batchModel, checkPoint, parallelism, savePointStrategy, savePointPath, map); + } +} diff --git a/srt-data-development/src/main/java/net/srt/disposition/mapper/DataProductionScheduleDao.java b/srt-data-development/src/main/java/net/srt/disposition/mapper/DataProductionScheduleDao.java new file mode 100644 index 0000000..b5d31d1 --- /dev/null +++ b/srt-data-development/src/main/java/net/srt/disposition/mapper/DataProductionScheduleDao.java @@ -0,0 +1,17 @@ +package net.srt.disposition.mapper; + +import net.srt.disposition.entity.DataProductionScheduleEntity; +import net.srt.framework.mybatis.dao.BaseDao; +import org.apache.ibatis.annotations.Mapper; + +/** + * @BelongsProject: srt_cloud + * @BelongsPackage: net.srt.disposition.mapper + * @Author: jpz + * @CreateTime: 2023/12/28 9:45 + */ +@Mapper +public interface DataProductionScheduleDao extends BaseDao { + + void changeStutus(DataProductionScheduleEntity dbEntity); +} diff --git a/srt-data-development/src/main/java/net/srt/disposition/mapper/DataProductionScheduleNodeDao.java b/srt-data-development/src/main/java/net/srt/disposition/mapper/DataProductionScheduleNodeDao.java new file mode 100644 index 0000000..5916bdb --- /dev/null +++ b/srt-data-development/src/main/java/net/srt/disposition/mapper/DataProductionScheduleNodeDao.java @@ -0,0 +1,15 @@ +package net.srt.disposition.mapper; + +import net.srt.disposition.entity.DataProductionScheduleNodeEntity; +import net.srt.framework.mybatis.dao.BaseDao; +import org.apache.ibatis.annotations.Mapper; + +/** + * @BelongsProject: srt_cloud + * @BelongsPackage: net.srt.disposition.dao + * @Author: jpz + * @CreateTime: 2023/12/28 15:00 + */ +@Mapper +public interface DataProductionScheduleNodeDao extends BaseDao { +} diff --git a/srt-data-development/src/main/java/net/srt/disposition/mapper/DataProductionScheduleNodeRecordDao.java b/srt-data-development/src/main/java/net/srt/disposition/mapper/DataProductionScheduleNodeRecordDao.java new file mode 100644 index 0000000..a468772 --- /dev/null +++ b/srt-data-development/src/main/java/net/srt/disposition/mapper/DataProductionScheduleNodeRecordDao.java @@ -0,0 +1,15 @@ +package net.srt.disposition.mapper; + +import net.srt.disposition.entity.DataProductionScheduleNodeRecordEntity; +import net.srt.framework.mybatis.dao.BaseDao; +import org.apache.ibatis.annotations.Mapper; + +/** + * @BelongsProject: srt_cloud + * @BelongsPackage: net.srt.disposition.dao + * @Author: jpz + * @CreateTime: 2023/12/28 19:08 + */ +@Mapper +public interface DataProductionScheduleNodeRecordDao extends BaseDao { +} diff --git a/srt-data-development/src/main/java/net/srt/disposition/mapper/DataProductionScheduleRecordDao.java b/srt-data-development/src/main/java/net/srt/disposition/mapper/DataProductionScheduleRecordDao.java new file mode 100644 index 0000000..6e083e8 --- /dev/null +++ b/srt-data-development/src/main/java/net/srt/disposition/mapper/DataProductionScheduleRecordDao.java @@ -0,0 +1,15 @@ +package net.srt.disposition.mapper; + +import net.srt.disposition.entity.DataProductionScheduleRecordEntity; +import net.srt.framework.mybatis.dao.BaseDao; +import org.apache.ibatis.annotations.Mapper; + +/** + * @BelongsProject: srt_cloud + * @BelongsPackage: net.srt.disposition.dao + * @Author: jpz + * @CreateTime: 2023/12/28 18:54 + */ +@Mapper +public interface DataProductionScheduleRecordDao extends BaseDao { +} diff --git 
a/srt-data-development/src/main/java/net/srt/disposition/query/DataProductionsScheduleQuery.java b/srt-data-development/src/main/java/net/srt/disposition/query/DataProductionsScheduleQuery.java new file mode 100644 index 0000000..bb618d6 --- /dev/null +++ b/srt-data-development/src/main/java/net/srt/disposition/query/DataProductionsScheduleQuery.java @@ -0,0 +1,20 @@ +package net.srt.disposition.query; + +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.Data; +import lombok.EqualsAndHashCode; +import net.srt.framework.common.query.Query; + +/** + * @BelongsProject: srt_cloud + * @BelongsPackage: net.srt.disposition.query + * @Author: jpz + * @CreateTime: 2023/12/28 9:39 + */ +@Data +@EqualsAndHashCode(callSuper = false) +@Schema(description = "数据生成-任务调度查询") +public class DataProductionsScheduleQuery extends Query { + private String name; + private Integer status; +} diff --git a/srt-data-development/src/main/java/net/srt/disposition/service/DataProductionScheduleService.java b/srt-data-development/src/main/java/net/srt/disposition/service/DataProductionScheduleService.java new file mode 100644 index 0000000..8f5771d --- /dev/null +++ b/srt-data-development/src/main/java/net/srt/disposition/service/DataProductionScheduleService.java @@ -0,0 +1,32 @@ +package net.srt.disposition.service; + +import net.srt.disposition.dto.Flow; +import net.srt.disposition.entity.DataProductionScheduleEntity; +import net.srt.disposition.query.DataProductionsScheduleQuery; +import net.srt.disposition.vo.DataProductionScheduleVo; +import net.srt.framework.common.page.PageResult; +import net.srt.framework.mybatis.service.BaseService; + +import java.util.List; + +/** + * @BelongsProject: srt_cloud + * @BelongsPackage: net.srt.disposition.service + * @Author: jpz + * @CreateTime: 2023/12/28 8:56 + */ + +public interface DataProductionScheduleService extends BaseService { + PageResult page(DataProductionsScheduleQuery query); + + Flow get(Long id); + + void save(Flow flow); + + String run(Integer id); + + + void delete(List idList); + + void release(Integer id); +} diff --git a/srt-data-development/src/main/java/net/srt/disposition/service/DataProductionTaskService.java b/srt-data-development/src/main/java/net/srt/disposition/service/DataProductionTaskService.java new file mode 100644 index 0000000..2bc200b --- /dev/null +++ b/srt-data-development/src/main/java/net/srt/disposition/service/DataProductionTaskService.java @@ -0,0 +1,17 @@ +package net.srt.disposition.service; + +import net.srt.disposition.entity.DataProductionScheduleNodeRecordEntity; +import net.srt.disposition.entity.DataProductionTaskEntity; +import net.srt.flink.core.job.JobResult; +import net.srt.framework.mybatis.service.BaseService; + +/** + * @BelongsProject: srt_cloud + * @BelongsPackage: net.srt.disposition.service + * @Author: jpz + * @CreateTime: 2023/12/28 19:36 + */ +public interface DataProductionTaskService extends BaseService { + JobResult scheduleTask(DataProductionScheduleNodeRecordEntity nodeRecordEntity); + +} diff --git a/srt-data-development/src/main/java/net/srt/disposition/service/impl/DataProductionScheduleServiceimpl.java b/srt-data-development/src/main/java/net/srt/disposition/service/impl/DataProductionScheduleServiceimpl.java new file mode 100644 index 0000000..8233a1a --- /dev/null +++ b/srt-data-development/src/main/java/net/srt/disposition/service/impl/DataProductionScheduleServiceimpl.java @@ -0,0 +1,365 @@ +package net.srt.disposition.service.impl; + +import 
com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.baomidou.mybatisplus.core.metadata.IPage; +import com.baomidou.mybatisplus.core.toolkit.Wrappers; +import com.fasterxml.jackson.core.type.TypeReference; +import lombok.AllArgsConstructor; +import net.srt.api.module.data.development.constant.ExecuteType; +import net.srt.api.module.data.integrate.constant.CommonRunStatus; +import net.srt.api.module.quartz.QuartzDataProductionScheduleApi; +import net.srt.disposition.convert.DataProductionConvert; +import net.srt.disposition.mapper.DataProductionScheduleNodeDao; +import net.srt.disposition.mapper.DataProductionScheduleNodeRecordDao; +import net.srt.disposition.mapper.DataProductionScheduleRecordDao; +import net.srt.disposition.dto.Flow; +import net.srt.disposition.dto.FlowEdge; +import net.srt.disposition.dto.FlowNode; +import net.srt.disposition.dto.FlowNodeProperties; +import net.srt.disposition.entity.DataProductionScheduleEntity; +import net.srt.disposition.entity.DataProductionScheduleNodeEntity; +import net.srt.disposition.entity.DataProductionScheduleNodeRecordEntity; +import net.srt.disposition.entity.DataProductionScheduleRecordEntity; +import net.srt.disposition.mapper.DataProductionScheduleDao; +import net.srt.disposition.query.DataProductionsScheduleQuery; +import net.srt.disposition.service.DataProductionScheduleService; +import net.srt.disposition.vo.DataProductionScheduleVo; +import net.srt.flink.common.config.Dialect; +import net.srt.flink.common.utils.JSONUtil; +import net.srt.flink.common.utils.ThreadUtil; +import net.srt.flink.core.job.JobResult; +import net.srt.flink.process.context.ProcessContextHolder; +import net.srt.flink.process.model.ProcessEntity; +import net.srt.flink.process.model.ProcessType; +import net.srt.flink.process.pool.ConsolePool; +import net.srt.framework.common.exception.ServerException; +import net.srt.framework.common.page.PageResult; +import net.srt.framework.mybatis.service.impl.BaseServiceImpl; +import net.srt.framework.security.user.SecurityUser; +import org.apache.logging.log4j.core.util.CronExpression; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; +import srt.cloud.framework.dbswitch.common.util.StringUtil; + +import java.util.*; +import java.util.stream.Collectors; + +/** + * @BelongsProject: srt_cloud + * @BelongsPackage: net.srt.disposition.service.impl + * @Author: jpz + * @CreateTime: 2023/12/28 9:42 + */ +@Service +@AllArgsConstructor +public class DataProductionScheduleServiceimpl extends BaseServiceImpl<DataProductionScheduleDao, DataProductionScheduleEntity> implements DataProductionScheduleService { + private final DataProductionScheduleNodeDao nodeDao; + private final DataProductionScheduleRecordDao recordDao; + private final DataProductionScheduleNodeRecordDao nodeRecordDao; + private final QuartzDataProductionScheduleApi scheduleApi; + @Override + public PageResult<DataProductionScheduleVo> page(DataProductionsScheduleQuery query) { + IPage<DataProductionScheduleEntity> page=baseMapper.selectPage(getPage(query),getWrapper(query)); + return new PageResult<>(DataProductionConvert.INSTANCE.convertList(page.getRecords()),page.getTotal()); + } + + @Override + public Flow get(Long id) { + DataProductionScheduleEntity entity = baseMapper.selectById(id); + Flow flow = new Flow(); + flow.setId(entity.getId()); + flow.setName(entity.getName()); + flow.setCron(entity.getCron()); + flow.setStatus(entity.getStatus()); + flow.setReleaseUserId(entity.getReleaseUserId()); + flow.setReleaseTime(entity.getReleaseTime()); + flow.setIfCycle(entity.getIfCycle()); + 
flow.setNote(entity.getNote()); + flow.setEdges(JSONUtil.parseObject(entity.getEdges(),new TypeReference<List<FlowEdge>>(){ + })); + ArrayList<FlowNode> nodes = new ArrayList<>(6); + flow.setNodes(nodes); + //获取结点 + HashMap<String, Object> queryMap = new HashMap<>(); + queryMap.put("task_schedule_id",id); + List<DataProductionScheduleNodeEntity> dbNodes=nodeDao.selectByMap(queryMap); + for (DataProductionScheduleNodeEntity dbNode : dbNodes) { + FlowNode flowNode=getFlowNode(dbNode); + nodes.add(flowNode); + } + return flow; + } + + @Override + @Transactional(rollbackFor = Exception.class) + public void save(Flow flow) { + insertOrUpdate(flow); + } + + @Override + public String run(Integer id) { + return scheduleRun(id, ExecuteType.HAND); + } + + @Override + @Transactional(rollbackFor = Exception.class) + public void delete(List<Long> idList) { + removeByIds(idList); + for (Long id : idList) { + //同步删除节点 + Map<String, Object> delMap = new HashMap<>(); + delMap.put("task_schedule_id", id); + nodeDao.deleteByMap(delMap); + } + } + + @Override + public void release(Integer id) { + scheduleApi.release(id.longValue()); + //更新状态,发布人和发布时间 + DataProductionScheduleEntity dbEntity = new DataProductionScheduleEntity(); + dbEntity.setId(id.longValue()); + dbEntity.setStatus(1); + dbEntity.setReleaseUserId(SecurityUser.getUserId().intValue()); + dbEntity.setReleaseTime(new Date()); + baseMapper.changeStutus(dbEntity); + } + + + private String scheduleRun(Integer id, ExecuteType executeType) { + DataProductionScheduleEntity scheduleEntity=baseMapper.selectById(id); + LambdaQueryWrapper<DataProductionScheduleNodeEntity> wrapper = new LambdaQueryWrapper<>(); + wrapper.eq(DataProductionScheduleNodeEntity::getTaskScheduleId,id) + .orderByAsc(DataProductionScheduleNodeEntity::getSort); + List<DataProductionScheduleNodeEntity> dbNodes=nodeDao.selectList(wrapper); + //新增调度日志 + DataProductionScheduleRecordEntity scheduleRecordEntity = new DataProductionScheduleRecordEntity(); + scheduleRecordEntity.setProjectId(scheduleEntity.getProjectId()); + scheduleRecordEntity.setName(scheduleEntity.getName()); + scheduleRecordEntity.setTaskScheduleId(id); + scheduleRecordEntity.setRunStatus(CommonRunStatus.RUNNING.getCode()); + scheduleRecordEntity.setStartTime(new Date()); + scheduleRecordEntity.setExecuteType(executeType.getValue()); + recordDao.insert(scheduleRecordEntity); + ThreadUtil.threadPool.execute(()->runNode(scheduleEntity,executeType,dbNodes,scheduleRecordEntity)); + return scheduleEntity.getId().toString(); + } + + private void runNode(DataProductionScheduleEntity scheduleEntity, ExecuteType executeType, List<DataProductionScheduleNodeEntity> dbNodes, DataProductionScheduleRecordEntity scheduleRecordEntity) { + String processId=scheduleRecordEntity.getId()+DataProductionScheduleRecordEntity.SCHEDULE_RECORD; + ProcessEntity flowProcess= ProcessContextHolder.registerFlowProcess(ProcessEntity.init(ProcessType.FLINKEXECUTE,processId)); + flowProcess.info("Start run flow..."); + boolean recordSuccess = true; + List<FlowNode> flowNodes = dbNodes.stream().map(this::getFlowNode).collect(Collectors.toList()); + for (DataProductionScheduleNodeEntity dbNode : dbNodes) { + //返回给前台的节点 + FlowNode flowNode = flowNodes.stream().filter(item -> item.getId().equals(dbNode.getNo())).findFirst().get(); + flowNode.getProperties().setRunStatus(CommonRunStatus.SUCCESS.getCode()); + flowNode.getProperties().setStyle(FlowNodeProperties.SUCCESS_STYLE); + flowProcess.info(String.format("Start run node %s-%s", dbNode.getName(), Dialect.getByCode(dbNode.getTaskType().toString()).getValue())); + DataProductionScheduleNodeRecordEntity nodeRecordEntity = new DataProductionScheduleNodeRecordEntity(); + 
nodeRecordEntity.setProjectId(scheduleRecordEntity.getProjectId()); + nodeRecordEntity.setTaskScheduleId(scheduleEntity.getId().intValue()); + nodeRecordEntity.setScheduleNodeId(dbNode.getId().intValue()); + nodeRecordEntity.setScheduleNodeNo(dbNode.getNo()); + nodeRecordEntity.setScheduleRecordId(scheduleRecordEntity.getId().intValue()); + nodeRecordEntity.setTaskId(dbNode.getTaskId()); + nodeRecordEntity.setRunStatus(CommonRunStatus.RUNNING.getCode()); + nodeRecordEntity.setStartTime(new Date()); + nodeRecordEntity.setExecuteType(executeType.getValue()); + nodeRecordDao.insert(nodeRecordEntity); + flowNode.getProperties().setNodeRecordId(nodeRecordEntity.getId().intValue()); + JobResult jobResult = null; + if (jobResult != null) { + flowProcess.info(jobResult.getLog()); + } + flowProcess.info(String.format("Node %s-%s run end", dbNode.getName(), Dialect.getByCode(dbNode.getTaskType().toString()).getValue())); + } + //更新调度记录 + scheduleRecordEntity.setEndTime(new Date()); + scheduleRecordEntity.setRunStatus(recordSuccess ? CommonRunStatus.SUCCESS.getCode() : CommonRunStatus.FAILED.getCode()); + scheduleRecordEntity.setLog(ConsolePool.getInstance().get(processId).toString()); + //保存历史json + Flow flow = new Flow(); + flow.setId(scheduleEntity.getId()); + flow.setIfCycle(scheduleEntity.getIfCycle()); + flow.setRecordId(scheduleRecordEntity.getId().intValue()); + flow.setName(scheduleEntity.getName()); + flow.setNote(scheduleEntity.getNote()); + flow.setCron(scheduleEntity.getCron()); + flow.setNodes(flowNodes); + flow.setEdges(JSONUtil.parseObject(scheduleEntity.getEdges(), new TypeReference<List<FlowEdge>>() { + })); + scheduleRecordEntity.setConfigJson(JSONUtil.toJsonString(flow)); + recordDao.updateById(scheduleRecordEntity); + flowProcess.infoEnd(); + //移除日志 + ProcessContextHolder.clearFlow(); + ConsolePool.getInstance().remove(processId); + } + + private void insertOrUpdate(Flow flow) { + if (flow.getIfCycle()==1&&!CronExpression.isValidExpression(flow.getCron())){ + throw new ServerException("cron表达式有误,请检查后重新填写"); + } + DataProductionScheduleEntity entity=DataProductionScheduleEntity.builder().id(flow.getId()) + .projectId(getProjectId()).ifCycle(flow.getIfCycle()) + .name(flow.getName()).cron(flow.getCron()).note(flow.getNote()).status(flow.getStatus()).releaseTime(flow.getReleaseTime()).releaseUserId(flow.getReleaseUserId()).build(); + List<FlowNode> nodes=flow.getNodes(); + List<FlowEdge> edges=flow.getEdges(); + //寻找入度为0节点 + List<FlowNode> starNodes=getStarNodes(nodes,edges); + //检查闭环 + checkClosedLoop(starNodes,nodes,edges); + Set<FlowNode> runNodeSet=new LinkedHashSet<>(); + buildRunNodes(runNodeSet,starNodes,nodes,edges); + if (entity.getId()==null){ + entity.setEdges(JSONUtil.toJsonString(edges)); + baseMapper.insert(entity); + //转换前端传过来节点为entity + List<DataProductionScheduleNodeEntity> clientNodes=getNodesByNodeSet(entity,runNodeSet); + //新增节点 + clientNodes.forEach(nodeDao::insert); + }else { + List<DataProductionScheduleNodeEntity> clientNodes=getNodesByNodeSet(entity,runNodeSet); + entity.setEdges(JSONUtil.toJsonString(edges)); + baseMapper.updateById(entity); + //获取库中节点 + HashMap<String, Object> queryMap = new HashMap<>(); + queryMap.put("task_schedule_id",entity.getId()); + List<DataProductionScheduleNodeEntity> dbNode=nodeDao.selectByMap(queryMap); + //查询clientNodes的properties的id + List<DataProductionScheduleNodeEntity> insertNodes=clientNodes.stream().filter(item->item.getId()==null).collect(Collectors.toList()); + insertNodes.forEach(nodeDao::insert); + //查询dbNode的properties的id不为空 + clientNodes=getNodesByNodeSet(entity,runNodeSet); + List<DataProductionScheduleNodeEntity> updateNodes=clientNodes.stream().filter(item->item.getId()!=null).collect(Collectors.toList()); + 
updateNodes.forEach(nodeDao::updateById); + //库里有而nodeSet里没有的,则要进行删除 + for (DataProductionScheduleNodeEntity dbNodes : dbNode) { + if(clientNodes.stream().noneMatch(item->dbNodes.getNo().equals(item.getNo()))){ + nodeDao.deleteById(dbNodes.getId()); + } + } + } + } + + private List<DataProductionScheduleNodeEntity> getNodesByNodeSet(DataProductionScheduleEntity entity, Set<FlowNode> runNodeSet) { + List<DataProductionScheduleNodeEntity> clientNodes=new ArrayList<>(10); + int i=0; + for (FlowNode flowNode : runNodeSet) { + i++; + DataProductionScheduleNodeEntity nodeentity = new DataProductionScheduleNodeEntity(); + nodeentity.setId(flowNode.getProperties().getId()); + nodeentity.setTaskScheduleId(entity.getId()); + nodeentity.setProjectId(entity.getProjectId()); + nodeentity.setNo(flowNode.getId()); + nodeentity.setSort(i); + nodeentity.setName(flowNode.getProperties().getName()); + nodeentity.setType(flowNode.getType()); + nodeentity.setX(flowNode.getX()); + nodeentity.setY(flowNode.getY()); + nodeentity.setNote(flowNode.getProperties().getNote()); + nodeentity.setTaskId(flowNode.getProperties().getTaskId()); + nodeentity.setTaskType(flowNode.getProperties().getTaskType()); + nodeentity.setFailGoOn(flowNode.getProperties().getFailGoOn()); + nodeentity.setWeight(flowNode.getProperties().getWeight()); + clientNodes.add(nodeentity); + } + return clientNodes; + } + + private void buildRunNodes(Set<FlowNode> runNodeSet, List<FlowNode> starNodes, List<FlowNode> nodes, List<FlowEdge> edges) { + if (starNodes.isEmpty()){ + return; + } + //按照权重逆序,权重越高越在前面 + starNodes.sort((item1, item2)->item2.getProperties().getWeight().compareTo(item1.getProperties().getWeight())); + for (FlowNode starNode : starNodes) { + if (nodes.contains(starNode)){ + runNodeSet.remove(starNode); + } + runNodeSet.add(starNode); + } + //获取子节点 + ArrayList<FlowNode> childNodes = new ArrayList<>(2); + for (FlowNode starNode : starNodes) { + //获取以node为父节点 + List<FlowEdge> collect=edges.stream().filter(item->starNode.getId().equals(item.getSourceNodeId())).collect(Collectors.toList()); + for (FlowEdge flowEdge : collect) { + FlowNode flowNode=nodes.stream().filter(item->item.getId().equals(flowEdge.getTargetNodeId())).findFirst().get(); + childNodes.add(flowNode); + } + } + buildRunNodes(runNodeSet,childNodes,nodes,edges); + } + + private void checkClosedLoop(List<FlowNode> starNodes, List<FlowNode> nodes, List<FlowEdge> edges) { + if (starNodes.isEmpty()){ + throw new ServerException("流程不允许存在闭环,请检查"); + } + for (FlowNode starNode : starNodes) { + Set<FlowNode> nodeSet = new HashSet<>(); + //遍历检查闭环 + dfs(nodeSet,starNode,nodes,edges); + + } + } + + private void dfs(Set<FlowNode> nodeSet, FlowNode starNode, List<FlowNode> nodes, List<FlowEdge> edges) { + List<FlowEdge> collect=edges.stream().filter(item-> starNode.getId().equals(item.getSourceNodeId())).collect(Collectors.toList()); + if (collect.isEmpty()){ + return; + } + for (FlowEdge edge : collect) { + FlowNode targetNode=nodes.stream().filter(item->item.getId().equals(edge.getTargetNodeId())).findFirst().get(); + if (nodeSet.contains(targetNode)){ + throw new ServerException("流程不允许存在闭环,请检查"); + } + nodeSet.add(targetNode); + dfs(nodeSet,targetNode,nodes,edges); + } + } + + + private List<FlowNode> getStarNodes(List<FlowNode> nodes, List<FlowEdge> edges) { + if (nodes.isEmpty()){ + throw new ServerException("流程不能为空"); + } + ArrayList<FlowNode> startNode = new ArrayList<>(1); + for (FlowNode node : nodes) { + if (edges.stream().noneMatch(item->node.getId().equals(item.getTargetNodeId()))){ + startNode.add(node); + } + } + return startNode; + } + + private FlowNode getFlowNode(DataProductionScheduleNodeEntity dbNode) { + FlowNode flowNode = new FlowNode(); + flowNode.setId(dbNode.getNo()); + 
flowNode.setType(dbNode.getType()); + flowNode.setX(dbNode.getX()); + flowNode.setY(dbNode.getY()); + FlowNodeProperties properties = new FlowNodeProperties(); + properties.setId(dbNode.getId()); + properties.setName(dbNode.getName()); + properties.setTaskId(dbNode.getTaskId()); + properties.setWeight(dbNode.getWeight()); + properties.setTaskType(dbNode.getTaskType()); + properties.setTaskTypeVal(Dialect.getByCode(dbNode.getTaskType().toString()).getValue()); + properties.setNote(dbNode.getNote()); + properties.setFailGoOn(dbNode.getFailGoOn()); + flowNode.setProperties(properties); + return flowNode; + } + + private LambdaQueryWrapper getWrapper(DataProductionsScheduleQuery query) { + LambdaQueryWrapper wrapper = Wrappers.lambdaQuery(); + wrapper.like(StringUtil.isNotBlank(query.getName()),DataProductionScheduleEntity::getName,query.getName()) + .eq(query.getStatus()!=null,DataProductionScheduleEntity::getStatus,query.getStatus()) + .orderByDesc(DataProductionScheduleEntity::getUpdateTime).orderByDesc(DataProductionScheduleEntity::getId); + dataScopeWithOrgId(wrapper); + return wrapper; + } +} diff --git a/srt-data-development/src/main/java/net/srt/disposition/vo/DataProductionScheduleVo.java b/srt-data-development/src/main/java/net/srt/disposition/vo/DataProductionScheduleVo.java new file mode 100644 index 0000000..a22539a --- /dev/null +++ b/srt-data-development/src/main/java/net/srt/disposition/vo/DataProductionScheduleVo.java @@ -0,0 +1,69 @@ +package net.srt.disposition.vo; + +import com.fasterxml.jackson.annotation.JsonFormat; +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.Data; +import net.srt.framework.common.utils.DateUtils; + +import java.io.Serializable; +import java.util.Date; + +/** + * @BelongsProject: srt_cloud + * @BelongsPackage: net.srt.disposition.vo + * @Author: jpz + * @CreateTime: 2023/12/28 8:57 + */ +@Data +@Schema(description = "数据生产-作业调度") +public class DataProductionScheduleVo implements Serializable { + private static final long serialVersionUID = 1L; + + @Schema(description = "主键") + private Long id; + + private Long projectId; + @Schema(description = "调度名称") + private String name; + + @Schema(description = "是否周期执行") + private Integer isCycle; + + @Schema(description = "cron表达式") + private String cron; + + @Schema(description = "描述") + private String note; + + @Schema(description = "节点关系json") + private String edges; + + @Schema(description = "0-未发布 1-已发布") + private Integer status; + + private Date releaseTime; + private Integer releaseUserId; + + @Schema(description = "版本号") + private Integer version; + + @Schema(description = "删除标识 0:正常 1:已删除") + private Integer deleted; + + @Schema(description = "创建者") + private Long creator; + + @Schema(description = "创建时间") + @JsonFormat(pattern = DateUtils.DATE_TIME_PATTERN) + private Date createTime; + + @Schema(description = "更新者") + private Long updater; + + @Schema(description = "更新时间") + @JsonFormat(pattern = DateUtils.DATE_TIME_PATTERN) + private Date updateTime; + + + +} diff --git a/srt-data-development/src/main/resources/bootstrap.yml b/srt-data-development/src/main/resources/bootstrap.yml index d03d7f4..57dc047 100644 --- a/srt-data-development/src/main/resources/bootstrap.yml +++ b/srt-data-development/src/main/resources/bootstrap.yml @@ -6,7 +6,7 @@ spring: servlet: load-on-startup: 1 application: - name: srt-data-development + name: srt-cloud-data-development profiles: active: dev cloud: @@ -14,9 +14,9 @@ spring: discovery: server-addr: 101.34.77.101:8848 # 命名空间,默认:public - namespace: 
c5d32e76-b83c-4254-8176-1c6a2cee8e3b + namespace: 7e1e997d-5fa4-4f84-9f48-3e0adf830a37 service: ${spring.application.name} - group: srt2.1 + group: srt2.0 config: server-addr: ${spring.cloud.nacos.discovery.server-addr} namespace: ${spring.cloud.nacos.discovery.namespace}
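
Review note on runNode in DataProductionScheduleServiceimpl: jobResult is declared as null and never assigned, so no node task is actually executed and recordSuccess can never flip to failed, even though this patch also introduces DataProductionTaskService.scheduleTask(DataProductionScheduleNodeRecordEntity). A minimal sketch of how the loop body might delegate to that service — assuming the service is constructor-injected into DataProductionScheduleServiceimpl and that JobResult exposes a success flag (isSuccess() below is an assumption; adjust to the real JobResult API):

    // Sketch only: dataProductionTaskService is an assumed injected field;
    // JobResult#isSuccess() is assumed and may need to be replaced with the actual success check.
    JobResult jobResult = dataProductionTaskService.scheduleTask(nodeRecordEntity);
    boolean nodeSuccess = jobResult != null && jobResult.isSuccess();
    nodeRecordEntity.setEndTime(new Date());
    nodeRecordEntity.setRunStatus(nodeSuccess ? CommonRunStatus.SUCCESS.getCode() : CommonRunStatus.FAILED.getCode());
    if (jobResult != null) {
        nodeRecordEntity.setLog(jobResult.getLog());
        flowProcess.info(jobResult.getLog());
    }
    nodeRecordDao.updateById(nodeRecordEntity);
    if (!nodeSuccess) {
        recordSuccess = false;
        // failGoOn == 0: stop the whole schedule on the first failed node
        if (dbNode.getFailGoOn() == null || dbNode.getFailGoOn() == 0) {
            break;
        }
    }

With something along these lines in place, the setRunStatus(CommonRunStatus.SUCCESS.getCode()) and SUCCESS_STYLE assignments made on flowNode before the node runs would also need to move after the execution result is known.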