diff --git a/srt-cloud-data-integrate/src/main/java/net/srt/controller/DataDatabaseController.java b/srt-cloud-data-integrate/src/main/java/net/srt/controller/DataDatabaseController.java index db2d0c2..d5c12e4 100644 --- a/srt-cloud-data-integrate/src/main/java/net/srt/controller/DataDatabaseController.java +++ b/srt-cloud-data-integrate/src/main/java/net/srt/controller/DataDatabaseController.java @@ -111,7 +111,7 @@ public class DataDatabaseController { return Result.ok(schemaTableDataVo); } - @GetMapping("/list-all") + @GetMapping("/list-all/{t}") @Operation(summary = "获取当前用户所能看到的的数据表") public Result> listAll() { List list = dataDatabaseService.listAll(); diff --git a/srt-cloud-data-service/src/main/resources/mapper/ApiConfigDao.xml b/srt-cloud-data-service/src/main/resources/mapper/ApiConfigDao.xml index df9e5a8..7e54245 100644 --- a/srt-cloud-data-service/src/main/resources/mapper/ApiConfigDao.xml +++ b/srt-cloud-data-service/src/main/resources/mapper/ApiConfigDao.xml @@ -70,7 +70,7 @@ AND dsac.content_type = #{contentType} - + AND dsac.status = #{status} diff --git a/srt-cloud-framework/srt-cloud-flink/flink-core-all/pom.xml b/srt-cloud-framework/srt-cloud-flink/flink-core-all/pom.xml index 0154d40..2501a36 100644 --- a/srt-cloud-framework/srt-cloud-flink/flink-core-all/pom.xml +++ b/srt-cloud-framework/srt-cloud-flink/flink-core-all/pom.xml @@ -109,6 +109,9 @@ net.srt flink-function + + + diff --git a/srt-cloud-gateway/src/main/resources/bootstrap.yml b/srt-cloud-gateway/src/main/resources/bootstrap.yml index c7daaeb..b53cfdb 100644 --- a/srt-cloud-gateway/src/main/resources/bootstrap.yml +++ b/srt-cloud-gateway/src/main/resources/bootstrap.yml @@ -89,7 +89,7 @@ spring: discovery: server-addr: 101.34.77.101:8848 # 命名空间,默认:public - namespace: 9de208a6-cb30-41ae-a880-78196c99c050 + namespace: 7e1e997d-5fa4-4f84-9f48-3e0adf830a37 service: ${spring.application.name} group: srt2.0 diff --git a/srt-data-development/pom.xml b/srt-data-development/pom.xml index 
package net.srt.disposition.controller;

import io.swagger.v3.oas.annotations.Operation;
import lombok.AllArgsConstructor;
import net.srt.disposition.dto.DataCheckSqlDto;
import net.srt.disposition.dto.DataSqlDto;
import net.srt.disposition.entity.DataCentre;
import net.srt.disposition.entity.DataCheckSqlEntity;
import net.srt.disposition.entity.DevelopmentOperationalRecordsQuery;
import net.srt.disposition.service.DataCenterService;
import net.srt.disposition.service.DataCheckSqlService;
import net.srt.disposition.vo.*;
import net.srt.flink.common.result.SqlExplainResult;
import net.srt.framework.common.page.PageResult;
import net.srt.framework.common.utils.Result;
import org.springframework.web.bind.annotation.*;

import java.util.List;

/**
 * SQL-check and operations-centre endpoints.
 *
 * NOTE(review): the class-level annotations (@RestController / @RequestMapping /
 * @AllArgsConstructor) and one pre-existing endpoint between /clear-log and
 * /execute-sql sit outside the visible diff hunk; confirm them against the full
 * file before applying this reconstruction.
 */
public class DataCheckSqlController {

    private DataCheckSqlService dataCheckSqlService;

    private DataCenterService dataCenterService;

    /** Persist a new SQL-check configuration. */
    @PostMapping
    public Result<String> add(@RequestBody DataCheckSqlDto dataCheckSqlDto) {
        dataCheckSqlService.add(dataCheckSqlDto);
        return Result.ok();
    }

    /**
     * Operations-centre history, paged.
     *
     * FIX(review): the original combined {@code @GetMapping} with {@code @RequestBody};
     * request bodies on GET are dropped by many HTTP clients and proxies. Bind the
     * query from the query string instead — the same pattern this codebase uses in
     * DataProductionScheduleController#page.
     */
    @GetMapping("/history/page")
    public Result<PageResult<DataCentre>> historyPage(DevelopmentOperationalRecordsQuery query) {
        PageResult<DataCentre> pageResult = dataCenterService.dataCenterService(query);
        return Result.ok(pageResult);
    }

    /**
     * Delete operations-centre history records.
     * FIX(review): typed the raw {@code List} as {@code List<Long>} — TODO confirm the
     * id element type against DataCenterService#deleted.
     */
    @DeleteMapping("/history")
    @Operation(summary = "运维中心删除")
    public Result<String> deleted(@RequestBody List<Long> ids) {
        dataCenterService.deleted(ids);
        return Result.ok();
    }

    /**
     * Environment list for the operations centre.
     * FIX(review): the original summary was a copy-paste of the delete endpoint's
     * "运维中心删除"; corrected to describe the env list.
     */
    @GetMapping("/env-list")
    @Operation(summary = "运维中心环境列表")
    public Result<List<DevelopmentTaskSaveVo>> envList() {
        List<DevelopmentTaskSaveVo> developmentTaskSaveVos = dataCenterService.listEnvList();
        return Result.ok(developmentTaskSaveVos);
    }

    /** Fetch the accumulated console log. */
    @GetMapping("/console-log")
    public Result<String> consoleLog() {
        return Result.ok(dataCheckSqlService.getLog());
    }

    /** Clear the console log buffer. */
    @GetMapping("/clear-log")
    public Result<String> checkLog() {
        dataCheckSqlService.clearLog();
        return Result.ok();
    }

    /** Execute a SQL statement and return its result/status. */
    @PostMapping("/execute-sql")
    public Result<DataCheckVo> executeSql(@RequestBody DataSqlDto dataSqlDto) {
        DataCheckVo dataCheckVo = dataCheckSqlService.executeSql(dataSqlDto);
        return Result.ok(dataCheckVo);
    }

    /** Load one SQL-check configuration by id. */
    @GetMapping("/{id}")
    public Result<DataCheckSqlEntity> get(@PathVariable Integer id) {
        DataCheckSqlEntity dataCheckSqlEntity = dataCheckSqlService.find(id);
        return Result.ok(dataCheckSqlEntity);
    }
}
org.springframework.security.access.prepost.PreAuthorize; +import org.springframework.security.core.parameters.P; +import org.springframework.web.bind.annotation.*; + +import javax.validation.Valid; +import java.util.List; + +/** + * @BelongsProject: srt_cloud + * @BelongsPackage: net.srt.disposition.controller + * @Author: jpz + * @CreateTime: 2023/12/28 8:49 + */ +@RestController +@RequestMapping("schedule") +@Tag(name = "数据生产-作业调度") +@AllArgsConstructor +public class DataProductionScheduleController { + private final DataProductionScheduleService dataProductionScheduleService; + + @GetMapping("page") + @Operation(summary = "分页") + @PreAuthorize("hasAuthority('data-development:schedule:page')") + public Result> page(@Valid DataProductionsScheduleQuery query){ + PageResult pageResult = dataProductionScheduleService.page(query); + return Result.ok(pageResult); + } + + @GetMapping("{id}") + @Operation(summary = "信息") + @PreAuthorize("hasAuthority('data-development:schedule:info')") + public Result get(@PathVariable("id") Long id){ + return Result.ok(dataProductionScheduleService.get(id)); + } + @PostMapping + @Operation(summary = "保存") + @PreAuthorize("hasAuthority('data-development:schedule:save')") + public Result save(@RequestBody Flow flow){ + dataProductionScheduleService.save(flow); + return Result.ok(); + } + + @PostMapping("/run/{id}") + @Operation(summary = "执行(返回log的id)") + @PreAuthorize("hasAuthority('data-development:schedule:run')") + public Result run(@PathVariable Integer id){ + return Result.ok(dataProductionScheduleService.run(id)); + } + + @DeleteMapping + @Operation(summary = "删除") + @PreAuthorize("hasAuthority('data-development:schedule:delete')") + public Result delete(@RequestBody List idList){ + dataProductionScheduleService.delete(idList); + return Result.ok(); + } + + @PostMapping("/release/{id}") + @Operation(summary = "发布") + @PreAuthorize("hasAuthority('data-development:schedule:release')") + public Result release(@PathVariable Integer 
id) { + dataProductionScheduleService.release(id); + return Result.ok(); + } + + + +} diff --git a/srt-data-development/src/main/java/net/srt/disposition/controller/DataProductionTreeController.java b/srt-data-development/src/main/java/net/srt/disposition/controller/DataProductionTreeController.java index b29b296..e8f226a 100644 --- a/srt-data-development/src/main/java/net/srt/disposition/controller/DataProductionTreeController.java +++ b/srt-data-development/src/main/java/net/srt/disposition/controller/DataProductionTreeController.java @@ -20,7 +20,7 @@ public class DataProductionTreeController { private DataProductionService dataProductionService; @GetMapping - public Result> listResult(@RequestParam String t){ + public Result> listResult(){ List dispositionVos=dataProductionService.dataTreeList(); return Result.ok(dispositionVos); } diff --git a/srt-data-development/src/main/java/net/srt/disposition/convert/DataCentreConvert.java b/srt-data-development/src/main/java/net/srt/disposition/convert/DataCentreConvert.java new file mode 100644 index 0000000..d54cebe --- /dev/null +++ b/srt-data-development/src/main/java/net/srt/disposition/convert/DataCentreConvert.java @@ -0,0 +1,18 @@ +package net.srt.disposition.convert; + +import net.srt.disposition.entity.DataCentre; +import net.srt.disposition.vo.DevelopmentTaskSaveVo; +import org.mapstruct.Mapper; +import org.mapstruct.factory.Mappers; + +import java.util.List; + +@Mapper +public interface DataCentreConvert { + + DataCentreConvert INSTANCE = Mappers.getMapper(DataCentreConvert.class); + + List convertList(List records); + + List convert(List entities); +} diff --git a/srt-data-development/src/main/java/net/srt/disposition/convert/DataProductionConvert.java b/srt-data-development/src/main/java/net/srt/disposition/convert/DataProductionConvert.java new file mode 100644 index 0000000..9239746 --- /dev/null +++ b/srt-data-development/src/main/java/net/srt/disposition/convert/DataProductionConvert.java @@ -0,0 +1,22 
package net.srt.disposition.dto;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.util.Date;
import java.util.List;

/**
 * Schedule flow graph submitted by the front end: schedule metadata plus node and
 * edge lists. Do not reorder fields — {@code @AllArgsConstructor} derives its
 * constructor signature from declaration order.
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Flow {
    private Long id;
    private Integer recordId;
    private Integer ifCycle;          // whether the schedule is cron-cyclic — TODO confirm dict values
    private String name;
    private String cron;
    private String note;
    private Integer status;
    private Date releaseTime;
    private Integer releaseUserId;
    private List<FlowNode> nodes;
    private List<FlowEdge> edges;
}

// ---- FlowEdge.java (separate file: net/srt/disposition/dto/FlowEdge.java) ----

package net.srt.disposition.dto;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.util.List;
import java.util.Map;

/**
 * One directed edge of the flow graph as drawn in the front-end canvas.
 * NOTE(review): the map/list generic parameters were lost in the pasted diff;
 * {@code <String, Object>} is an assumption — confirm against the UI payload.
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class FlowEdge {
    private String id;
    private String type;
    private Map<String, Object> startPoint;
    private Map<String, Object> endPoint;
    private List<Map<String, Object>> pointsList;
    private String sourceNodeId;
    private String targetNodeId;
    private Map<String, Object> properties;
}

// ---- FlowNode.java (separate file: net/srt/disposition/dto/FlowNode.java) ----

package net.srt.disposition.dto;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * One node of the flow graph: canvas position plus task-binding properties.
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class FlowNode {
    private String id;
    private String type;
    private Integer x;                // canvas x coordinate
    private Integer y;                // canvas y coordinate
    private FlowNodeProperties properties;
}

// ---- FlowNodeProperties.java (separate file: net/srt/disposition/dto/FlowNodeProperties.java) ----

package net.srt.disposition.dto;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.util.HashMap;
import java.util.Map;

/**
 * Task-binding properties of a flow node (task id/type, weight, error policy)
 * plus front-end styling for run status.
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class FlowNodeProperties {

    /**
     * Border style applied to a node that finished successfully.
     * FIX(review): the original used double-brace initialization (an anonymous
     * HashMap subclass) and exposed a mutable static map. Built once in a static
     * initializer and wrapped unmodifiable instead — confirm no caller mutated it.
     */
    public static final Map<String, String> SUCCESS_STYLE;

    static {
        Map<String, String> style = new HashMap<>();
        style.put("border", "3px solid #06c733");
        SUCCESS_STYLE = java.util.Collections.unmodifiableMap(style);
    }

    private Long id;
    private Integer nodeRecordId;
    private String name;
    private Integer taskId;
    private Integer weight;
    private Integer taskType;
    private String taskTypeVal;
    private String note;
    private Integer failGoOn;         // continue the schedule when this node fails
    private Map<String, String> style;
    // CommonRunStatus; defaults to 1 (initial/waiting — TODO confirm dict)
    private Integer runStatus = 1;
}
package net.srt.disposition.entity;

import com.baomidou.mybatisplus.annotation.TableName;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import net.srt.framework.mybatis.entity.BaseEntity;

import java.util.Date;

/**
 * Persistent schedule definition, table {@code data_production_schedule}.
 * Field order is part of the generated all-args constructor — do not reorder.
 */
@EqualsAndHashCode(callSuper = false)
@Data
@SuperBuilder
@AllArgsConstructor
@NoArgsConstructor
@TableName(value = "data_production_schedule", autoResultMap = true)
public class DataProductionScheduleEntity extends BaseEntity {
    /** Owning project (tenant) id. */
    private Long projectId;
    /** Schedule display name. */
    private String name;
    /** Whether the schedule repeats on a cron cycle. */
    private Integer ifCycle;
    /** Cron expression (only meaningful when cyclic). */
    private String cron;
    /** Free-form description. */
    private String note;
    /** Schedule status — dict value, confirm against run_status/status dict. */
    private Integer status;
    /** Last release timestamp. */
    private Date releaseTime;
    /** Release state flag. */
    private Integer releaseStatus;
    /** Serialized edge list of the flow graph (JSON string). */
    private String edges;
    /** User who released the schedule. */
    private Integer releaseUserId;
}
package net.srt.disposition.entity;

import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import lombok.EqualsAndHashCode;
import net.srt.framework.mybatis.entity.BaseEntity;

/**
 * One node of a persisted schedule, table {@code data_production_schedule_node}.
 *
 * FIX(review): added {@code @EqualsAndHashCode(callSuper = false)} — the class uses
 * {@code @Data} while extending BaseEntity; both sibling record entities declare it
 * explicitly, and leaving it implicit triggers a Lombok warning. Behavior unchanged.
 */
@Data
@EqualsAndHashCode(callSuper = false)
@TableName("data_production_schedule_node")
public class DataProductionScheduleNodeEntity extends BaseEntity {
    /** Project (tenant) id. */
    private Long projectId;
    /** Owning schedule id. */
    private Long taskScheduleId;
    /** Node number within the schedule. */
    private String no;
    /** Execution order. */
    private Integer sort;
    /** Node name. */
    private String name;
    /** Node type. */
    private String type;
    /** Canvas x coordinate. */
    private Integer x;
    /** Canvas y coordinate. */
    private Integer y;
    /** Description. */
    private String note;
    /** Bound task id. */
    private Integer taskId;
    /** Task type. */
    private Integer taskType;
    /** Continue on failure flag. */
    private Integer failGoOn;
    /** Weight. */
    private Integer weight;
}

// ---- DataProductionScheduleNodeRecordEntity.java (separate file) ----

package net.srt.disposition.entity;

import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import lombok.EqualsAndHashCode;
import net.srt.framework.mybatis.entity.BaseEntity;

import java.util.Date;

/**
 * Run record of a single schedule node, table
 * {@code data_production_schedule_node_record}.
 */
@Data
@EqualsAndHashCode(callSuper = false)
@TableName("data_production_schedule_node_record")
public class DataProductionScheduleNodeRecordEntity extends BaseEntity {
    /** Cache/lock key prefix for node-record processing — confirm usage at call sites. */
    public static final String SCHEDULE_NODE_RECORD = "SCHEDULE_NODE_RECORD";
    /** Schedule id. */
    private Integer taskScheduleId;
    /** Schedule node id. */
    private Integer scheduleNodeId;
    /** Parent schedule-record id. */
    private Integer scheduleRecordId;
    /** Node number within the schedule. */
    private String scheduleNodeNo;
    /** Bound task id. */
    private Integer taskId;
    /** Project (tenant) id. */
    private Long projectId;
    /** Current status — dict run_status. */
    private Integer runStatus;
    /** Start time. */
    private Date startTime;
    /** End time. */
    private Date endTime;
    /** Run log text. */
    private String log;
    /** How the run was triggered — dict value, confirm. */
    private Integer executeType;
}

// ---- DataProductionScheduleRecordEntity.java (separate file) ----

package net.srt.disposition.entity;

import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import lombok.EqualsAndHashCode;
import net.srt.framework.mybatis.entity.BaseEntity;

import java.util.Date;

/**
 * Run record of a whole schedule, table {@code data_production_schedule_record}.
 */
@EqualsAndHashCode(callSuper = false)
@Data
@TableName("data_production_schedule_record")
public class DataProductionScheduleRecordEntity extends BaseEntity {
    public static final String SCHEDULE_RECORD = "SCHEDULE_RECORD";
    /** Marker for re-runs after an abnormal interruption. */
    public static final String SCHEDULE_RESTART_RECORD = "SCHEDULE_RESTART_RECORD";
    private String name;
    /** Schedule id. */
    private Integer taskScheduleId;
    /** Project (tenant) id. */
    private Long projectId;
    /** Current status — dict run_status. */
    private Integer runStatus;
    /** Start time. */
    private Date startTime;
    /** End time. */
    private Date endTime;
    /** Run log text. */
    private String log;
    private Integer executeType;
    private String configJson;
}
package net.srt.disposition.entity;

import com.baomidou.mybatisplus.annotation.FieldStrategy;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableName;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.Data;
import lombok.EqualsAndHashCode;
import net.srt.flink.common.assertion.Asserts;
import net.srt.flink.core.job.JobConfig;
import net.srt.flink.gateway.GatewayType;
import net.srt.framework.mybatis.entity.BaseEntity;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Data-production task, table {@code data_production_task}. Holds the Flink job
 * definition plus helpers to parse its JSON config and build a submit-time
 * {@link JobConfig}.
 */
@EqualsAndHashCode(callSuper = false)
@Data
@TableName(value = "data_production_task", autoResultMap = true)
public class DataProductionTaskEntity extends BaseEntity {

    // Cached: ObjectMapper is thread-safe and expensive to construct per call.
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    /** Catalogue (tree node) id. */
    private Long catalogueId;
    /** Task name. */
    private String name;
    /** Project (tenant) id. */
    private Long projectId;
    /** Task alias (used as the submitted job name). */
    private String alias;
    /** Task dialect (dict). */
    private Integer dialect;
    /** Run type; mapped to GatewayType in buildSubmitConfig. */
    private Integer type;
    /** CheckPoint trigger seconds. */
    private Integer checkPoint;
    /** SavePoint strategy. */
    private Integer savePointStrategy;
    /** SavePoint path. */
    private String savePointPath;
    /** Parallelism. */
    private Integer parallelism;
    /** Enable global variables (fragments). */
    private Boolean fragment;
    /** Use an insert statement set. */
    private Boolean statementSet;
    /** Batch execution mode. */
    private Boolean batchModel;
    /** Flink cluster instance id; IGNORED strategy so null overwrites on update. */
    @TableField(updateStrategy = FieldStrategy.IGNORED)
    private Long clusterId;
    /** Cluster configuration id. */
    @TableField(updateStrategy = FieldStrategy.IGNORED)
    private Long clusterConfigurationId;
    /** Data source kind in SQL mode (1 = database, 2 = mid-platform library). */
    private Integer sqlDbType;
    /** Database id (SQL mode). */
    @TableField(updateStrategy = FieldStrategy.IGNORED)
    private Long databaseId;
    private Integer openTrans;
    /** Jar id. */
    @TableField(updateStrategy = FieldStrategy.IGNORED)
    private Long jarId;
    /** Env task id. */
    @TableField(updateStrategy = FieldStrategy.IGNORED)
    private Long envId;
    /** Alert group id. */
    private Long alertGroupId;
    /** Extra configuration as a JSON array of {key, value} maps. */
    private String configJson;
    /** Job note. */
    private String note;
    /** Job lifecycle step. */
    private Integer step;
    /** Job instance id. */
    @TableField(updateStrategy = FieldStrategy.IGNORED)
    private Long jobInstanceId;
    /** Auto-cancel. */
    private Boolean useAutoCancel;
    /** Print changelog stream. */
    private Boolean useChangeLog;
    /** Preview result. */
    private Boolean useResult;
    /** Preview row count. */
    private Integer pvdataNum;
    /** Enabled flag. */
    private Boolean enabled;
    /** Version id. */
    private Integer versionId;

    @TableField(exist = false)
    private String statement;

    @TableField(exist = false)
    private String clusterName;

    @TableField(exist = false)
    private List<Map<String, String>> config = new ArrayList<>();

    /**
     * Parse {@link #configJson} into {@link #config}.
     *
     * FIX(review): uses a shared ObjectMapper and a TypeReference instead of the raw
     * {@code ArrayList.class} read (which produced an unchecked result). On malformed
     * JSON the previous/default config is kept — same best-effort behavior as before.
     *
     * @return the parsed config (or the prior value when configJson is blank/invalid)
     */
    public List<Map<String, String>> parseConfig() {
        if (Asserts.isNotNullString(configJson)) {
            try {
                config = OBJECT_MAPPER.readValue(
                        configJson, new TypeReference<ArrayList<Map<String, String>>>() {});
            } catch (JsonProcessingException e) {
                // Best effort by design; no logger field available here — confirm whether
                // the project prefers @Slf4j over stack traces.
                e.printStackTrace();
            }
        }
        return config;
    }

    /**
     * Build the Flink submit configuration from this task.
     *
     * NOTE(review): throws NPE when {@code type} or {@code getId()} is null — the
     * original had the same behavior; callers appear to guarantee both are set.
     *
     * @return the JobConfig passed to the job manager
     */
    public JobConfig buildSubmitConfig() {
        // Remote submission only when a concrete cluster instance is bound.
        boolean useRemote = !(clusterId == null || clusterId == 0);

        // Flatten the [{key, value}, ...] config entries into a plain map.
        Map<String, String> map = new HashMap<>();
        for (Map<String, String> item : config) {
            if (Asserts.isNotNull(item)) {
                map.put(item.get("key"), item.get("value"));
            }
        }

        int jid = Asserts.isNull(jarId) ? 0 : jarId.intValue();
        boolean fg = Boolean.TRUE.equals(fragment);
        boolean sts = Boolean.TRUE.equals(statementSet);
        return new JobConfig(GatewayType.getByCode(type.toString()).getLongValue(), step,
                false, false, useRemote,
                clusterId == null ? null : clusterId.intValue(),
                clusterConfigurationId == null ? null : clusterConfigurationId.intValue(),
                jid, getId().intValue(), alias, fg, sts, batchModel, checkPoint, parallelism,
                savePointStrategy, savePointPath, map);
    }
}
@Schema(description = "SQL数据库类型") + private String sqlDbType; + + @Schema(description = "数据库唯一标识符") + private Long databaseId; + + @Schema(description = "集群唯一标识符") + private Integer clusterId; + + @Schema(description = "集群配置唯一标识符") + private Long clusterConfigurationId; + + @Schema(description = "开始时间") + @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss") + @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8") + private Date startTime; + + @Schema(description = "结束时间") + @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss") + @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8") + private Date endTime; + + @Schema(description = "执行完成时间戳") + @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss") + @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8") + private Date finishTime; + + +} + diff --git a/srt-data-development/src/main/java/net/srt/disposition/mapper/DataCentreMapper.java b/srt-data-development/src/main/java/net/srt/disposition/mapper/DataCentreMapper.java new file mode 100644 index 0000000..bfe1352 --- /dev/null +++ b/srt-data-development/src/main/java/net/srt/disposition/mapper/DataCentreMapper.java @@ -0,0 +1,7 @@ +package net.srt.disposition.mapper; + +import com.baomidou.mybatisplus.core.mapper.BaseMapper; +import net.srt.disposition.entity.DataCentre; + +public interface DataCentreMapper extends BaseMapper { +} diff --git a/srt-data-development/src/main/java/net/srt/disposition/mapper/DataProductionScheduleDao.java b/srt-data-development/src/main/java/net/srt/disposition/mapper/DataProductionScheduleDao.java new file mode 100644 index 0000000..b5d31d1 --- /dev/null +++ b/srt-data-development/src/main/java/net/srt/disposition/mapper/DataProductionScheduleDao.java @@ -0,0 +1,17 @@ +package net.srt.disposition.mapper; + +import net.srt.disposition.entity.DataProductionScheduleEntity; +import net.srt.framework.mybatis.dao.BaseDao; +import org.apache.ibatis.annotations.Mapper; + +/** + * @BelongsProject: srt_cloud + * @BelongsPackage: 
net.srt.disposition.mapper + * @Author: jpz + * @CreateTime: 2023/12/28 9:45 + */ +@Mapper +public interface DataProductionScheduleDao extends BaseDao { + + void changeStutus(DataProductionScheduleEntity dbEntity); +} diff --git a/srt-data-development/src/main/java/net/srt/disposition/mapper/DataProductionScheduleNodeDao.java b/srt-data-development/src/main/java/net/srt/disposition/mapper/DataProductionScheduleNodeDao.java new file mode 100644 index 0000000..5916bdb --- /dev/null +++ b/srt-data-development/src/main/java/net/srt/disposition/mapper/DataProductionScheduleNodeDao.java @@ -0,0 +1,15 @@ +package net.srt.disposition.mapper; + +import net.srt.disposition.entity.DataProductionScheduleNodeEntity; +import net.srt.framework.mybatis.dao.BaseDao; +import org.apache.ibatis.annotations.Mapper; + +/** + * @BelongsProject: srt_cloud + * @BelongsPackage: net.srt.disposition.dao + * @Author: jpz + * @CreateTime: 2023/12/28 15:00 + */ +@Mapper +public interface DataProductionScheduleNodeDao extends BaseDao { +} diff --git a/srt-data-development/src/main/java/net/srt/disposition/mapper/DataProductionScheduleNodeRecordDao.java b/srt-data-development/src/main/java/net/srt/disposition/mapper/DataProductionScheduleNodeRecordDao.java new file mode 100644 index 0000000..a468772 --- /dev/null +++ b/srt-data-development/src/main/java/net/srt/disposition/mapper/DataProductionScheduleNodeRecordDao.java @@ -0,0 +1,15 @@ +package net.srt.disposition.mapper; + +import net.srt.disposition.entity.DataProductionScheduleNodeRecordEntity; +import net.srt.framework.mybatis.dao.BaseDao; +import org.apache.ibatis.annotations.Mapper; + +/** + * @BelongsProject: srt_cloud + * @BelongsPackage: net.srt.disposition.dao + * @Author: jpz + * @CreateTime: 2023/12/28 19:08 + */ +@Mapper +public interface DataProductionScheduleNodeRecordDao extends BaseDao { +} diff --git a/srt-data-development/src/main/java/net/srt/disposition/mapper/DataProductionScheduleRecordDao.java 
b/srt-data-development/src/main/java/net/srt/disposition/mapper/DataProductionScheduleRecordDao.java new file mode 100644 index 0000000..6e083e8 --- /dev/null +++ b/srt-data-development/src/main/java/net/srt/disposition/mapper/DataProductionScheduleRecordDao.java @@ -0,0 +1,15 @@ +package net.srt.disposition.mapper; + +import net.srt.disposition.entity.DataProductionScheduleRecordEntity; +import net.srt.framework.mybatis.dao.BaseDao; +import org.apache.ibatis.annotations.Mapper; + +/** + * @BelongsProject: srt_cloud + * @BelongsPackage: net.srt.disposition.dao + * @Author: jpz + * @CreateTime: 2023/12/28 18:54 + */ +@Mapper +public interface DataProductionScheduleRecordDao extends BaseDao { +} diff --git a/srt-data-development/src/main/java/net/srt/disposition/query/DataProductionsScheduleQuery.java b/srt-data-development/src/main/java/net/srt/disposition/query/DataProductionsScheduleQuery.java new file mode 100644 index 0000000..bb618d6 --- /dev/null +++ b/srt-data-development/src/main/java/net/srt/disposition/query/DataProductionsScheduleQuery.java @@ -0,0 +1,20 @@ +package net.srt.disposition.query; + +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.Data; +import lombok.EqualsAndHashCode; +import net.srt.framework.common.query.Query; + +/** + * @BelongsProject: srt_cloud + * @BelongsPackage: net.srt.disposition.query + * @Author: jpz + * @CreateTime: 2023/12/28 9:39 + */ +@Data +@EqualsAndHashCode(callSuper = false) +@Schema(description = "数据生成-任务调度查询") +public class DataProductionsScheduleQuery extends Query { + private String name; + private Integer status; +} diff --git a/srt-data-development/src/main/java/net/srt/disposition/service/DataCenterService.java b/srt-data-development/src/main/java/net/srt/disposition/service/DataCenterService.java new file mode 100644 index 0000000..6983cf7 --- /dev/null +++ b/srt-data-development/src/main/java/net/srt/disposition/service/DataCenterService.java @@ -0,0 +1,19 @@ +package 
net.srt.disposition.service; + +import com.baomidou.mybatisplus.extension.service.IService; +import net.srt.disposition.entity.DataCentre; +import net.srt.disposition.entity.DevelopmentOperationalRecordsQuery; +import net.srt.disposition.vo.DevelopmentTaskSaveVo; +import net.srt.framework.common.page.PageResult; + +import java.util.List; + +public interface DataCenterService extends IService { + PageResult dataCenterService(DevelopmentOperationalRecordsQuery query); + + void deleted(List ids); + + List listEnvList(); + + +} diff --git a/srt-data-development/src/main/java/net/srt/disposition/service/DataCheckSqlService.java b/srt-data-development/src/main/java/net/srt/disposition/service/DataCheckSqlService.java index 6cdef3d..bda4f2e 100644 --- a/srt-data-development/src/main/java/net/srt/disposition/service/DataCheckSqlService.java +++ b/srt-data-development/src/main/java/net/srt/disposition/service/DataCheckSqlService.java @@ -3,10 +3,14 @@ package net.srt.disposition.service; import com.baomidou.mybatisplus.extension.service.IService; import net.srt.disposition.dto.DataCheckSqlDto; import net.srt.disposition.dto.DataSqlDto; +import net.srt.disposition.entity.DataCentre; import net.srt.disposition.entity.DataCheckSqlEntity; import net.srt.disposition.vo.DataCheckSqlVo; +import net.srt.disposition.vo.DataCheckVo; import net.srt.disposition.vo.DataSqlVo; +import net.srt.disposition.vo.LogVo; import net.srt.flink.common.result.SqlExplainResult; +import net.srt.framework.common.page.PageResult; import java.util.List; @@ -16,4 +20,13 @@ public interface DataCheckSqlService extends IService { void add(DataCheckSqlDto dataCheckSqlDto); List explainSql(DataSqlDto dataSqlDto); + + DataCheckVo executeSql(DataSqlDto dataSqlDto); + + LogVo getLog(); + + void clearLog(); + + + } diff --git a/srt-data-development/src/main/java/net/srt/disposition/service/DataProductionScheduleService.java 
b/srt-data-development/src/main/java/net/srt/disposition/service/DataProductionScheduleService.java new file mode 100644 index 0000000..8f5771d --- /dev/null +++ b/srt-data-development/src/main/java/net/srt/disposition/service/DataProductionScheduleService.java @@ -0,0 +1,32 @@ +package net.srt.disposition.service; + +import net.srt.disposition.dto.Flow; +import net.srt.disposition.entity.DataProductionScheduleEntity; +import net.srt.disposition.query.DataProductionsScheduleQuery; +import net.srt.disposition.vo.DataProductionScheduleVo; +import net.srt.framework.common.page.PageResult; +import net.srt.framework.mybatis.service.BaseService; + +import java.util.List; + +/** + * @BelongsProject: srt_cloud + * @BelongsPackage: net.srt.disposition.service + * @Author: jpz + * @CreateTime: 2023/12/28 8:56 + */ + +public interface DataProductionScheduleService extends BaseService { + PageResult page(DataProductionsScheduleQuery query); + + Flow get(Long id); + + void save(Flow flow); + + String run(Integer id); + + + void delete(List idList); + + void release(Integer id); +} diff --git a/srt-data-development/src/main/java/net/srt/disposition/service/DataProductionTaskService.java b/srt-data-development/src/main/java/net/srt/disposition/service/DataProductionTaskService.java new file mode 100644 index 0000000..2bc200b --- /dev/null +++ b/srt-data-development/src/main/java/net/srt/disposition/service/DataProductionTaskService.java @@ -0,0 +1,17 @@ +package net.srt.disposition.service; + +import net.srt.disposition.entity.DataProductionScheduleNodeRecordEntity; +import net.srt.disposition.entity.DataProductionTaskEntity; +import net.srt.flink.core.job.JobResult; +import net.srt.framework.mybatis.service.BaseService; + +/** + * @BelongsProject: srt_cloud + * @BelongsPackage: net.srt.disposition.service + * @Author: jpz + * @CreateTime: 2023/12/28 19:36 + */ +public interface DataProductionTaskService extends BaseService { + JobResult 
package net.srt.disposition.service.impl;

import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.core.toolkit.StringUtils;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import lombok.AllArgsConstructor;
import net.srt.disposition.convert.DataCentreConvert;
import net.srt.disposition.entity.DataCentre;
import net.srt.disposition.entity.DevelopmentOperationalRecordsQuery;
import net.srt.disposition.mapper.DataCentreMapper;
import net.srt.disposition.service.DataCenterService;
import net.srt.disposition.vo.DevelopmentTaskSaveVo;
import net.srt.framework.common.page.PageResult;
import net.srt.framework.mybatis.service.impl.BaseServiceImpl;
import org.springframework.stereotype.Service;

import java.util.List;

/**
 * Operations-centre service: paged lookup, deletion and listing of
 * {@link DataCentre} run records.
 */
@Service
@AllArgsConstructor
public class DataCenterServiceImpl extends BaseServiceImpl<DataCentreMapper, DataCentre> implements DataCenterService {

    /**
     * Pages the operations-centre records matching the optional filters.
     *
     * @param query filter + paging parameters
     * @return one page of records converted by {@link DataCentreConvert}
     */
    @Override
    public PageResult<DataCentre> dataCenterService(DevelopmentOperationalRecordsQuery query) {
        IPage<DataCentre> page = baseMapper.selectPage(getPage(query), getWrapper(query));
        return new PageResult<>(DataCentreConvert.INSTANCE.convertList(page.getRecords()), page.getTotal());
    }

    /**
     * Deletes the records with the given ids.
     *
     * <p>Bug fix: the original called {@code removeByIds(ids)} and then
     * re-deleted each id in a loop — {@code removeByIds} already deletes every
     * id, so the loop only issued N redundant DELETE statements.
     */
    @Override
    public void deleted(List<Long> ids) {
        removeByIds(ids);
    }

    /**
     * Returns every record, converted to its VO form (no filtering).
     */
    @Override
    public List<DevelopmentTaskSaveVo> listEnvList() {
        List<DataCentre> entities = baseMapper.selectList(Wrappers.lambdaQuery());
        return DataCentreConvert.INSTANCE.convert(entities);
    }

    /**
     * Builds the where-clause; each condition is applied only when its filter
     * value is present.
     */
    private LambdaQueryWrapper<DataCentre> getWrapper(DevelopmentOperationalRecordsQuery query) {
        LambdaQueryWrapper<DataCentre> wrapper = Wrappers.lambdaQuery();
        wrapper.eq(query.getNodeRecordId() != null, DataCentre::getScheduleNodeRecordId, query.getNodeRecordId());
        wrapper.eq(query.getTaskId() != null, DataCentre::getTaskId, query.getTaskId());
        wrapper.like(StringUtils.isNotBlank(query.getJobName()), DataCentre::getJobName, query.getJobName());
        wrapper.eq(query.getStatus() != null, DataCentre::getStatus, query.getStatus());
        // Consistency fix: instanceStatus is a String — blank-check it like the
        // other String filters (jobName, sqlDbType) instead of a bare null
        // check, so an empty string does not become an always-false predicate.
        wrapper.eq(StringUtils.isNotBlank(query.getInstanceStatus()), DataCentre::getInstanceStatus, query.getInstanceStatus());
        wrapper.eq(query.getDialect() != null, DataCentre::getDialect, query.getDialect());
        wrapper.eq(StringUtils.isNotBlank(query.getSqlDbType()), DataCentre::getSqlDbType, query.getSqlDbType());
        wrapper.eq(query.getDatabaseId() != null, DataCentre::getDatabaseId, query.getDatabaseId());
        wrapper.eq(query.getClusterId() != null, DataCentre::getClusterId, query.getClusterId());
        wrapper.eq(query.getClusterConfigurationId() != null, DataCentre::getClusterConfigurationId, query.getClusterConfigurationId());
        wrapper.gt(query.getStartTime() != null, DataCentre::getStartTime, query.getStartTime());
        wrapper.lt(query.getEndTime() != null, DataCentre::getEndTime, query.getEndTime());
        // NOTE(review): exact-match on a finish timestamp rarely hits;
        // presumably a range was intended — confirm with the front end before
        // changing the operator.
        wrapper.eq(query.getFinishTime() != null, DataCentre::getFinishTime, query.getFinishTime());
        return wrapper;
    }
}
b/srt-data-development/src/main/java/net/srt/disposition/service/impl/DataCheckSqlServiceImpl.java @@ -6,11 +6,17 @@ import lombok.AllArgsConstructor; import net.srt.disposition.convert.DataCheckSqlConvert; import net.srt.disposition.dto.DataCheckSqlDto; import net.srt.disposition.dto.DataSqlDto; +import net.srt.disposition.entity.DataCentre; import net.srt.disposition.entity.DataCheckSqlEntity; +import net.srt.disposition.entity.DataDatabaseDevEntity; import net.srt.disposition.entity.DataProductionTreeEntity; +import net.srt.disposition.mapper.DataCentreMapper; import net.srt.disposition.mapper.DataCheckSqlMapper; import net.srt.disposition.mapper.DataProductionMapper; import net.srt.disposition.service.DataCheckSqlService; +import net.srt.disposition.vo.DataCheckVo; +import net.srt.disposition.vo.LogVo; +import net.srt.disposition.vo.Result; import net.srt.flink.common.result.SqlExplainResult; import net.srt.flink.common.utils.LogUtil; import net.srt.flink.process.context.ProcessContextHolder; @@ -18,20 +24,29 @@ import net.srt.flink.process.model.ProcessEntity; import net.srt.flink.process.model.ProcessStatus; import net.srt.flink.process.model.ProcessStep; import net.srt.flink.process.model.ProcessType; +import net.srt.flink.process.pool.ConsolePool; import net.srt.framework.mybatis.service.impl.BaseServiceImpl; +import net.srt.framework.security.cache.TokenStoreCache; +import org.apache.commons.lang3.StringUtils; import org.springframework.stereotype.Service; +import srt.cloud.framework.dbswitch.common.type.ProductTypeEnum; +import javax.servlet.http.HttpServletRequest; +import java.sql.*; import java.time.LocalDateTime; -import java.util.ArrayList; -import java.util.List; -import java.util.UUID; +import java.util.*; +import java.util.Date; +import java.util.regex.Matcher; +import java.util.regex.Pattern; @Service @AllArgsConstructor public class DataCheckSqlServiceImpl extends BaseServiceImpl implements DataCheckSqlService { private DataProductionMapper 
dataProductionMapper; - + private TokenStoreCache storeCache; + private DataCentreMapper dataCentreMapper; + private HttpServletRequest request; @Override public DataCheckSqlEntity find(Integer id) { DataCheckSqlEntity dataCheckSqlEntity = baseMapper.selectById(id); @@ -91,6 +106,251 @@ public class DataCheckSqlServiceImpl extends BaseServiceImpl results = result.getResults(); + String Result= ""; + for (Result resultsDTO : results) { + Result=resultsDTO.getRowData().toString(); + } + operationalRecords.setResult(Result); + operationalRecords.setEndTime(new Date()); + operationalRecords.setStartTime(new Date()); + operationalRecords.setTaskId(dto.getId()); + operationalRecords.setCreateTime(new Date()); + operationalRecords.setExecuteNo(UUID.randomUUID().toString()); + operationalRecords.setDuration(0); + return operationalRecords; + } + + public String getAccessToken1() { + String accessToken = request.getHeader("Authorization"); + if (StringUtils.isBlank(accessToken)) { + accessToken = request.getParameter("access_token"); + } + return accessToken; + } + + public DataCheckVo selectColumns(DataSqlDto dto) throws Exception{ + + //获取数据库信息 + List databaseList = dto.getDatabaseList(); + DataDatabaseDevEntity databaseVO = new DataDatabaseDevEntity(); + for (DataDatabaseDevEntity dataDatabaseVO : databaseList) { + if(dto.getDatabaseId().equals(dataDatabaseVO.getId())){ + databaseVO.setId(dataDatabaseVO.getId()); + databaseVO.setDatabaseName(dataDatabaseVO.getDatabaseName()); + databaseVO.setDatabaseIp(dataDatabaseVO.getDatabaseIp()); + databaseVO.setDatabasePort(dataDatabaseVO.getDatabasePort()); + databaseVO.setUserName(dataDatabaseVO.getUserName()); + databaseVO.setPassword(dataDatabaseVO.getPassword()); + databaseVO.setDatabaseType(dataDatabaseVO.getDatabaseType()); + databaseVO.setJdbcUrl(dataDatabaseVO.getJdbcUrl()); + databaseVO.setVersion(dataDatabaseVO.getVersion()); + databaseVO.setCreateTime(dataDatabaseVO.getCreateTime()); + 
databaseVO.setUpdateTime(dataDatabaseVO.getUpdateTime()); + databaseVO.setDeleted(dataDatabaseVO.getDeleted()); + databaseVO.setCreator(dataDatabaseVO.getCreator()); + databaseVO.setUpdater(dataDatabaseVO.getUpdater()); + } + } + //日志对象构建 + ProcessEntity process= BuildStes(dto); + + DataCheckVo exeCuteSql = new DataCheckVo(); + List sqlExplainResults = new ArrayList<>(); + String current = null; + try { + // 记录SQL验证开始的日志 + process.info("Start execute sql..."); + + if(databaseVO.getDatabaseType().equals(ProductTypeEnum.MYSQL.getIndex())){ + String className = ProductTypeEnum.MYSQL.getDriveClassName(); + + //1. 注册驱动 + try { + Class.forName(className); + } catch (ClassNotFoundException e) { + e.printStackTrace(); + } + // 获取数据库连接 + try (Connection conn = DriverManager.getConnection(databaseVO.getJdbcUrl(), databaseVO.getUserName(), databaseVO.getPassword())) { + // 获取表名 + String dtoStatement = dto.getStatement(); + String tableName = getTableName(dtoStatement); + + // 获取数据库操作对象 + try (Statement statement = conn.createStatement()) { + String sql = "SHOW COLUMNS FROM " + tableName; + + // 获取列名并存储在一个列表中 + List columnNames = new ArrayList<>(); + try (ResultSet selectColumnNames = statement.executeQuery(sql)) { + while (selectColumnNames.next()) { + String columnName = selectColumnNames.getString("Field"); + columnNames.add(columnName); + System.out.println("列名: " + columnName); + } + } + // 获取表中数据 + try (ResultSet row = statement.executeQuery(dtoStatement)) { + // 创建集合用于存储查询结果 + List> resultList = new ArrayList<>(); + + // 处理查询结果 + while (row.next()) { + // 创建一个 Map 用于存储每一行的数据 + Map rowData = new HashMap<>(); + + // 遍历字段名列表,获取每个字段对应的值,并存储到 Map 中 + for (String columnName : columnNames) { + Object value = row.getObject(columnName); + rowData.put(columnName, value); + } + // 将 Map 添加到集合中 + resultList.add(rowData); + } + Result resultsDTO=BuildResultsDTO(columnNames,resultList,dto); + exeCuteSql.setResult(resultsDTO); + exeCuteSql.setSuccess(true); + } + + // 
使用SQLUtils将输入的SQL解析为SQL语句列表 + List stmtList = SQLUtils.parseStatements(dto.getStatement(), "SqlServer"); + + // 遍历列表中的每个SQL语句 + for (SQLStatement item : stmtList) { + // 设置当前正在处理的SQL语句,以便记录日志 + current = item.toString(); + + // 获取SQL语句的类型(例如,SELECT、INSERT)并添加到结果列表中 + String type = item.getClass().getSimpleName(); + sqlExplainResults.add(SqlExplainResult.success(type, current, null)); + } + process.info("Execute sql succeed."); + } + } catch (SQLException e) { + // 如果在SQL解析过程中发生异常,将失败的结果添加到列表中 + sqlExplainResults.add(SqlExplainResult.fail(current, LogUtil.getError(e))); + String error = LogUtil.getError(e); + // 记录错误消息 + process.error(error); + } + }else{ + throw new Exception("目前只支持Mysql类型"); + } + process.infoEnd(); + + } catch (Exception e) { + // 如果在SQL解析过程中发生异常,将失败的结果添加到列表中 + sqlExplainResults.add(SqlExplainResult.fail(current, LogUtil.getError(e))); + String error = LogUtil.getError(e); + // 记录错误消息 + process.error(error); + } + + return exeCuteSql; + } + + private Result BuildResultsDTO(List columnNames, List> resultList, DataSqlDto dto) { + + + long seconds = new Date().getSeconds(); + Result resultsDTO = new Result(); + String dtoStatement = dto.getStatement(); + List result = new ArrayList<>(); + Result resultsDTO1 = new Result(); + resultsDTO1.setColumns(columnNames); + + resultsDTO1.setRowData(resultList); + resultsDTO1.setCount(resultList.size()); + resultsDTO1.setSql(dtoStatement); + resultsDTO1.setSuccess(true); + resultsDTO1.setJobId(dto.getId()); + long take = new Date().getSeconds() - seconds; + resultsDTO1.setTime(take); + resultsDTO1.setIfQuery(true); + result.add(resultsDTO1); + + + resultsDTO.setResults(result); + resultsDTO.setCount(resultList.size()); + resultsDTO.setSql(dtoStatement); + resultsDTO.setSuccess(true); + resultsDTO.setJobId(dto.getId()); + long take1 = new Date().getSeconds() - seconds; + resultsDTO.setTime(take1); + return resultsDTO; + } + + private String getTableName(String dtoStatement) { + + // 使用正则表达式匹配表名 + Pattern pattern 
= Pattern.compile("from\\s+([a-zA-Z_][a-zA-Z0-9_]*)", Pattern.CASE_INSENSITIVE); + Matcher matcher = pattern.matcher(dtoStatement); + String tableName =""; + // 查找匹配 + if (matcher.find()) { + // 获取匹配的表名 + tableName = matcher.group(1); + System.out.println("Table Name: " + tableName); + } else { + System.out.println("Table name not found."); + } + return tableName; + + } + + + @Override + public LogVo getLog() { + LogVo clearLog = new LogVo(); + ConsolePool instance = ConsolePool.getInstance(); + + if(instance.exist(getAccessToken())){ + clearLog.setLog(instance.get(getAccessToken()).toString()); + clearLog.setEnd(true); + return clearLog; + }else{ + return clearLog; + } + } + + @Override + public void clearLog() { + ConsolePool instance = ConsolePool.getInstance(); + instance.remove(getAccessToken()); + } + private ProcessEntity BuildStes(DataSqlDto dto) { // 从上下文获取当前进程实体 ProcessEntity process = ProcessContextHolder.getProcess(); diff --git a/srt-data-development/src/main/java/net/srt/disposition/service/impl/DataProductionScheduleServiceimpl.java b/srt-data-development/src/main/java/net/srt/disposition/service/impl/DataProductionScheduleServiceimpl.java new file mode 100644 index 0000000..8233a1a --- /dev/null +++ b/srt-data-development/src/main/java/net/srt/disposition/service/impl/DataProductionScheduleServiceimpl.java @@ -0,0 +1,365 @@ +package net.srt.disposition.service.impl; + +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.baomidou.mybatisplus.core.metadata.IPage; +import com.baomidou.mybatisplus.core.toolkit.Wrappers; +import com.fasterxml.jackson.core.type.TypeReference; +import lombok.AllArgsConstructor; +import net.srt.api.module.data.development.constant.ExecuteType; +import net.srt.api.module.data.integrate.constant.CommonRunStatus; +import net.srt.api.module.quartz.QuartzDataProductionScheduleApi; +import net.srt.disposition.convert.DataProductionConvert; +import 
net.srt.disposition.mapper.DataProductionScheduleNodeDao; +import net.srt.disposition.mapper.DataProductionScheduleNodeRecordDao; +import net.srt.disposition.mapper.DataProductionScheduleRecordDao; +import net.srt.disposition.dto.Flow; +import net.srt.disposition.dto.FlowEdge; +import net.srt.disposition.dto.FlowNode; +import net.srt.disposition.dto.FlowNodeProperties; +import net.srt.disposition.entity.DataProductionScheduleEntity; +import net.srt.disposition.entity.DataProductionScheduleNodeEntity; +import net.srt.disposition.entity.DataProductionScheduleNodeRecordEntity; +import net.srt.disposition.entity.DataProductionScheduleRecordEntity; +import net.srt.disposition.mapper.DataProductionScheduleDao; +import net.srt.disposition.query.DataProductionsScheduleQuery; +import net.srt.disposition.service.DataProductionScheduleService; +import net.srt.disposition.vo.DataProductionScheduleVo; +import net.srt.flink.common.config.Dialect; +import net.srt.flink.common.utils.JSONUtil; +import net.srt.flink.common.utils.ThreadUtil; +import net.srt.flink.core.job.JobResult; +import net.srt.flink.process.context.ProcessContextHolder; +import net.srt.flink.process.model.ProcessEntity; +import net.srt.flink.process.model.ProcessType; +import net.srt.flink.process.pool.ConsolePool; +import net.srt.framework.common.exception.ServerException; +import net.srt.framework.common.page.PageResult; +import net.srt.framework.mybatis.service.impl.BaseServiceImpl; +import net.srt.framework.security.user.SecurityUser; +import org.apache.logging.log4j.core.util.CronExpression; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; +import srt.cloud.framework.dbswitch.common.util.StringUtil; + +import java.util.*; +import java.util.stream.Collectors; + +/** + * @BelongsProject: srt_cloud + * @BelongsPackage: net.srt.disposition.service.impl + * @Author: jpz + * @CreateTime: 2023/12/28 9:42 + */ +@Service +@AllArgsConstructor +public 
class DataProductionScheduleServiceimpl extends BaseServiceImpl implements DataProductionScheduleService { + private final DataProductionScheduleNodeDao nodeDao; + private final DataProductionScheduleRecordDao recordDao; + private final DataProductionScheduleNodeRecordDao nodeRecordDao; + private final QuartzDataProductionScheduleApi scheduleApi; + @Override + public PageResult page(DataProductionsScheduleQuery query) { + IPage page=baseMapper.selectPage(getPage(query),getWrapper(query)); + return new PageResult<>(DataProductionConvert.INSTANCE.convertList(page.getRecords()),page.getTotal()); + } + + @Override + public Flow get(Long id) { + DataProductionScheduleEntity entity = baseMapper.selectById(id); + Flow flow = new Flow(); + flow.setId(entity.getId()); + flow.setName(entity.getName()); + flow.setCron(entity.getCron()); + flow.setStatus(entity.getStatus()); + flow.setReleaseUserId(entity.getReleaseStatus()); + flow.setReleaseTime(entity.getReleaseTime()); + flow.setIfCycle(entity.getIfCycle()); + flow.setNote(entity.getNote()); + flow.setEdges(JSONUtil.parseObject(entity.getEdges(),new TypeReference>(){ + })); + ArrayList nodes = new ArrayList<>(6); + flow.setNodes(nodes); + //获取结点 + HashMap queryMap = new HashMap<>(); + queryMap.put("task_schedule_id",id); + List dbNodes=nodeDao.selectByMap(queryMap); + for (DataProductionScheduleNodeEntity dbNode : dbNodes) { + FlowNode flowNode=getFlowNode(dbNode); + nodes.add(flowNode); + } + return flow; + } + + @Override + @Transactional(rollbackFor = Exception.class) + public void save(Flow flow) { + insertOrUpdate(flow); + } + + @Override + public String run(Integer id) { + return scheduleRun(id, ExecuteType.HAND); + } + + @Override + @Transactional(rollbackFor = Exception.class) + public void delete(List idList) { + removeByIds(idList); + for (Long id : idList) { + //同步删除节点 + Map delMap = new HashMap<>(); + delMap.put("task_schedule_id", id); + nodeDao.deleteByMap(delMap); + } + } + + @Override + public void 
release(Integer id) { + scheduleApi.release(id.longValue()); + //更新状态,发布人和发布时间 + DataProductionScheduleEntity dbEntity = new DataProductionScheduleEntity(); + dbEntity.setId(id.longValue()); + dbEntity.setStatus(1); + dbEntity.setReleaseUserId(SecurityUser.getUserId().intValue()); + dbEntity.setReleaseTime(new Date()); + baseMapper.changeStutus(dbEntity); + } + + + private String scheduleRun(Integer id, ExecuteType executeType) { + DataProductionScheduleEntity scheduleEntity=baseMapper.selectById(id); + LambdaQueryWrapper wrapper = new LambdaQueryWrapper<>(); + wrapper.eq(DataProductionScheduleNodeEntity::getTaskScheduleId,id) + .orderByAsc(DataProductionScheduleNodeEntity::getSort); + List dbNodes=nodeDao.selectList(wrapper); + //新增调度日志 + DataProductionScheduleRecordEntity scheduleRecordEntity = new DataProductionScheduleRecordEntity(); + scheduleRecordEntity.setProjectId(scheduleRecordEntity.getProjectId()); + scheduleRecordEntity.setName(scheduleEntity.getName()); + scheduleRecordEntity.setTaskScheduleId(id); + scheduleRecordEntity.setRunStatus(CommonRunStatus.RUNNING.getCode()); + scheduleRecordEntity.setStartTime(new Date()); + scheduleRecordEntity.setExecuteType(executeType.getValue()); + recordDao.insert(scheduleRecordEntity); + ThreadUtil.threadPool.execute(()->runNode(scheduleEntity,executeType,dbNodes,scheduleRecordEntity)); + return scheduleEntity.getId().toString(); + } + + private void runNode(DataProductionScheduleEntity scheduleEntity, ExecuteType executeType, List dbNodes, DataProductionScheduleRecordEntity scheduleRecordEntity) { + String processId=scheduleRecordEntity.getId()+DataProductionScheduleRecordEntity.SCHEDULE_RECORD; + ProcessEntity flowProcess= ProcessContextHolder.registerFlowProcess(ProcessEntity.init(ProcessType.FLINKEXECUTE,processId)); + flowProcess.info("Start run flow..."); + boolean recordSuccess = true; + List flowNodes = dbNodes.stream().map(this::getFlowNode).collect(Collectors.toList()); + for 
(DataProductionScheduleNodeEntity dbNode : dbNodes) { + //返回给前台的节点 + FlowNode flowNode = flowNodes.stream().filter(item -> item.getId().equals(dbNode.getNo())).findFirst().get(); + flowNode.getProperties().setRunStatus(CommonRunStatus.SUCCESS.getCode()); + flowNode.getProperties().setStyle(FlowNodeProperties.SUCCESS_STYLE); + flowProcess.info(String.format("Start run node %s-%s", dbNode.getName(), Dialect.getByCode(dbNode.getTaskType().toString()).getValue())); + DataProductionScheduleNodeRecordEntity nodeRecordEntity = new DataProductionScheduleNodeRecordEntity(); + nodeRecordEntity.setProjectId(scheduleRecordEntity.getProjectId()); + nodeRecordEntity.setTaskScheduleId(scheduleEntity.getId().intValue()); + nodeRecordEntity.setScheduleNodeId(dbNode.getId().intValue()); + nodeRecordEntity.setScheduleNodeNo(dbNode.getNo()); + nodeRecordEntity.setScheduleRecordId(scheduleRecordEntity.getId().intValue()); + nodeRecordEntity.setTaskId(dbNode.getTaskId()); + nodeRecordEntity.setRunStatus(CommonRunStatus.RUNNING.getCode()); + nodeRecordEntity.setStartTime(new Date()); + nodeRecordEntity.setExecuteType(executeType.getValue()); + nodeRecordDao.insert(nodeRecordEntity); + flowNode.getProperties().setNodeRecordId(nodeRecordEntity.getId().intValue()); + JobResult jobResult = null; + if (jobResult != null) { + flowProcess.info(jobResult.getLog()); + } + flowProcess.info(String.format("Node %s-%s run end", dbNode.getName(), Dialect.getByCode(dbNode.getTaskType().toString()).getValue())); + } + //更新调度记录 + scheduleRecordEntity.setEndTime(new Date()); + scheduleRecordEntity.setRunStatus(recordSuccess ? 
CommonRunStatus.SUCCESS.getCode() : CommonRunStatus.FAILED.getCode()); + scheduleRecordEntity.setLog(ConsolePool.getInstance().get(processId).toString()); + //保存历史json + Flow flow = new Flow(); + flow.setId(scheduleEntity.getId()); + flow.setIfCycle(scheduleEntity.getIfCycle()); + flow.setRecordId(scheduleRecordEntity.getId().intValue()); + flow.setName(scheduleEntity.getName()); + flow.setNote(scheduleEntity.getNote()); + flow.setCron(scheduleEntity.getCron()); + flow.setNodes(flowNodes); + flow.setEdges(JSONUtil.parseObject(scheduleEntity.getEdges(), new TypeReference>() { + })); + scheduleRecordEntity.setConfigJson(JSONUtil.toJsonString(flow)); + recordDao.updateById(scheduleRecordEntity); + flowProcess.infoEnd(); + //移除日志 + ProcessContextHolder.clearFlow(); + ConsolePool.getInstance().remove(processId); + } + + private void insertOrUpdate(Flow flow) { + if (flow.getIfCycle()==1&&!CronExpression.isValidExpression(flow.getCron())){ + throw new ServerException("cron表达式有误,请检查后重新填写"); + } + DataProductionScheduleEntity entity=DataProductionScheduleEntity.builder().id(flow.getId()) + .projectId(getProjectId()).ifCycle(flow.getIfCycle()) + .name(flow.getName()).cron(flow.getCron()).note(flow.getNote()).status(flow.getStatus()).releaseTime(flow.getReleaseTime()).releaseUserId(flow.getReleaseUserId()).build(); + List nodes=flow.getNodes(); + List edges=flow.getEdges(); + //寻找入度为0节点 + List starNodes=getStarNodes(nodes,edges); + //检查闭环 + checkClosedLoop(starNodes,nodes,edges); + Set runNodeSet=new LinkedHashSet<>(); + buildRunNodes(runNodeSet,starNodes,nodes,edges); + if (entity.getId()==null){ + entity.setEdges(JSONUtil.toJsonString(edges)); + baseMapper.insert(entity); + //转换前端传过来节点为entity + List clientNodes=getNodesByNodeSet(entity,runNodeSet); + //新增节点 + clientNodes.forEach(nodeDao::insert); + }else { + List clientNodes=getNodesByNodeSet(entity,runNodeSet); + entity.setEdges(JSONUtil.toJsonString(edges)); + baseMapper.updateById(entity); + //获取库中节点 + HashMap queryMap 
= new HashMap<>(); + queryMap.put("task-schedule_id",entity.getId()); + List dbNode=nodeDao.selectByMap(queryMap); + //查询clientNodes的properties的id + List insertNodes=clientNodes.stream().filter(item->item.getId()==null).collect(Collectors.toList()); + insertNodes.forEach(nodeDao::insert); + //查询dbNode的properties的id不为空 + clientNodes=getNodesByNodeSet(entity,runNodeSet); + List updateNodes=clientNodes.stream().filter(item->item.getId()!=null).collect(Collectors.toList()); + updateNodes.forEach(nodeDao::updateById); + //查询库里有,nodeset里边有没有,则是要进行删除 + for (DataProductionScheduleNodeEntity dbNodes : dbNode) { + if(clientNodes.stream().noneMatch(item->dbNodes.getNo().equals(item.getNo()))){ + nodeDao.deleteById(dbNodes.getId()); + } + } + } + } + + private List getNodesByNodeSet(DataProductionScheduleEntity entity, Set runNodeSet) { + List clientNodes=new ArrayList<>(10); + int i=0; + for (FlowNode flowNode : runNodeSet) { + i++; + DataProductionScheduleNodeEntity nodeentity = new DataProductionScheduleNodeEntity(); + nodeentity.setId(flowNode.getProperties().getId()); + nodeentity.setTaskScheduleId(entity.getProjectId()); + nodeentity.setProjectId(entity.getProjectId()); + nodeentity.setNo(flowNode.getId()); + nodeentity.setSort(i); + nodeentity.setName(flowNode.getProperties().getName()); + nodeentity.setType(flowNode.getType()); + nodeentity.setX(flowNode.getX()); + nodeentity.setY(flowNode.getY()); + nodeentity.setNote(flowNode.getProperties().getNote()); + nodeentity.setTaskId(flowNode.getProperties().getTaskId()); + nodeentity.setTaskType(flowNode.getProperties().getTaskType()); + nodeentity.setFailGoOn(flowNode.getProperties().getFailGoOn()); + nodeentity.setWeight(flowNode.getProperties().getWeight()); + clientNodes.add(nodeentity); + } + return clientNodes; + } + + private void buildRunNodes(Set runNodeSet, List starNodes, List nodes, List edges) { + if (starNodes.isEmpty()){ + return; + } + //按照权重逆序,权重越高越在前面 + starNodes.sort((item1, 
item2)->item2.getProperties().getWeight().compareTo(item1.getProperties().getWeight())); + for (FlowNode starNode : starNodes) { + if (nodes.contains(starNode)){ + runNodeSet.remove(starNode); + } + runNodeSet.add(starNode); + } + //获取子节点 + ArrayList childNodes = new ArrayList<>(2); + for (FlowNode starNode : starNodes) { + //获取以node为父节点 + List collect=edges.stream().filter(item->starNode.getId().equals(item.getSourceNodeId())).collect(Collectors.toList()); + for (FlowEdge flowEdge : collect) { + FlowNode flowNode=nodes.stream().filter(item->item.getId().equals(flowEdge.getTargetNodeId())).findFirst().get(); + childNodes.add(flowNode); + } + } + buildRunNodes(runNodeSet,childNodes,nodes,edges); + } + + private void checkClosedLoop(List starNodes, List nodes, List edges) { + if (starNodes.isEmpty()){ + throw new ServerException("流程不允许存在闭环,请检查"); + } + for (FlowNode starNode : starNodes) { + Set nodeSet = new HashSet<>(); + //遍历检查闭环 + dfs(nodeSet,starNode,nodes,edges); + + } + } + + private void dfs(Set nodeSet, FlowNode starNode, List nodes, List edges) { + List collect=edges.stream().filter(item-> starNode.getId().equals(item.getSourceNodeId())).collect(Collectors.toList()); + if (collect.isEmpty()){ + return; + } + for (FlowEdge edge : collect) { + FlowNode targetNode=nodes.stream().filter(item->item.getId().equals(edge.getTargetNodeId())).findFirst().get(); + if (nodeSet.contains(targetNode)){ + throw new ServerException("流程不允许存在闭环,请检查"); + } + nodeSet.add(targetNode); + dfs(nodeSet,targetNode,nodes,edges); + } + } + + + private List getStarNodes(List nodes, List edges) { + if (nodes.isEmpty()){ + throw new ServerException("流程不能为空"); + } + ArrayList startNode = new ArrayList<>(1); + for (FlowNode node : nodes) { + if (edges.stream().noneMatch(item->node.getId().equals(item.getTargetNodeId()))){ + startNode.add(node); + } + } + return startNode; + } + + private FlowNode getFlowNode(DataProductionScheduleNodeEntity dbNode) { + FlowNode flowNode = new FlowNode(); + 
flowNode.setId(dbNode.getNo()); + flowNode.setType(dbNode.getType()); + flowNode.setX(dbNode.getX()); + flowNode.setY(dbNode.getY()); + FlowNodeProperties properties = new FlowNodeProperties(); + properties.setId(dbNode.getId()); + properties.setName(dbNode.getName()); + properties.setTaskId(dbNode.getTaskId()); + properties.setWeight(dbNode.getWeight()); + properties.setTaskType(dbNode.getTaskType()); + properties.setTaskTypeVal(Dialect.getByCode(dbNode.getTaskType().toString()).getValue()); + properties.setNote(dbNode.getNote()); + properties.setFailGoOn(dbNode.getFailGoOn()); + flowNode.setProperties(properties); + return flowNode; + } + + private LambdaQueryWrapper getWrapper(DataProductionsScheduleQuery query) { + LambdaQueryWrapper wrapper = Wrappers.lambdaQuery(); + wrapper.like(StringUtil.isNotBlank(query.getName()),DataProductionScheduleEntity::getName,query.getName()) + .eq(query.getStatus()!=null,DataProductionScheduleEntity::getStatus,query.getStatus()) + .orderByDesc(DataProductionScheduleEntity::getUpdateTime).orderByDesc(DataProductionScheduleEntity::getId); + dataScopeWithOrgId(wrapper); + return wrapper; + } +} diff --git a/srt-data-development/src/main/java/net/srt/disposition/vo/DataCheckVo.java b/srt-data-development/src/main/java/net/srt/disposition/vo/DataCheckVo.java new file mode 100644 index 0000000..847071c --- /dev/null +++ b/srt-data-development/src/main/java/net/srt/disposition/vo/DataCheckVo.java @@ -0,0 +1,28 @@ +package net.srt.disposition.vo; + +import com.fasterxml.jackson.annotation.JsonFormat; +import lombok.Data; +import org.springframework.format.annotation.DateTimeFormat; + +import java.util.Date; + +@Data +public class DataCheckVo { + private Integer id; + private String jobConfig; + private String jobManagerAddress; + private Integer status; + private boolean success; + private String statement; + private Integer jobId; + private Integer jobInstanceId; + private String error; + private Result result; + 
package net.srt.disposition.vo;

import com.fasterxml.jackson.annotation.JsonFormat;
import lombok.Data;
import org.springframework.format.annotation.DateTimeFormat;

import java.util.Date;

/**
 * View object carrying the outcome of a SQL check/execution job:
 * job identity and address, the submitted statement, success flag,
 * the result payload, timing information and the captured log.
 */
@Data
public class DataCheckVo {
    private Integer id;
    // Serialized job configuration.
    private String jobConfig;
    private String jobManagerAddress;
    private Integer status;
    private boolean success;
    // The SQL statement that was checked/executed.
    private String statement;
    private Integer jobId;
    private Integer jobInstanceId;
    // Error detail when the check failed.
    private String error;
    private Result result;
    @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
    private Date startTime;
    @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
    private Date endTime;
    // Captured execution log text.
    private String log;
}
package net.srt.disposition.vo;

import com.fasterxml.jackson.annotation.JsonFormat;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
import net.srt.framework.common.utils.DateUtils;

import java.io.Serializable;
import java.util.Date;

/**
 * View object for a data-production schedule (作业调度).
 *
 * @BelongsProject: srt_cloud
 * @BelongsPackage: net.srt.disposition.vo
 * @Author: jpz
 * @CreateTime: 2023/12/28 8:57
 */
@Data
@Schema(description = "数据生产-作业调度")
public class DataProductionScheduleVo implements Serializable {
    private static final long serialVersionUID = 1L;

    @Schema(description = "主键")
    private Long id;

    private Long projectId;

    @Schema(description = "调度名称")
    private String name;

    // NOTE(review): the schedule entity/builder side uses "ifCycle" — confirm
    // "isCycle" is the intended property name for this API contract.
    @Schema(description = "是否周期执行")
    private Integer isCycle;

    @Schema(description = "cron表达式")
    private String cron;

    @Schema(description = "描述")
    private String note;

    @Schema(description = "节点关系json")
    private String edges;

    @Schema(description = "0-未发布 1-已发布")
    private Integer status;

    private Date releaseTime;
    private Integer releaseUserId;

    @Schema(description = "版本号")
    private Integer version;

    @Schema(description = "删除标识 0:正常 1:已删除")
    private Integer deleted;

    @Schema(description = "创建者")
    private Long creator;

    @Schema(description = "创建时间")
    @JsonFormat(pattern = DateUtils.DATE_TIME_PATTERN)
    private Date createTime;

    @Schema(description = "更新者")
    private Long updater;

    @Schema(description = "更新时间")
    @JsonFormat(pattern = DateUtils.DATE_TIME_PATTERN)
    private Date updateTime;
}
package net.srt.disposition.vo;

import com.fasterxml.jackson.annotation.JsonFormat;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
import org.springframework.format.annotation.DateTimeFormat;

import java.util.Date;

/**
 * View object for the data-development operations center (运维中心):
 * one execution record of a SQL/Flink job including its schedule linkage,
 * timing, status and audit fields.
 *
 * @author : WangZhanpeng
 * @date : 2023/12/27 19:14
 */
@Data
@Schema(description = "数据开发-运维中心")
public class DevelopmentOperationalRecordsVo {

    @Schema(description = "主键")
    private Long id;

    @Schema(description = "所属项目唯一标识符")
    private Long projectId;

    @Schema(description = "SQL数据库类型")
    private String sqlDbType;

    @Schema(description = "数据库唯一标识符")
    private Long databaseId;

    @Schema(description = "集群唯一标识符")
    private Integer clusterId;

    @Schema(description = "集群配置唯一标识符")
    private Long clusterConfigurationId;

    @Schema(description = "会话信息")
    private String session;

    @Schema(description = "作业唯一标识符")
    private Long jobId;

    @Schema(description = "作业名称")
    private String jobName;

    @Schema(description = "作业管理器地址")
    private String jobManagerAddress;

    @Schema(description = "执行状态")
    private Integer status;

    @Schema(description = "方言")
    private Integer dialect;

    @Schema(description = "类型")
    private String type;

    @Schema(description = "SQL语句")
    private String statement;

    @Schema(description = "错误信息")
    private String error;

    @Schema(description = "执行结果")
    private String result;

    @Schema(description = "配置JSON字符串")
    private String configJson;

    @Schema(description = "开始时间")
    @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
    private Date startTime;

    @Schema(description = "结束时间")
    @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
    private Date endTime;

    @Schema(description = "任务唯一标识符")
    private Long taskId;

    @Schema(description = "执行类型")
    private String executeType;

    @Schema(description = "调度唯一标识符")
    private Long scheduleId;

    @Schema(description = "调度节点唯一标识符")
    private Long scheduleNodeId;

    @Schema(description = "调度记录唯一标识符")
    private Long scheduleRecordId;

    @Schema(description = "调度节点记录唯一标识符")
    private Long scheduleNodeRecordId;

    @Schema(description = "实例状态")
    private String instanceStatus;

    @Schema(description = "执行完成时间戳")
    @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
    private Date finishTime;

    @Schema(description = "执行的SQL语句")
    private String executeSql;

    @Schema(description = "执行编号")
    private String executeNo;

    @Schema(description = "作业标识符")
    private String jid;

    @Schema(description = "执行持续时间")
    private Integer duration;

    @Schema(description = "实体的版本")
    private Integer version;

    @Schema(description = "表示实体是否已删除的标志")
    private Integer deleted;

    @Schema(description = "创建者的ID")
    private Integer creator;

    @Schema(description = "实体的创建时间")
    @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
    private Date createTime;

    @Schema(description = "更新者的ID")
    private Integer updater;

    @Schema(description = "实体的更新时间")
    @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
    private Date updateTime;
}
// ---- file: net/srt/disposition/vo/DevelopmentTaskSaveVo.java ----
package net.srt.disposition.vo;

import com.fasterxml.jackson.annotation.JsonFormat;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
import net.srt.framework.common.utils.DateUtils;

import java.io.Serializable;
import java.util.Date;

/**
 * View object for saving a data-development SQL job (sql作业):
 * job identity, cluster/database bindings, Flink execution options,
 * the statement itself and audit fields.
 *
 * @author : WangZhanpeng
 * @date : 2023/12/24 19:59
 */
@Data
@Schema(description = "数据开发-sql作业")
public class DevelopmentTaskSaveVo implements Serializable {

    // FIX: class implements Serializable but declared no serialVersionUID;
    // added for stable serialization, matching DataProductionScheduleVo.
    private static final long serialVersionUID = 1L;

    @Schema(description = "唯一标识")
    private Long id;

    @Schema(description = "警报组标识")
    private Long alertGroupId;

    @Schema(description = "别名")
    private String alias;

    @Schema(description = "批处理模型")
    private String batchModel;

    @Schema(description = "目录标识")
    private Long catalogueId;

    @Schema(description = "检查点")
    private String checkPoint;

    @Schema(description = "集群配置标识")
    private Long clusterConfigurationId;

    @Schema(description = "集群标识")
    private Long clusterId;

    @Schema(description = "配置 JSON")
    private String configJson;

    @Schema(description = "数据库标识")
    private Long databaseId;

    @Schema(description = "方言")
    private String dialect;

    @Schema(description = "是否启用")
    private String enabled;

    @Schema(description = "环境标识")
    private String envId;

    @Schema(description = "片段")
    private String fragment;

    @Schema(description = "JAR标识")
    private Long jarId;

    @Schema(description = "作业实例标识")
    private Long jobInstanceId;

    @Schema(description = "名称")
    private String name;

    @Schema(description = "备注")
    private String note;

    @Schema(description = "开放传输")
    private String openTrans;

    @Schema(description = "并行度")
    private Integer parallelism;

    @Schema(description = "处理结束时间")
    private String processEnd;

    @Schema(description = "项目标识")
    private Long projectId;

    @Schema(description = "PV数据数量")
    private Integer pvdataNum;

    @Schema(description = "保存点路径")
    private String savePointPath;

    @Schema(description = "保存点策略")
    private String savePointStrategy;

    @Schema(description = "SQL数据库类型")
    private String sqlDbType;

    @Schema(description = "语句")
    private String statement;

    @Schema(description = "语句集")
    private String statementSet;

    @Schema(description = "步骤")
    private String step;

    @Schema(description = "类型")
    private String type;

    @Schema(description = "是否使用自动取消")
    private String useAutoCancel;

    @Schema(description = "是否使用变更日志")
    private String useChangeLog;

    @Schema(description = "是否使用结果")
    private String useResult;

    @Schema(description = "版本标识")
    private Long versionId;

    @Schema(description = "版本")
    private Integer version;

    @Schema(description = "已删除")
    private String deleted;

    @Schema(description = "创建者")
    private Integer creator;

    @Schema(description = "创建时间")
    @JsonFormat(pattern = DateUtils.DATE_TIME_PATTERN)
    private Date createTime;

    @Schema(description = "更新者")
    private Integer updater;

    @Schema(description = "更新时间")
    @JsonFormat(pattern = DateUtils.DATE_TIME_PATTERN)
    private Date updateTime;
}

// ---- file: net/srt/disposition/vo/LogVo.java ----
package net.srt.disposition.vo;

import lombok.Data;

/**
 * Incremental log payload: a chunk of log text plus a flag marking
 * whether the log stream has ended.
 */
@Data
public class LogVo {
    private String log;
    // True once no further log output will follow.
    private boolean end;
}
package net.srt.disposition.vo;

import com.fasterxml.jackson.annotation.JsonFormat;
import lombok.Data;
import org.springframework.format.annotation.DateTimeFormat;

import java.util.Date;
import java.util.List;
import java.util.Map;

/**
 * Execution result payload for a data-development SQL job: the statement,
 * timing, success/error information, and the tabular result set
 * (column names plus rows keyed by column name).
 */
@Data
public class Result {
    // FIX: collection fields were raw types; generics restored.
    // Element type not recoverable from this chunk — TODO confirm against callers.
    private List<Object> results;
    // True when the statement was a query (produces a result set).
    private boolean ifQuery;
    private String sql;
    // Execution duration — presumably milliseconds; TODO confirm unit.
    private Long time;
    private boolean success;
    private String errorMsg;
    private Integer count;
    // Column names of the result set — TODO confirm element type.
    private List<String> columns;
    // Result rows, each keyed by column name (inferred from the Map import
    // and the surviving "List<Map<...>>" remnant).
    private List<Map<String, Object>> rowData;
    private Long jobId;
}