Merge remote-tracking branch 'origin/dev' into dev

dev
zmyYYDS 2023-12-28 22:34:25 +08:00
commit d11d2b3b72
41 changed files with 2114 additions and 15 deletions

View File

@@ -111,7 +111,7 @@ public class DataDatabaseController {
return Result.ok(schemaTableDataVo);
}
@GetMapping("/list-all")
@GetMapping("/list-all/{t}")
@Operation(summary = "获取当前用户所能看到的的数据表")
public Result<List<DataDatabaseVO>> listAll() {
List<DataDatabaseVO> list = dataDatabaseService.listAll();
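An editorial note on the hunk above: the new mapping adds a {t} path segment that listAll() never declares. Spring still routes the request, since URI template variables do not have to be bound, but if the segment is meant to be read, for example as a cache-busting timestamp, it needs an explicit @PathVariable. A minimal sketch of that variant, with the purpose of t being an assumption:

    @GetMapping("/list-all/{t}")
    @Operation(summary = "Get the data tables visible to the current user")
    public Result<List<DataDatabaseVO>> listAll(@PathVariable("t") Long t) {
        // t is assumed to be a cache-busting timestamp and is not used further
        List<DataDatabaseVO> list = dataDatabaseService.listAll();
        return Result.ok(list);
    }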

View File

@@ -70,7 +70,7 @@
<if test="contentType != null and contentType.trim() != ''">
AND dsac.content_type = #{contentType}
</if>
<if test="status != ">
<if test="status != null">
AND dsac.status = #{status}
</if>
<if test="sqlDbType != null">

View File

@@ -109,6 +109,9 @@
<groupId>net.srt</groupId>
<artifactId>flink-function</artifactId>
</dependency>
</dependencies>
<profiles>

View File

@@ -89,7 +89,7 @@ spring:
discovery:
server-addr: 101.34.77.101:8848
# namespace, defaults to public
namespace: 9de208a6-cb30-41ae-a880-78196c99c050
namespace: 7e1e997d-5fa4-4f84-9f48-3e0adf830a37
service: ${spring.application.name}
group: srt2.0

View File

@@ -79,6 +79,12 @@
<groupId>io.minio</groupId>
<artifactId>minio</artifactId>
</dependency>
<dependency>
<groupId>net.srt</groupId>
<artifactId>flink-core-all</artifactId>
<version>2.0.0</version>
</dependency>
</dependencies>
<build>

View File

@@ -1,13 +1,17 @@
package net.srt.disposition.controller;
import io.swagger.v3.oas.annotations.Operation;
import lombok.AllArgsConstructor;
import net.srt.disposition.dto.DataCheckSqlDto;
import net.srt.disposition.dto.DataSqlDto;
import net.srt.disposition.entity.DataCentre;
import net.srt.disposition.entity.DataCheckSqlEntity;
import net.srt.disposition.entity.DevelopmentOperationalRecordsQuery;
import net.srt.disposition.service.DataCenterService;
import net.srt.disposition.service.DataCheckSqlService;
import net.srt.disposition.vo.DataCheckSqlVo;
import net.srt.disposition.vo.DataSqlVo;
import net.srt.disposition.vo.*;
import net.srt.flink.common.result.SqlExplainResult;
import net.srt.framework.common.page.PageResult;
import net.srt.framework.common.utils.Result;
import org.springframework.web.bind.annotation.*;
@@ -21,19 +25,41 @@ public class DataCheckSqlController {
private DataCheckSqlService dataCheckSqlService;
private DataCenterService dataCenterService;
@PostMapping
public Result add(@RequestBody DataCheckSqlDto dataCheckSqlDto){
dataCheckSqlService.add(dataCheckSqlDto);
return Result.ok();
}
@GetMapping("/history/page")
public Result<PageResult<DataCentre>> historyPage(@RequestBody DevelopmentOperationalRecordsQuery query){
PageResult<DataCentre> dataCentrePageResult= dataCenterService.dataCenterService(query);
return Result.ok(dataCentrePageResult);
}
@DeleteMapping("/history")
@Operation(summary = "运维中心删除")
public Result<PageResult<DevelopmentOperationalRecordsVo>> deleted(@RequestBody List<Long> ids){
dataCenterService.deleted(ids);
return Result.ok();
}
@GetMapping("/env-list")
@Operation(summary = "运维中心删除")
public Result<List<DevelopmentTaskSaveVo>> envList(){
List<DevelopmentTaskSaveVo> developmentTaskSaveVos = dataCenterService.listEnvList();
return Result.ok(developmentTaskSaveVos);
}
@GetMapping("/console-log")
public Result consoleLog(){
return Result.ok();
public Result<LogVo> consoleLog(){
return Result.ok(dataCheckSqlService.getLog());
}
@GetMapping("/clear-log")
public Result clearLog(){
dataCheckSqlService.clearLog();
return Result.ok();
}
@@ -43,6 +69,14 @@ public class DataCheckSqlController {
return Result.ok(dataSqlVo);
}
@PostMapping("/execute-sql")
public Result<DataCheckVo> executeSql(@RequestBody DataSqlDto dataSqlDto){
DataCheckVo dataSqlVo=dataCheckSqlService.executeSql(dataSqlDto);
return Result.ok(dataSqlVo);
}
@GetMapping("/{id}")
public Result<DataCheckSqlEntity> get(@PathVariable Integer id) {
DataCheckSqlEntity dataCheckSqlEntity = dataCheckSqlService.find(id);
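An editorial suggestion on the controller above: historyPage binds its query object with @RequestBody on a GET route, and many HTTP clients and proxies drop request bodies on GET. A POST mapping is the safer contract; this sketch is a proposed revision, not part of the commit:

    @PostMapping("/history/page")
    public Result<PageResult<DataCentre>> historyPage(@RequestBody DevelopmentOperationalRecordsQuery query) {
        return Result.ok(dataCenterService.dataCenterService(query));
    }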

View File

@@ -0,0 +1,79 @@
package net.srt.disposition.controller;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.AllArgsConstructor;
import net.srt.disposition.dto.Flow;
import net.srt.disposition.query.DataProductionsScheduleQuery;
import net.srt.disposition.service.DataProductionScheduleService;
import net.srt.disposition.vo.DataProductionScheduleVo;
import net.srt.framework.common.page.PageResult;
import net.srt.framework.common.utils.Result;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.web.bind.annotation.*;
import javax.validation.Valid;
import java.util.List;
/**
* @BelongsProject: srt_cloud
* @BelongsPackage: net.srt.disposition.controller
* @Author: jpz
* @CreateTime: 2023/12/28 8:49
*/
@RestController
@RequestMapping("schedule")
@Tag(name = "数据生产-作业调度")
@AllArgsConstructor
public class DataProductionScheduleController {
private final DataProductionScheduleService dataProductionScheduleService;
@GetMapping("page")
@Operation(summary = "分页")
@PreAuthorize("hasAuthority('data-development:schedule:page')")
public Result<PageResult<DataProductionScheduleVo>> page(@Valid DataProductionsScheduleQuery query){
PageResult<DataProductionScheduleVo> pageResult = dataProductionScheduleService.page(query);
return Result.ok(pageResult);
}
@GetMapping("{id}")
@Operation(summary = "信息")
@PreAuthorize("hasAuthority('data-development:schedule:info')")
public Result<Flow> get(@PathVariable("id") Long id){
return Result.ok(dataProductionScheduleService.get(id));
}
@PostMapping
@Operation(summary = "保存")
@PreAuthorize("hasAuthority('data-development:schedule:save')")
public Result<String> save(@RequestBody Flow flow){
dataProductionScheduleService.save(flow);
return Result.ok();
}
@PostMapping("/run/{id}")
@Operation(summary = "执行(返回log的id)")
@PreAuthorize("hasAuthority('data-development:schedule:run')")
public Result<String> run(@PathVariable Integer id){
return Result.ok(dataProductionScheduleService.run(id));
}
@DeleteMapping
@Operation(summary = "删除")
@PreAuthorize("hasAuthority('data-development:schedule:delete')")
public Result<String> delete(@RequestBody List<Long> idList){
dataProductionScheduleService.delete(idList);
return Result.ok();
}
@PostMapping("/release/{id}")
@Operation(summary = "发布")
@PreAuthorize("hasAuthority('data-development:schedule:release')")
public Result<String> release(@PathVariable Integer id) {
dataProductionScheduleService.release(id);
return Result.ok();
}
}
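A hedged usage sketch of the lifecycle these endpoints imply: build a flow with at least one node, save it, release it, then trigger a manual run and keep the returned id for the log lookup. All field values below are illustrative assumptions:

    FlowNodeProperties props = new FlowNodeProperties();
    props.setName("first task");   // assumed node name
    props.setWeight(1);            // weight drives the run order
    FlowNode node = new FlowNode();
    node.setId("node-1");          // client-side node number
    node.setProperties(props);

    Flow flow = new Flow();
    flow.setName("nightly-sync");  // assumed schedule name
    flow.setIfCycle(0);            // 0 taken to mean "manual runs only"
    flow.setNodes(Collections.singletonList(node));
    flow.setEdges(Collections.emptyList());
    dataProductionScheduleService.save(flow);

    // later, against the persisted schedule id (assume 1):
    dataProductionScheduleService.release(1);
    String runId = dataProductionScheduleService.run(1); // id of the run record, for log lookup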

View File

@@ -20,7 +20,7 @@ public class DataProductionTreeController {
private DataProductionService dataProductionService;
@GetMapping
public Result<List<TreeNodeVo>> listResult(@RequestParam String t){
public Result<List<TreeNodeVo>> listResult(){
List<TreeNodeVo> dispositionVos=dataProductionService.dataTreeList();
return Result.ok(dispositionVos);
}

View File

@@ -0,0 +1,18 @@
package net.srt.disposition.convert;
import net.srt.disposition.entity.DataCentre;
import net.srt.disposition.vo.DevelopmentTaskSaveVo;
import org.mapstruct.Mapper;
import org.mapstruct.factory.Mappers;
import java.util.List;
@Mapper
public interface DataCentreConvert {
DataCentreConvert INSTANCE = Mappers.getMapper(DataCentreConvert.class);
List<DataCentre> convertList(List<DataCentre> records);
List<DevelopmentTaskSaveVo> convert(List<DataCentre> entities);
}
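MapStruct generates the implementation behind Mappers.getMapper at build time, so these mappings cost an annotation processor rather than runtime reflection. Note that convertList maps DataCentre to DataCentre, an identity copy, while convert is the entity-to-VO mapping the service layer uses (see DataCenterServiceImpl below). A hedged usage sketch:

    // assumed to run inside a service with an injected DataCentreMapper
    List<DataCentre> entities = dataCentreMapper.selectList(null);
    List<DevelopmentTaskSaveVo> vos = DataCentreConvert.INSTANCE.convert(entities);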

View File

@@ -0,0 +1,22 @@
package net.srt.disposition.convert;
import net.srt.disposition.entity.DataProductionScheduleEntity;
import net.srt.disposition.vo.DataProductionScheduleVo;
import org.mapstruct.Mapper;
import org.mapstruct.factory.Mappers;
import java.util.List;
/**
* @BelongsProject: srt_cloud
* @BelongsPackage: net.srt.disposition.convert
* @Author: jpz
* @CreateTime: 2023/12/28 9:56
*/
@Mapper
public interface DataProductionConvert {
DataProductionConvert INSTANCE = Mappers.getMapper(DataProductionConvert.class);
List<DataProductionScheduleVo> convertList(List<DataProductionScheduleEntity> list);
}

View File

@@ -0,0 +1,31 @@
package net.srt.disposition.dto;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.Date;
import java.util.List;
/**
* @BelongsProject: srt_cloud
* @BelongsPackage: net.srt.disposition.dto
* @Author: jpz
* @CreateTime: 2023/12/28 14:04
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Flow {
private Long id;
private Integer recordId;
private Integer ifCycle;
private String name;
private String cron;
private String note;
private Integer status;
private Date releaseTime;
private Integer releaseUserId;
private List<FlowNode> nodes;
private List<FlowEdge> edges;
}

View File

@@ -0,0 +1,28 @@
package net.srt.disposition.dto;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.List;
import java.util.Map;
/**
* @BelongsProject: srt_cloud
* @BelongsPackage: net.srt.disposition.dto
* @Author: jpz
* @CreateTime: 2023/12/28 14:04
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class FlowEdge {
private String id;
private String type;
private Map<String, Object> startPoint;
private Map<String, Object> endPoint;
private List<Map<String, Object>> pointsList;
private String sourceNodeId;
private String targetNodeId;
private Map<String, Object> properties;
}

View File

@@ -0,0 +1,22 @@
package net.srt.disposition.dto;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* @BelongsProject: srt_cloud
* @BelongsPackage: net.srt.disposition.dto
* @Author: jpz
* @CreateTime: 2023/12/28 14:04
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class FlowNode {
private String id;
private String type;
private Integer x;
private Integer y;
private FlowNodeProperties properties;
}

View File

@@ -0,0 +1,37 @@
package net.srt.disposition.dto;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.HashMap;
import java.util.Map;
/**
* @BelongsProject: srt_cloud
* @BelongsPackage: net.srt.disposition.dto
* @Author: jpz
* @CreateTime: 2023/12/28 14:05
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class FlowNodeProperties {
public static final Map<String, Object> SUCCESS_STYLE = new HashMap<String, Object>() {
{
put("border","3px solid #06c733");
}
};
private Long id;
private Integer nodeRecordId;
private String name;
private Integer taskId;
private Integer weight;
private Integer taskType;
private String taskTypeVal;
private String note;
private Integer failGoOn;
private Map<String, Object> style;
//CommonRunStatus
private Integer runStatus = 1;
}
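SUCCESS_STYLE is built with an anonymous HashMap subclass plus an instance initializer, which leaves the shared constant mutable. Since it holds a single entry, a minimal immutable alternative (an editorial suggestion, assuming java.util.Collections is acceptable here):

    public static final Map<String, Object> SUCCESS_STYLE =
            Collections.singletonMap("border", "3px solid #06c733"); // unmodifiable one-entry map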

View File

@@ -12,7 +12,7 @@ import java.util.Date;
public class DataCentre {
private Long id;
private Long projectId;
private String sqlDbType;
private Integer sqlDbType;
private Long databaseId;
private Long clusterId;
private Long clusterConfigurationId;
@@ -53,6 +53,6 @@ public class DataCentre {
private String executeSql;
private String executeNo;
private String jib;
private String duration;
private Integer duration;
}

View File

@@ -23,8 +23,10 @@ public class DataDatabaseDevEntity {
private String databasePort;
private Integer databaseType;
private Integer deleted;
private Integer id;
private Long id;
private String isJdbc;
private String isRtApprove;
private String jdbcUrl;
private String name;
private String noReReason;
private String password;

View File

@@ -0,0 +1,38 @@
package net.srt.disposition.entity;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import net.srt.framework.mybatis.entity.BaseEntity;
import java.util.Date;
/**
* @BelongsProject: srt_cloud
* @BelongsPackage: net.srt.disposition.entity
* @Author: jpz
* @CreateTime: 2023/12/28 9:35
*/
@EqualsAndHashCode(callSuper = false)
@Data
@SuperBuilder
@AllArgsConstructor
@NoArgsConstructor
@TableName(value = "data_production_schedule",autoResultMap = true)
public class DataProductionScheduleEntity extends BaseEntity {
private Long projectId;
private String name;
private Integer ifCycle;
private String cron;
private String note;
private Integer status;
private Date releaseTime;
private Integer releaseStatus;
private String edges;
private Integer releaseUserId;
}

View File

@@ -0,0 +1,76 @@
package net.srt.disposition.entity;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import net.srt.framework.mybatis.entity.BaseEntity;
/**
* @BelongsProject: srt_cloud
* @BelongsPackage: net.srt.disposition.entity
* @Author: jpz
* @CreateTime: 2023/12/28 15:01
*/
@Data
@TableName("data_production_schedule_node")
public class DataProductionScheduleNodeEntity extends BaseEntity {
private Long projectId;
/**
 * schedule id
 */
private Long taskScheduleId;
/**
 * node number (front-end node id)
 */
private String no;
/**
 * execution order
 */
private Integer sort;
/**
 * node name
 */
private String name;
/**
 * node type
 */
private String type;
/**
 * canvas x coordinate
 */
private Integer x;
/**
 * canvas y coordinate
 */
private Integer y;
/**
 * note
 */
private String note;
/**
 * task id
 */
private Integer taskId;
/**
 * task type
 */
private Integer taskType;
/**
 * whether to continue when this node fails
 */
private Integer failGoOn;
/**
 * weight (higher runs first)
 */
private Integer weight;
}

View File

@@ -0,0 +1,71 @@
package net.srt.disposition.entity;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import lombok.EqualsAndHashCode;
import net.srt.framework.mybatis.entity.BaseEntity;
import java.util.Date;
/**
* @BelongsProject: srt_cloud
* @BelongsPackage: net.srt.disposition.entity
* @Author: jpz
* @CreateTime: 2023/12/28 19:06
*/
@Data
@EqualsAndHashCode(callSuper = false)
@TableName("data_production_schedule_node_record")
public class DataProductionScheduleNodeRecordEntity extends BaseEntity {
public static final String SCHEDULE_NODE_RECORD = "SCHEDULE_NODE_RECORD";
/**
 * schedule id
 */
private Integer taskScheduleId;
/**
 * schedule node id
 */
private Integer scheduleNodeId;
/**
 * schedule record id
 */
private Integer scheduleRecordId;
private String scheduleNodeNo;
/**
 * task id
 */
private Integer taskId;
/**
 * project id
 */
private Long projectId;
/**
 * run status
 */
private Integer runStatus;
/**
 * start time
 */
private Date startTime;
/**
 * end time
 */
private Date endTime;
/**
 * run log
 */
private String log;
private Integer executeType;
}

View File

@@ -0,0 +1,58 @@
package net.srt.disposition.entity;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import lombok.EqualsAndHashCode;
import net.srt.framework.mybatis.entity.BaseEntity;
import java.util.Date;
/**
* @BelongsProject: srt_cloud
* @BelongsPackage: net.srt.disposition.entity
* @Author: jpz
* @CreateTime: 2023/12/28 17:26
*/
@EqualsAndHashCode(callSuper = false)
@Data
@TableName("data_production_schedule_record")
public class DataProductionScheduleRecordEntity extends BaseEntity {
public static final String SCHEDULE_RECORD = "SCHEDULE_RECORD";
// marks a re-run after the program was interrupted abnormally
public static final String SCHEDULE_RESTART_RECORD = "SCHEDULE_RESTART_RECORD";
private String name;
/**
 * schedule id
 */
private Integer taskScheduleId;
/**
 * project id
 */
private Long projectId;
/**
 * run status
 */
private Integer runStatus;
/**
 * start time
 */
private Date startTime;
/**
 * end time
 */
private Date endTime;
/**
 * run log
 */
private String log;
private Integer executeType;
private String configJson;
}

View File

@@ -0,0 +1,227 @@
package net.srt.disposition.entity;
import com.baomidou.mybatisplus.annotation.FieldStrategy;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableName;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.Data;
import lombok.EqualsAndHashCode;
import net.srt.flink.common.assertion.Asserts;
import net.srt.flink.core.job.JobConfig;
import net.srt.flink.gateway.GatewayType;
import net.srt.framework.mybatis.entity.BaseEntity;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* @BelongsProject: srt_cloud
* @BelongsPackage: net.srt.disposition.entity
* @Author: jpz
* @CreateTime: 2023/12/28 19:36
*/
@EqualsAndHashCode(callSuper = false)
@Data
@TableName(value = "data_production_task", autoResultMap = true)
public class DataProductionTaskEntity extends BaseEntity {
/**
 * catalogue id
 */
private Long catalogueId;
/**
 * task name
 */
private String name;
/**
 * project id
 */
private Long projectId;
/**
 * alias
 */
private String alias;
/**
 * dialect
 */
private Integer dialect;
/**
 * type
 */
private Integer type;
/**
* CheckPoint trigger seconds
*/
private Integer checkPoint;
/**
* SavePoint strategy
*/
private Integer savePointStrategy;
/**
* SavePointPath
*/
private String savePointPath;
/**
 * parallelism
 */
private Integer parallelism;
/**
 * whether sql fragment (variable) support is enabled
 */
private Boolean fragment;
/**
 * whether insert statements are merged into a statement set
 */
private Boolean statementSet;
/**
 * whether to run in batch mode
 */
private Boolean batchModel;
/**
 * flink cluster id
 */
@TableField(updateStrategy = FieldStrategy.IGNORED)
private Long clusterId;
/**
* cluster configuration id
*/
@TableField(updateStrategy = FieldStrategy.IGNORED)
private Long clusterConfigurationId;
/**
* 1- 2-sql
*/
private Integer sqlDbType;
/**
* database id (sql tasks)
*/
@TableField(updateStrategy = FieldStrategy.IGNORED)
private Long databaseId;
private Integer openTrans;
/**
* Jar ID
*/
@TableField(updateStrategy = FieldStrategy.IGNORED)
private Long jarId;
/**
* env id
*/
@TableField(updateStrategy = FieldStrategy.IGNORED)
private Long envId;
/**
* alert group id
*/
private Long alertGroupId;
/**
* configuration json
*/
private String configJson;
/**
* Job Note
*/
private String note;
/**
* Job lifecycle
*/
private Integer step;
/**
* job instance id
*/
@TableField(updateStrategy = FieldStrategy.IGNORED)
private Long jobInstanceId;
/**
 * use auto cancel
 */
private Boolean useAutoCancel;
/**
 * use change log
 */
private Boolean useChangeLog;
/**
 * use result
 */
private Boolean useResult;
/**
 * pv data count
 */
private Integer pvdataNum;
/**
* is enable
*/
private Boolean enabled;
/**
* version id
*/
private Integer versionId;
@TableField(exist = false)
private String statement;
@TableField(exist = false)
private String clusterName;
@TableField(exist = false)
private List<Map<String, String>> config = new ArrayList<>();
public List<Map<String, String>> parseConfig() {
ObjectMapper objectMapper = new ObjectMapper();
try {
if (Asserts.isNotNullString(configJson)) {
config = objectMapper.readValue(configJson, ArrayList.class);
}
} catch (JsonProcessingException e) {
e.printStackTrace();
}
return config;
}
public JobConfig buildSubmitConfig() {
boolean useRemote = true;
if (clusterId == null || clusterId == 0) {
useRemote = false;
}
Map<String, String> map = new HashMap<>();
for (Map<String, String> item : config) {
if (Asserts.isNotNull(item)) {
map.put(item.get("key"), item.get("value"));
}
}
int jid = Asserts.isNull(jarId) ? 0 : jarId.intValue();
boolean fg = Asserts.isNull(fragment) ? false : fragment;
boolean sts = Asserts.isNull(statementSet) ? false : statementSet;
return new JobConfig(GatewayType.getByCode(type.toString()).getLongValue(), step, false, false, useRemote, clusterId == null ? null : clusterId.intValue(), clusterConfigurationId == null ? null : clusterConfigurationId.intValue(), jid, getId().intValue(),
alias, fg, sts, batchModel, checkPoint, parallelism, savePointStrategy, savePointPath, map);
}
}
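parseConfig above reads configJson into a raw ArrayList, which actually yields LinkedHashMap elements behind the declared List<Map<String, String>>. A hedged variant that keeps the element type explicit; it assumes Jackson's com.fasterxml.jackson.core.type.TypeReference, which ships alongside the ObjectMapper already imported here:

    public List<Map<String, String>> parseConfigTyped() {
        ObjectMapper objectMapper = new ObjectMapper();
        try {
            if (Asserts.isNotNullString(configJson)) {
                // preserves List<Map<String, String>> instead of a raw ArrayList
                config = objectMapper.readValue(configJson,
                        new TypeReference<List<Map<String, String>>>() {});
            }
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
        return config;
    }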

View File

@@ -0,0 +1,71 @@
package net.srt.disposition.entity;
import com.fasterxml.jackson.annotation.JsonFormat;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
import lombok.EqualsAndHashCode;
import net.srt.framework.common.query.Query;
import org.springframework.format.annotation.DateTimeFormat;
import java.util.Date;
@Data
@EqualsAndHashCode(callSuper = false)
@Schema(description = "数据开发-运维中心查询")
public class DevelopmentOperationalRecordsQuery extends Query {
@Schema(description = "调度节点记录唯一标识符")
private Long nodeRecordId;
@Schema(description = "记录标识符")
private Long recordId;
@Schema(description = "任务唯一标识符")
private Long taskId;
@Schema(description = "作业名称")
private String jobName;
@Schema(description = "执行状态")
private Integer status;
@Schema(description = "实例状态")
private String instanceStatus;
@Schema(description = "方言")
private Integer dialect;
@Schema(description = "类型")
private String type;
@Schema(description = "SQL数据库类型")
private String sqlDbType;
@Schema(description = "数据库唯一标识符")
private Long databaseId;
@Schema(description = "集群唯一标识符")
private Integer clusterId;
@Schema(description = "集群配置唯一标识符")
private Long clusterConfigurationId;
@Schema(description = "开始时间")
@DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss")
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
private Date startTime;
@Schema(description = "结束时间")
@DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss")
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
private Date endTime;
@Schema(description = "执行完成时间戳")
@DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss")
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
private Date finishTime;
}

View File

@@ -0,0 +1,7 @@
package net.srt.disposition.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import net.srt.disposition.entity.DataCentre;
import org.apache.ibatis.annotations.Mapper;
@Mapper
public interface DataCentreMapper extends BaseMapper<DataCentre> {
}

View File

@@ -0,0 +1,17 @@
package net.srt.disposition.mapper;
import net.srt.disposition.entity.DataProductionScheduleEntity;
import net.srt.framework.mybatis.dao.BaseDao;
import org.apache.ibatis.annotations.Mapper;
/**
* @BelongsProject: srt_cloud
* @BelongsPackage: net.srt.disposition.mapper
* @Author: jpz
* @CreateTime: 2023/12/28 9:45
*/
@Mapper
public interface DataProductionScheduleDao extends BaseDao<DataProductionScheduleEntity> {
void changeStutus(DataProductionScheduleEntity dbEntity);
}

View File

@@ -0,0 +1,15 @@
package net.srt.disposition.mapper;
import net.srt.disposition.entity.DataProductionScheduleNodeEntity;
import net.srt.framework.mybatis.dao.BaseDao;
import org.apache.ibatis.annotations.Mapper;
/**
* @BelongsProject: srt_cloud
* @BelongsPackage: net.srt.disposition.dao
* @Author: jpz
* @CreateTime: 2023/12/28 15:00
*/
@Mapper
public interface DataProductionScheduleNodeDao extends BaseDao<DataProductionScheduleNodeEntity> {
}

View File

@@ -0,0 +1,15 @@
package net.srt.disposition.mapper;
import net.srt.disposition.entity.DataProductionScheduleNodeRecordEntity;
import net.srt.framework.mybatis.dao.BaseDao;
import org.apache.ibatis.annotations.Mapper;
/**
* @BelongsProject: srt_cloud
* @BelongsPackage: net.srt.disposition.dao
* @Author: jpz
* @CreateTime: 2023/12/28 19:08
*/
@Mapper
public interface DataProductionScheduleNodeRecordDao extends BaseDao<DataProductionScheduleNodeRecordEntity> {
}

View File

@@ -0,0 +1,15 @@
package net.srt.disposition.mapper;
import net.srt.disposition.entity.DataProductionScheduleRecordEntity;
import net.srt.framework.mybatis.dao.BaseDao;
import org.apache.ibatis.annotations.Mapper;
/**
* @BelongsProject: srt_cloud
* @BelongsPackage: net.srt.disposition.dao
* @Author: jpz
* @CreateTime: 2023/12/28 18:54
*/
@Mapper
public interface DataProductionScheduleRecordDao extends BaseDao<DataProductionScheduleRecordEntity> {
}

View File

@@ -0,0 +1,20 @@
package net.srt.disposition.query;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
import lombok.EqualsAndHashCode;
import net.srt.framework.common.query.Query;
/**
* @BelongsProject: srt_cloud
* @BelongsPackage: net.srt.disposition.query
* @Author: jpz
* @CreateTime: 2023/12/28 9:39
*/
@Data
@EqualsAndHashCode(callSuper = false)
@Schema(description = "数据生成-任务调度查询")
public class DataProductionsScheduleQuery extends Query {
private String name;
private Integer status;
}

View File

@@ -0,0 +1,19 @@
package net.srt.disposition.service;
import com.baomidou.mybatisplus.extension.service.IService;
import net.srt.disposition.entity.DataCentre;
import net.srt.disposition.entity.DevelopmentOperationalRecordsQuery;
import net.srt.disposition.vo.DevelopmentTaskSaveVo;
import net.srt.framework.common.page.PageResult;
import java.util.List;
public interface DataCenterService extends IService<DataCentre> {
PageResult<DataCentre> dataCenterService(DevelopmentOperationalRecordsQuery query);
void deleted(List<Long> ids);
List<DevelopmentTaskSaveVo> listEnvList();
}

View File

@@ -3,10 +3,14 @@ package net.srt.disposition.service;
import com.baomidou.mybatisplus.extension.service.IService;
import net.srt.disposition.dto.DataCheckSqlDto;
import net.srt.disposition.dto.DataSqlDto;
import net.srt.disposition.entity.DataCentre;
import net.srt.disposition.entity.DataCheckSqlEntity;
import net.srt.disposition.vo.DataCheckSqlVo;
import net.srt.disposition.vo.DataCheckVo;
import net.srt.disposition.vo.DataSqlVo;
import net.srt.disposition.vo.LogVo;
import net.srt.flink.common.result.SqlExplainResult;
import net.srt.framework.common.page.PageResult;
import java.util.List;
@@ -16,4 +20,13 @@ public interface DataCheckSqlService extends IService<DataCheckSqlEntity> {
void add(DataCheckSqlDto dataCheckSqlDto);
List<SqlExplainResult> explainSql(DataSqlDto dataSqlDto);
DataCheckVo executeSql(DataSqlDto dataSqlDto);
LogVo getLog();
void clearLog();
}

View File

@@ -0,0 +1,32 @@
package net.srt.disposition.service;
import net.srt.disposition.dto.Flow;
import net.srt.disposition.entity.DataProductionScheduleEntity;
import net.srt.disposition.query.DataProductionsScheduleQuery;
import net.srt.disposition.vo.DataProductionScheduleVo;
import net.srt.framework.common.page.PageResult;
import net.srt.framework.mybatis.service.BaseService;
import java.util.List;
/**
* @BelongsProject: srt_cloud
* @BelongsPackage: net.srt.disposition.service
* @Author: jpz
* @CreateTime: 2023/12/28 8:56
*/
public interface DataProductionScheduleService extends BaseService<DataProductionScheduleEntity> {
PageResult<DataProductionScheduleVo> page(DataProductionsScheduleQuery query);
Flow get(Long id);
void save(Flow flow);
String run(Integer id);
void delete(List<Long> idList);
void release(Integer id);
}

View File

@@ -0,0 +1,17 @@
package net.srt.disposition.service;
import net.srt.disposition.entity.DataProductionScheduleNodeRecordEntity;
import net.srt.disposition.entity.DataProductionTaskEntity;
import net.srt.flink.core.job.JobResult;
import net.srt.framework.mybatis.service.BaseService;
/**
* @BelongsProject: srt_cloud
* @BelongsPackage: net.srt.disposition.service
* @Author: jpz
* @CreateTime: 2023/12/28 19:36
*/
public interface DataProductionTaskService extends BaseService<DataProductionTaskEntity> {
JobResult scheduleTask(DataProductionScheduleNodeRecordEntity nodeRecordEntity);
}

View File

@@ -0,0 +1,68 @@
package net.srt.disposition.service.impl;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.core.toolkit.StringUtils;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import lombok.AllArgsConstructor;
import net.srt.disposition.convert.DataCentreConvert;
import net.srt.disposition.entity.DataCentre;
import net.srt.disposition.entity.DevelopmentOperationalRecordsQuery;
import net.srt.disposition.mapper.DataCentreMapper;
import net.srt.disposition.service.DataCenterService;
import net.srt.disposition.vo.DevelopmentTaskSaveVo;
import net.srt.framework.common.page.PageResult;
import net.srt.framework.mybatis.service.impl.BaseServiceImpl;
import org.springframework.stereotype.Service;
import java.util.List;
@Service
@AllArgsConstructor
public class DataCenterServiceImpl extends BaseServiceImpl<DataCentreMapper, DataCentre> implements DataCenterService {
@Override
public PageResult<DataCentre> dataCenterService(DevelopmentOperationalRecordsQuery query) {
IPage<DataCentre> page=baseMapper.selectPage(getPage(query),getWrapper(query));
return new PageResult<>(DataCentreConvert.INSTANCE.convertList(page.getRecords()),page.getTotal());
}
@Override
public void deleted(List<Long> ids) {
// removeByIds already deletes every id; the per-id wrapper loop repeated the work
removeByIds(ids);
}
@Override
public List<DevelopmentTaskSaveVo> listEnvList() {
LambdaQueryWrapper<DataCentre> wrapper = Wrappers.lambdaQuery();
List<DataCentre> entities = baseMapper.selectList(wrapper);
List<DevelopmentTaskSaveVo> developmentTaskSaveVos = DataCentreConvert.INSTANCE.convert(entities);
return developmentTaskSaveVos;
}
private LambdaQueryWrapper<DataCentre> getWrapper(DevelopmentOperationalRecordsQuery query) {
LambdaQueryWrapper<DataCentre> wrapper = Wrappers.lambdaQuery();
wrapper.eq(query.getNodeRecordId()!=null,DataCentre::getScheduleNodeRecordId,query.getNodeRecordId());
wrapper.eq(query.getTaskId()!=null,DataCentre::getTaskId,query.getTaskId());
wrapper.like(StringUtils.isNotBlank(query.getJobName()),DataCentre::getJobName,query.getJobName());
wrapper.eq(query.getStatus()!=null,DataCentre::getStatus,query.getStatus());
wrapper.eq(query.getInstanceStatus()!=null,DataCentre::getInstanceStatus,query.getInstanceStatus());
wrapper.eq(query.getDialect()!=null,DataCentre::getDialect,query.getDialect());
wrapper.eq(StringUtils.isNotBlank(query.getSqlDbType()),DataCentre::getSqlDbType,query.getSqlDbType());
wrapper.eq(query.getDatabaseId()!=null,DataCentre::getDatabaseId,query.getDatabaseId());
wrapper.eq(query.getClusterId()!=null,DataCentre::getClusterId,query.getClusterId());
wrapper.eq(query.getClusterConfigurationId()!=null,DataCentre::getClusterConfigurationId,query.getClusterConfigurationId());
wrapper.gt(query.getStartTime()!=null,DataCentre::getStartTime,query.getStartTime());
wrapper.lt(query.getEndTime()!=null,DataCentre::getEndTime,query.getEndTime());
wrapper.eq(query.getFinishTime()!=null,DataCentre::getFinishTime,query.getFinishTime());
return wrapper;
}
}
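A note on the wrapper pattern in getWrapper above: the MyBatis-Plus eq/like/gt/lt overloads take a leading boolean, so each clause is appended only when its filter is present. The equivalent long form of one clause, for comparison:

    if (query.getTaskId() != null) {
        wrapper.eq(DataCentre::getTaskId, query.getTaskId());
    }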

View File

@@ -6,11 +6,17 @@ import lombok.AllArgsConstructor;
import net.srt.disposition.convert.DataCheckSqlConvert;
import net.srt.disposition.dto.DataCheckSqlDto;
import net.srt.disposition.dto.DataSqlDto;
import net.srt.disposition.entity.DataCentre;
import net.srt.disposition.entity.DataCheckSqlEntity;
import net.srt.disposition.entity.DataDatabaseDevEntity;
import net.srt.disposition.entity.DataProductionTreeEntity;
import net.srt.disposition.mapper.DataCentreMapper;
import net.srt.disposition.mapper.DataCheckSqlMapper;
import net.srt.disposition.mapper.DataProductionMapper;
import net.srt.disposition.service.DataCheckSqlService;
import net.srt.disposition.vo.DataCheckVo;
import net.srt.disposition.vo.LogVo;
import net.srt.disposition.vo.Result;
import net.srt.flink.common.result.SqlExplainResult;
import net.srt.flink.common.utils.LogUtil;
import net.srt.flink.process.context.ProcessContextHolder;
@@ -18,20 +24,29 @@ import net.srt.flink.process.model.ProcessEntity;
import net.srt.flink.process.model.ProcessStatus;
import net.srt.flink.process.model.ProcessStep;
import net.srt.flink.process.model.ProcessType;
import net.srt.flink.process.pool.ConsolePool;
import net.srt.framework.mybatis.service.impl.BaseServiceImpl;
import net.srt.framework.security.cache.TokenStoreCache;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Service;
import srt.cloud.framework.dbswitch.common.type.ProductTypeEnum;
import javax.servlet.http.HttpServletRequest;
import java.sql.*;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.*;
import java.util.Date;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@Service
@AllArgsConstructor
public class DataCheckSqlServiceImpl extends BaseServiceImpl<DataCheckSqlMapper, DataCheckSqlEntity> implements DataCheckSqlService {
private DataProductionMapper dataProductionMapper;
private TokenStoreCache storeCache;
private DataCentreMapper dataCentreMapper;
private HttpServletRequest request;
@Override
public DataCheckSqlEntity find(Integer id) {
DataCheckSqlEntity dataCheckSqlEntity = baseMapper.selectById(id);
@@ -91,6 +106,251 @@ public class DataCheckSqlServiceImpl extends BaseServiceImpl<DataCheckSqlMapper,
return sqlExplainResults;
}
@Override
public DataCheckVo executeSql(DataSqlDto dataSqlDto) {
DataCheckVo dataCheckVo=null;
try {
dataCheckVo = selectColumns(dataSqlDto);
dataCheckVo.setEndTime(new Date());
// build the operations record
DataCentre operationalRecordsEntity = getDataCentre(dataCheckVo, dataSqlDto);
// insert it into the operations job table
dataCentreMapper.insert(operationalRecordsEntity);
} catch (Exception e) {
throw new RuntimeException(e);
}
return dataCheckVo;
}
private DataCentre getDataCentre(DataCheckVo exeCuteSql, DataSqlDto dto) {
DataCentre operationalRecords = new DataCentre();
// get the tenant/project id
Long projectId = storeCache.getProjectId(getAccessToken1());
operationalRecords.setProjectId(projectId);
operationalRecords.setSqlDbType(dto.getSqlDbType());
operationalRecords.setDatabaseId(dto.getDatabaseId());
operationalRecords.setJobName(dto.getJobName());
operationalRecords.setStatement(dto.getStatement());
operationalRecords.setExecuteType(dto.getBatchModel());
Result result = exeCuteSql.getResult();
List<Result> results = result.getResults();
String resultData = "";
for (Result resultsDTO : results) {
// note: each iteration overwrites, so only the last result set's rows are kept
resultData = resultsDTO.getRowData().toString();
}
operationalRecords.setResult(resultData);
operationalRecords.setEndTime(new Date());
operationalRecords.setStartTime(new Date());
operationalRecords.setTaskId(dto.getId());
operationalRecords.setCreateTime(new Date());
operationalRecords.setExecuteNo(UUID.randomUUID().toString());
operationalRecords.setDuration(0);
return operationalRecords;
}
public String getAccessToken1() {
String accessToken = request.getHeader("Authorization");
if (StringUtils.isBlank(accessToken)) {
accessToken = request.getParameter("access_token");
}
return accessToken;
}
public DataCheckVo selectColumns(DataSqlDto dto) throws Exception{
// look up the target database info
List<DataDatabaseDevEntity> databaseList = dto.getDatabaseList();
DataDatabaseDevEntity databaseVO = new DataDatabaseDevEntity();
for (DataDatabaseDevEntity dataDatabaseVO : databaseList) {
if(dto.getDatabaseId().equals(dataDatabaseVO.getId())){
databaseVO.setId(dataDatabaseVO.getId());
databaseVO.setDatabaseName(dataDatabaseVO.getDatabaseName());
databaseVO.setDatabaseIp(dataDatabaseVO.getDatabaseIp());
databaseVO.setDatabasePort(dataDatabaseVO.getDatabasePort());
databaseVO.setUserName(dataDatabaseVO.getUserName());
databaseVO.setPassword(dataDatabaseVO.getPassword());
databaseVO.setDatabaseType(dataDatabaseVO.getDatabaseType());
databaseVO.setJdbcUrl(dataDatabaseVO.getJdbcUrl());
databaseVO.setVersion(dataDatabaseVO.getVersion());
databaseVO.setCreateTime(dataDatabaseVO.getCreateTime());
databaseVO.setUpdateTime(dataDatabaseVO.getUpdateTime());
databaseVO.setDeleted(dataDatabaseVO.getDeleted());
databaseVO.setCreator(dataDatabaseVO.getCreator());
databaseVO.setUpdater(dataDatabaseVO.getUpdater());
}
}
// build the logging process object
ProcessEntity process = BuildStes(dto);
DataCheckVo exeCuteSql = new DataCheckVo();
List<SqlExplainResult> sqlExplainResults = new ArrayList<>();
String current = null;
try {
// log the start of SQL execution
process.info("Start execute sql...");
if(databaseVO.getDatabaseType().equals(ProductTypeEnum.MYSQL.getIndex())){
String className = ProductTypeEnum.MYSQL.getDriveClassName();
// 1. register the JDBC driver
try {
Class.forName(className);
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
// obtain a database connection
try (Connection conn = DriverManager.getConnection(databaseVO.getJdbcUrl(), databaseVO.getUserName(), databaseVO.getPassword())) {
// extract the table name
String dtoStatement = dto.getStatement();
String tableName = getTableName(dtoStatement);
// create a statement object
try (Statement statement = conn.createStatement()) {
String sql = "SHOW COLUMNS FROM " + tableName;
// collect the column names into a list
List<String> columnNames = new ArrayList<>();
try (ResultSet selectColumnNames = statement.executeQuery(sql)) {
while (selectColumnNames.next()) {
String columnName = selectColumnNames.getString("Field");
columnNames.add(columnName);
System.out.println("列名: " + columnName);
}
}
// query the table data
try (ResultSet row = statement.executeQuery(dtoStatement)) {
// collection for the query results
List<Map<String, Object>> resultList = new ArrayList<>();
// process the result set
while (row.next()) {
// one map per row of data
Map<String, Object> rowData = new HashMap<>();
// read each column's value into the map
for (String columnName : columnNames) {
Object value = row.getObject(columnName);
rowData.put(columnName, value);
}
// add the row map to the result list
resultList.add(rowData);
}
Result resultsDTO=BuildResultsDTO(columnNames,resultList,dto);
exeCuteSql.setResult(resultsDTO);
exeCuteSql.setSuccess(true);
}
// parse the input SQL into a list of statements with SQLUtils
List<SQLStatement> stmtList = SQLUtils.parseStatements(dto.getStatement(), "mysql");
// 遍历列表中的每个SQL语句
for (SQLStatement item : stmtList) {
// track the current statement for logging
current = item.toString();
// record the statement type (e.g. SELECT, INSERT) in the result list
String type = item.getClass().getSimpleName();
sqlExplainResults.add(SqlExplainResult.success(type, current, null));
}
process.info("Execute sql succeed.");
}
} catch (SQLException e) {
// if execution fails, add a failed explain result
sqlExplainResults.add(SqlExplainResult.fail(current, LogUtil.getError(e)));
String error = LogUtil.getError(e);
// log the error message
process.error(error);
}
}else{
throw new Exception("目前只支持Mysql类型");
}
process.infoEnd();
} catch (Exception e) {
// if parsing fails, add a failed explain result
sqlExplainResults.add(SqlExplainResult.fail(current, LogUtil.getError(e)));
String error = LogUtil.getError(e);
// log the error message
process.error(error);
}
return exeCuteSql;
}
private Result BuildResultsDTO(List<String> columnNames, List<Map<String, Object>> resultList, DataSqlDto dto) {
// Date.getSeconds() is deprecated and only yields the seconds-of-minute field;
// wall-clock millis give a correct elapsed time
long startMillis = System.currentTimeMillis();
Result resultsDTO = new Result();
String dtoStatement = dto.getStatement();
List<Result> result = new ArrayList<>();
Result resultsDTO1 = new Result();
resultsDTO1.setColumns(columnNames);
resultsDTO1.setRowData(resultList);
resultsDTO1.setCount(resultList.size());
resultsDTO1.setSql(dtoStatement);
resultsDTO1.setSuccess(true);
resultsDTO1.setJobId(dto.getId());
resultsDTO1.setTime((System.currentTimeMillis() - startMillis) / 1000);
resultsDTO1.setIfQuery(true);
result.add(resultsDTO1);
resultsDTO.setResults(result);
resultsDTO.setCount(resultList.size());
resultsDTO.setSql(dtoStatement);
resultsDTO.setSuccess(true);
resultsDTO.setJobId(dto.getId());
resultsDTO.setTime((System.currentTimeMillis() - startMillis) / 1000);
return resultsDTO;
}
private String getTableName(String dtoStatement) {
// match the table name with a regex
Pattern pattern = Pattern.compile("from\\s+([a-zA-Z_][a-zA-Z0-9_]*)", Pattern.CASE_INSENSITIVE);
Matcher matcher = pattern.matcher(dtoStatement);
String tableName ="";
// look for a match
if (matcher.find()) {
// capture the matched table name
tableName = matcher.group(1);
System.out.println("Table Name: " + tableName);
} else {
System.out.println("Table name not found.");
}
return tableName;
}
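// Editorial note: the pattern above only captures a bare identifier after the
// first FROM, so schema-qualified ("db.t"), backquoted ("`t`") and joined
// tables are not fully handled. A slightly wider heuristic, still no substitute
// for a real SQL parser, kept here as a sketch:
private static final Pattern WIDER_TABLE_NAME = Pattern.compile(
"from\\s+[`\"]?([a-zA-Z_][\\w$]*(?:\\.[a-zA-Z_][\\w$]*)?)[`\"]?",
Pattern.CASE_INSENSITIVE);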
@Override
public LogVo getLog() {
LogVo logVo = new LogVo();
ConsolePool instance = ConsolePool.getInstance();
if (instance.exist(getAccessToken())) {
logVo.setLog(instance.get(getAccessToken()).toString());
logVo.setEnd(true);
}
return logVo;
}
@Override
public void clearLog() {
ConsolePool instance = ConsolePool.getInstance();
instance.remove(getAccessToken());
}
private ProcessEntity BuildStes(DataSqlDto dto) {
// 从上下文获取当前进程实体
ProcessEntity process = ProcessContextHolder.getProcess();

View File

@@ -0,0 +1,365 @@
package net.srt.disposition.service.impl;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.fasterxml.jackson.core.type.TypeReference;
import lombok.AllArgsConstructor;
import net.srt.api.module.data.development.constant.ExecuteType;
import net.srt.api.module.data.integrate.constant.CommonRunStatus;
import net.srt.api.module.quartz.QuartzDataProductionScheduleApi;
import net.srt.disposition.convert.DataProductionConvert;
import net.srt.disposition.mapper.DataProductionScheduleNodeDao;
import net.srt.disposition.mapper.DataProductionScheduleNodeRecordDao;
import net.srt.disposition.mapper.DataProductionScheduleRecordDao;
import net.srt.disposition.dto.Flow;
import net.srt.disposition.dto.FlowEdge;
import net.srt.disposition.dto.FlowNode;
import net.srt.disposition.dto.FlowNodeProperties;
import net.srt.disposition.entity.DataProductionScheduleEntity;
import net.srt.disposition.entity.DataProductionScheduleNodeEntity;
import net.srt.disposition.entity.DataProductionScheduleNodeRecordEntity;
import net.srt.disposition.entity.DataProductionScheduleRecordEntity;
import net.srt.disposition.mapper.DataProductionScheduleDao;
import net.srt.disposition.query.DataProductionsScheduleQuery;
import net.srt.disposition.service.DataProductionScheduleService;
import net.srt.disposition.vo.DataProductionScheduleVo;
import net.srt.flink.common.config.Dialect;
import net.srt.flink.common.utils.JSONUtil;
import net.srt.flink.common.utils.ThreadUtil;
import net.srt.flink.core.job.JobResult;
import net.srt.flink.process.context.ProcessContextHolder;
import net.srt.flink.process.model.ProcessEntity;
import net.srt.flink.process.model.ProcessType;
import net.srt.flink.process.pool.ConsolePool;
import net.srt.framework.common.exception.ServerException;
import net.srt.framework.common.page.PageResult;
import net.srt.framework.mybatis.service.impl.BaseServiceImpl;
import net.srt.framework.security.user.SecurityUser;
import org.apache.logging.log4j.core.util.CronExpression;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import srt.cloud.framework.dbswitch.common.util.StringUtil;
import java.util.*;
import java.util.stream.Collectors;
/**
* @BelongsProject: srt_cloud
* @BelongsPackage: net.srt.disposition.service.impl
* @Author: jpz
* @CreateTime: 2023/12/28 9:42
*/
@Service
@AllArgsConstructor
public class DataProductionScheduleServiceImpl extends BaseServiceImpl<DataProductionScheduleDao, DataProductionScheduleEntity> implements DataProductionScheduleService {
private final DataProductionScheduleNodeDao nodeDao;
private final DataProductionScheduleRecordDao recordDao;
private final DataProductionScheduleNodeRecordDao nodeRecordDao;
private final QuartzDataProductionScheduleApi scheduleApi;
@Override
public PageResult<DataProductionScheduleVo> page(DataProductionsScheduleQuery query) {
IPage<DataProductionScheduleEntity> page=baseMapper.selectPage(getPage(query),getWrapper(query));
return new PageResult<>(DataProductionConvert.INSTANCE.convertList(page.getRecords()),page.getTotal());
}
@Override
public Flow get(Long id) {
DataProductionScheduleEntity entity = baseMapper.selectById(id);
Flow flow = new Flow();
flow.setId(entity.getId());
flow.setName(entity.getName());
flow.setCron(entity.getCron());
flow.setStatus(entity.getStatus());
flow.setReleaseUserId(entity.getReleaseUserId());
flow.setReleaseTime(entity.getReleaseTime());
flow.setIfCycle(entity.getIfCycle());
flow.setNote(entity.getNote());
flow.setEdges(JSONUtil.parseObject(entity.getEdges(),new TypeReference<List<FlowEdge>>(){
}));
ArrayList<FlowNode> nodes = new ArrayList<>(6);
flow.setNodes(nodes);
// load the schedule's nodes
HashMap<String, Object> queryMap = new HashMap<>();
queryMap.put("task_schedule_id",id);
List<DataProductionScheduleNodeEntity> dbNodes=nodeDao.selectByMap(queryMap);
for (DataProductionScheduleNodeEntity dbNode : dbNodes) {
FlowNode flowNode=getFlowNode(dbNode);
nodes.add(flowNode);
}
return flow;
}
@Override
@Transactional(rollbackFor = Exception.class)
public void save(Flow flow) {
insertOrUpdate(flow);
}
@Override
public String run(Integer id) {
return scheduleRun(id, ExecuteType.HAND);
}
@Override
@Transactional(rollbackFor = Exception.class)
public void delete(List<Long> idList) {
removeByIds(idList);
for (Long id : idList) {
// also delete the schedule's nodes
Map<String, Object> delMap = new HashMap<>();
delMap.put("task_schedule_id", id);
nodeDao.deleteByMap(delMap);
}
}
@Override
public void release(Integer id) {
scheduleApi.release(id.longValue());
// update the status, release user and release time
DataProductionScheduleEntity dbEntity = new DataProductionScheduleEntity();
dbEntity.setId(id.longValue());
dbEntity.setStatus(1);
dbEntity.setReleaseUserId(SecurityUser.getUserId().intValue());
dbEntity.setReleaseTime(new Date());
baseMapper.changeStutus(dbEntity);
}
private String scheduleRun(Integer id, ExecuteType executeType) {
DataProductionScheduleEntity scheduleEntity=baseMapper.selectById(id);
LambdaQueryWrapper<DataProductionScheduleNodeEntity> wrapper = new LambdaQueryWrapper<>();
wrapper.eq(DataProductionScheduleNodeEntity::getTaskScheduleId,id)
.orderByAsc(DataProductionScheduleNodeEntity::getSort);
List<DataProductionScheduleNodeEntity> dbNodes=nodeDao.selectList(wrapper);
// insert a schedule run record
DataProductionScheduleRecordEntity scheduleRecordEntity = new DataProductionScheduleRecordEntity();
scheduleRecordEntity.setProjectId(scheduleEntity.getProjectId());
scheduleRecordEntity.setName(scheduleEntity.getName());
scheduleRecordEntity.setTaskScheduleId(id);
scheduleRecordEntity.setRunStatus(CommonRunStatus.RUNNING.getCode());
scheduleRecordEntity.setStartTime(new Date());
scheduleRecordEntity.setExecuteType(executeType.getValue());
recordDao.insert(scheduleRecordEntity);
ThreadUtil.threadPool.execute(()->runNode(scheduleEntity,executeType,dbNodes,scheduleRecordEntity));
return scheduleEntity.getId().toString();
}
private void runNode(DataProductionScheduleEntity scheduleEntity, ExecuteType executeType, List<DataProductionScheduleNodeEntity> dbNodes, DataProductionScheduleRecordEntity scheduleRecordEntity) {
String processId=scheduleRecordEntity.getId()+DataProductionScheduleRecordEntity.SCHEDULE_RECORD;
ProcessEntity flowProcess= ProcessContextHolder.registerFlowProcess(ProcessEntity.init(ProcessType.FLINKEXECUTE,processId));
flowProcess.info("Start run flow...");
boolean recordSuccess = true;
List<FlowNode> flowNodes = dbNodes.stream().map(this::getFlowNode).collect(Collectors.toList());
for (DataProductionScheduleNodeEntity dbNode : dbNodes) {
// node view returned to the front end
FlowNode flowNode = flowNodes.stream().filter(item -> item.getId().equals(dbNode.getNo())).findFirst().get();
flowNode.getProperties().setRunStatus(CommonRunStatus.SUCCESS.getCode());
flowNode.getProperties().setStyle(FlowNodeProperties.SUCCESS_STYLE);
flowProcess.info(String.format("Start run node %s-%s", dbNode.getName(), Dialect.getByCode(dbNode.getTaskType().toString()).getValue()));
DataProductionScheduleNodeRecordEntity nodeRecordEntity = new DataProductionScheduleNodeRecordEntity();
nodeRecordEntity.setProjectId(scheduleRecordEntity.getProjectId());
nodeRecordEntity.setTaskScheduleId(scheduleEntity.getId().intValue());
nodeRecordEntity.setScheduleNodeId(dbNode.getId().intValue());
nodeRecordEntity.setScheduleNodeNo(dbNode.getNo());
nodeRecordEntity.setScheduleRecordId(scheduleRecordEntity.getId().intValue());
nodeRecordEntity.setTaskId(dbNode.getTaskId());
nodeRecordEntity.setRunStatus(CommonRunStatus.RUNNING.getCode());
nodeRecordEntity.setStartTime(new Date());
nodeRecordEntity.setExecuteType(executeType.getValue());
nodeRecordDao.insert(nodeRecordEntity);
flowNode.getProperties().setNodeRecordId(nodeRecordEntity.getId().intValue());
// jobResult appears to be a placeholder: task execution is not wired in yet,
// so the null check below never fires
JobResult jobResult = null;
if (jobResult != null) {
flowProcess.info(jobResult.getLog());
}
flowProcess.info(String.format("Node %s-%s run end", dbNode.getName(), Dialect.getByCode(dbNode.getTaskType().toString()).getValue()));
}
// update the schedule run record
scheduleRecordEntity.setEndTime(new Date());
scheduleRecordEntity.setRunStatus(recordSuccess ? CommonRunStatus.SUCCESS.getCode() : CommonRunStatus.FAILED.getCode());
scheduleRecordEntity.setLog(ConsolePool.getInstance().get(processId).toString());
// save the history json snapshot
Flow flow = new Flow();
flow.setId(scheduleEntity.getId());
flow.setIfCycle(scheduleEntity.getIfCycle());
flow.setRecordId(scheduleRecordEntity.getId().intValue());
flow.setName(scheduleEntity.getName());
flow.setNote(scheduleEntity.getNote());
flow.setCron(scheduleEntity.getCron());
flow.setNodes(flowNodes);
flow.setEdges(JSONUtil.parseObject(scheduleEntity.getEdges(), new TypeReference<List<FlowEdge>>() {
}));
scheduleRecordEntity.setConfigJson(JSONUtil.toJsonString(flow));
recordDao.updateById(scheduleRecordEntity);
flowProcess.infoEnd();
// release the log buffer
ProcessContextHolder.clearFlow();
ConsolePool.getInstance().remove(processId);
}
private void insertOrUpdate(Flow flow) {
if (flow.getIfCycle()==1&&!CronExpression.isValidExpression(flow.getCron())){
throw new ServerException("cron表达式有误,请检查后重新填写");
}
DataProductionScheduleEntity entity=DataProductionScheduleEntity.builder().id(flow.getId())
.projectId(getProjectId()).ifCycle(flow.getIfCycle())
.name(flow.getName()).cron(flow.getCron()).note(flow.getNote()).status(flow.getStatus()).releaseTime(flow.getReleaseTime()).releaseUserId(flow.getReleaseUserId()).build();
List<FlowNode> nodes=flow.getNodes();
List<FlowEdge> edges=flow.getEdges();
// find the nodes with zero in-degree (start nodes)
List<FlowNode> starNodes=getStarNodes(nodes,edges);
// check for cycles
checkClosedLoop(starNodes,nodes,edges);
Set<FlowNode> runNodeSet=new LinkedHashSet<>();
buildRunNodes(runNodeSet,starNodes,nodes,edges);
if (entity.getId()==null){
entity.setEdges(JSONUtil.toJsonString(edges));
baseMapper.insert(entity);
// convert the client nodes to entities
List<DataProductionScheduleNodeEntity> clientNodes=getNodesByNodeSet(entity,runNodeSet);
// insert the nodes
clientNodes.forEach(nodeDao::insert);
}else {
List<DataProductionScheduleNodeEntity> clientNodes=getNodesByNodeSet(entity,runNodeSet);
entity.setEdges(JSONUtil.toJsonString(edges));
baseMapper.updateById(entity);
// load the nodes currently stored for this schedule
HashMap<String, Object> queryMap = new HashMap<>();
queryMap.put("task-schedule_id",entity.getId());
List<DataProductionScheduleNodeEntity> dbNode=nodeDao.selectByMap(queryMap);
// client nodes without an id are new: insert them
List<DataProductionScheduleNodeEntity> insertNodes=clientNodes.stream().filter(item->item.getId()==null).collect(Collectors.toList());
insertNodes.forEach(nodeDao::insert);
// client nodes with an id already exist: update them
clientNodes=getNodesByNodeSet(entity,runNodeSet);
List<DataProductionScheduleNodeEntity> updateNodes=clientNodes.stream().filter(item->item.getId()!=null).collect(Collectors.toList());
updateNodes.forEach(nodeDao::updateById);
// nodes present in the database but absent from the client set are deleted
for (DataProductionScheduleNodeEntity dbNodes : dbNode) {
if(clientNodes.stream().noneMatch(item->dbNodes.getNo().equals(item.getNo()))){
nodeDao.deleteById(dbNodes.getId());
}
}
}
}
private List<DataProductionScheduleNodeEntity> getNodesByNodeSet(DataProductionScheduleEntity entity, Set<FlowNode> runNodeSet) {
List<DataProductionScheduleNodeEntity> clientNodes=new ArrayList<>(10);
int i=0;
for (FlowNode flowNode : runNodeSet) {
i++;
DataProductionScheduleNodeEntity nodeEntity = new DataProductionScheduleNodeEntity();
nodeEntity.setId(flowNode.getProperties().getId());
nodeEntity.setTaskScheduleId(entity.getId());
nodeEntity.setProjectId(entity.getProjectId());
nodeEntity.setNo(flowNode.getId());
nodeEntity.setSort(i);
nodeEntity.setName(flowNode.getProperties().getName());
nodeEntity.setType(flowNode.getType());
nodeEntity.setX(flowNode.getX());
nodeEntity.setY(flowNode.getY());
nodeEntity.setNote(flowNode.getProperties().getNote());
nodeEntity.setTaskId(flowNode.getProperties().getTaskId());
nodeEntity.setTaskType(flowNode.getProperties().getTaskType());
nodeEntity.setFailGoOn(flowNode.getProperties().getFailGoOn());
nodeEntity.setWeight(flowNode.getProperties().getWeight());
clientNodes.add(nodeEntity);
}
return clientNodes;
}
private void buildRunNodes(Set<FlowNode> runNodeSet, List<FlowNode> starNodes, List<FlowNode> nodes, List<FlowEdge> edges) {
if (starNodes.isEmpty()){
return;
}
// sort by weight descending: higher weight runs first
starNodes.sort((item1, item2)->item2.getProperties().getWeight().compareTo(item1.getProperties().getWeight()));
for (FlowNode starNode : starNodes) {
// remove before add so a revisited node moves to the end of the run order
runNodeSet.remove(starNode);
runNodeSet.add(starNode);
}
// collect the child nodes
ArrayList<FlowNode> childNodes = new ArrayList<>(2);
for (FlowNode starNode : starNodes) {
// edges whose source is this node
List<FlowEdge> collect=edges.stream().filter(item->starNode.getId().equals(item.getSourceNodeId())).collect(Collectors.toList());
for (FlowEdge flowEdge : collect) {
FlowNode flowNode=nodes.stream().filter(item->item.getId().equals(flowEdge.getTargetNodeId())).findFirst().get();
childNodes.add(flowNode);
}
}
buildRunNodes(runNodeSet,childNodes,nodes,edges);
}
private void checkClosedLoop(List<FlowNode> starNodes, List<FlowNode> nodes, List<FlowEdge> edges) {
if (starNodes.isEmpty()){
throw new ServerException("流程不允许存在闭环,请检查");
}
for (FlowNode starNode : starNodes) {
Set<FlowNode> nodeSet = new HashSet<>();
// DFS from each start node to detect cycles
dfs(nodeSet,starNode,nodes,edges);
}
}
private void dfs(Set<FlowNode> nodeSet, FlowNode starNode, List<FlowNode> nodes, List<FlowEdge> edges) {
List<FlowEdge> collect=edges.stream().filter(item-> starNode.getId().equals(item.getSourceNodeId())).collect(Collectors.toList());
if (collect.isEmpty()){
return;
}
for (FlowEdge edge : collect) {
FlowNode targetNode=nodes.stream().filter(item->item.getId().equals(edge.getTargetNodeId())).findFirst().get();
if (nodeSet.contains(targetNode)){
throw new ServerException("流程不允许存在闭环,请检查");
}
nodeSet.add(targetNode);
dfs(nodeSet,targetNode,nodes,edges);
}
}
private List<FlowNode> getStarNodes(List<FlowNode> nodes, List<FlowEdge> edges) {
if (nodes.isEmpty()){
throw new ServerException("流程不能为空");
}
ArrayList<FlowNode> startNode = new ArrayList<>(1);
for (FlowNode node : nodes) {
if (edges.stream().noneMatch(item->node.getId().equals(item.getTargetNodeId()))){
startNode.add(node);
}
}
return startNode;
}
private FlowNode getFlowNode(DataProductionScheduleNodeEntity dbNode) {
FlowNode flowNode = new FlowNode();
flowNode.setId(dbNode.getNo());
flowNode.setType(dbNode.getType());
flowNode.setX(dbNode.getX());
flowNode.setY(dbNode.getY());
FlowNodeProperties properties = new FlowNodeProperties();
properties.setId(dbNode.getId());
properties.setName(dbNode.getName());
properties.setTaskId(dbNode.getTaskId());
properties.setWeight(dbNode.getWeight());
properties.setTaskType(dbNode.getTaskType());
properties.setTaskTypeVal(Dialect.getByCode(dbNode.getTaskType().toString()).getValue());
properties.setNote(dbNode.getNote());
properties.setFailGoOn(dbNode.getFailGoOn());
flowNode.setProperties(properties);
return flowNode;
}
private LambdaQueryWrapper<DataProductionScheduleEntity> getWrapper(DataProductionsScheduleQuery query) {
LambdaQueryWrapper<DataProductionScheduleEntity> wrapper = Wrappers.lambdaQuery();
wrapper.like(StringUtil.isNotBlank(query.getName()),DataProductionScheduleEntity::getName,query.getName())
.eq(query.getStatus()!=null,DataProductionScheduleEntity::getStatus,query.getStatus())
.orderByDesc(DataProductionScheduleEntity::getUpdateTime).orderByDesc(DataProductionScheduleEntity::getId);
dataScopeWithOrgId(wrapper);
return wrapper;
}
}
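A worked micro-example of the start-node rule in getStarNodes above: a node is a start node exactly when no edge targets it. For a two-node flow A -> B, only A qualifies, and buildRunNodes then appends B while walking A's outgoing edges. The wiring is illustrative:

    FlowNode a = new FlowNode();
    a.setId("A");
    FlowNode b = new FlowNode();
    b.setId("B");
    FlowEdge ab = new FlowEdge();
    ab.setSourceNodeId("A");
    ab.setTargetNodeId("B");
    List<FlowEdge> edges = Collections.singletonList(ab);
    // b is targeted by ab and is filtered out; a survives as the only start node
    boolean aIsStart = edges.stream().noneMatch(e -> a.getId().equals(e.getTargetNodeId())); // true
    boolean bIsStart = edges.stream().noneMatch(e -> b.getId().equals(e.getTargetNodeId())); // false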

View File

@@ -0,0 +1,28 @@
package net.srt.disposition.vo;
import com.fasterxml.jackson.annotation.JsonFormat;
import lombok.Data;
import org.springframework.format.annotation.DateTimeFormat;
import java.util.Date;
@Data
public class DataCheckVo {
private Integer id;
private String jobConfig;
private String jobManagerAddress;
private Integer status;
private boolean success;
private String statement;
private Integer jobId;
private Integer jobInstanceId;
private String error;
private Result result;
@DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss")
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
private Date startTime;
@DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss")
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
private Date endTime;
private String log;
}

View File

@@ -0,0 +1,69 @@
package net.srt.disposition.vo;
import com.fasterxml.jackson.annotation.JsonFormat;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
import net.srt.framework.common.utils.DateUtils;
import java.io.Serializable;
import java.util.Date;
/**
* @BelongsProject: srt_cloud
* @BelongsPackage: net.srt.disposition.vo
* @Author: jpz
* @CreateTime: 2023/12/28 8:57
*/
@Data
@Schema(description = "数据生产-作业调度")
public class DataProductionScheduleVo implements Serializable {
private static final long serialVersionUID = 1L;
@Schema(description = "主键")
private Long id;
private Long projectId;
@Schema(description = "调度名称")
private String name;
@Schema(description = "是否周期执行")
private Integer isCycle;
@Schema(description = "cron表达式")
private String cron;
@Schema(description = "描述")
private String note;
@Schema(description = "节点关系json")
private String edges;
@Schema(description = "0-未发布 1-已发布")
private Integer status;
private Date releaseTime;
private Integer releaseUserId;
@Schema(description = "版本号")
private Integer version;
@Schema(description = "删除标识 0:正常 1:已删除")
private Integer deleted;
@Schema(description = "创建者")
private Long creator;
@Schema(description = "创建时间")
@JsonFormat(pattern = DateUtils.DATE_TIME_PATTERN)
private Date createTime;
@Schema(description = "更新者")
private Long updater;
@Schema(description = "更新时间")
@JsonFormat(pattern = DateUtils.DATE_TIME_PATTERN)
private Date updateTime;
}

View File

@@ -0,0 +1,141 @@
package net.srt.disposition.vo;
import com.fasterxml.jackson.annotation.JsonFormat;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
import org.springframework.format.annotation.DateTimeFormat;
import java.util.Date;
/**
* @author : WangZhanpeng
* @date : 2023/12/27 19:14
*/
@Data
@Schema(description = "数据开发-运维中心")
public class DevelopmentOperationalRecordsVo {
@Schema(description = "主键")
private Long id;
@Schema(description = "所属项目唯一标识符")
private Long projectId;
@Schema(description = "SQL数据库类型")
private String sqlDbType;
@Schema(description = "数据库唯一标识符")
private Long databaseId;
@Schema(description = "集群唯一标识符")
private Integer clusterId;
@Schema(description = "集群配置唯一标识符")
private Long clusterConfigurationId;
@Schema(description = "会话信息")
private String session;
@Schema(description = "作业唯一标识符")
private Long jobId;
@Schema(description = "作业名称")
private String jobName;
@Schema(description = "作业管理器地址")
private String jobManagerAddress;
@Schema(description = "执行状态")
private Integer status;
@Schema(description = "方言")
private Integer dialect;
@Schema(description = "类型")
private String type;
@Schema(description = "SQL语句")
private String statement;
@Schema(description = "错误信息")
private String error;
@Schema(description = "执行结果")
private String result;
@Schema(description = "配置JSON字符串")
private String configJson;
@Schema(description = "开始时间")
@DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss")
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
private Date startTime;
@Schema(description = "结束时间")
@DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss")
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
private Date endTime;
@Schema(description = "任务唯一标识符")
private Long taskId;
@Schema(description = "执行类型")
private String executeType;
@Schema(description = "调度唯一标识符")
private Long scheduleId;
@Schema(description = "调度节点唯一标识符")
private Long scheduleNodeId;
@Schema(description = "调度记录唯一标识符")
private Long scheduleRecordId;
@Schema(description = "调度节点记录唯一标识符")
private Long scheduleNodeRecordId;
@Schema(description = "实例状态")
private String instanceStatus;
@Schema(description = "执行完成时间戳")
@DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss")
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
private Date finishTime;
@Schema(description = "执行的SQL语句")
private String executeSql;
@Schema(description = "执行编号")
private String executeNo;
@Schema(description = "作业标识符")
private String jid;
@Schema(description = "执行持续时间")
private Integer duration;
@Schema(description = "实体的版本")
private Integer version;
@Schema(description = "表示实体是否已删除的标志")
private Integer deleted;
@Schema(description = "创建者的ID")
private Integer creator;
@Schema(description = "实体的创建时间")
@DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss")
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
private Date createTime;
@Schema(description = "更新者的ID")
private Integer updater;
@Schema(description = "实体的更新时间")
@DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss")
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
private Date updateTime;
}

View File

@@ -0,0 +1,143 @@
package net.srt.disposition.vo;
import com.fasterxml.jackson.annotation.JsonFormat;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
import net.srt.framework.common.utils.DateUtils;
import java.io.Serializable;
import java.util.Date;
/**
* @author : WangZhanpeng
* @date : 2023/12/24 19:59
*/
@Data
@Schema(description = "数据开发-sql作业")
public class DevelopmentTaskSaveVo implements Serializable {
@Schema(description = "唯一标识")
private Long id;
@Schema(description = "警报组标识")
private Long alertGroupId;
@Schema(description = "别名")
private String alias;
@Schema(description = "批处理模型")
private String batchModel;
@Schema(description = "目录标识")
private Long catalogueId;
@Schema(description = "检查点")
private String checkPoint;
@Schema(description = "集群配置标识")
private Long clusterConfigurationId;
@Schema(description = "集群标识")
private Long clusterId;
@Schema(description = "配置 JSON")
private String configJson;
@Schema(description = "数据库标识")
private Long databaseId;
@Schema(description = "方言")
private String dialect;
@Schema(description = "是否启用")
private String enabled;
@Schema(description = "环境标识")
private String envId;
@Schema(description = "片段")
private String fragment;
@Schema(description = "JAR标识")
private Long jarId;
@Schema(description = "作业实例标识")
private Long jobInstanceId;
@Schema(description = "名称")
private String name;
@Schema(description = "备注")
private String note;
@Schema(description = "开放传输")
private String openTrans;
@Schema(description = "并行度")
private Integer parallelism;
@Schema(description = "处理结束时间")
private String processEnd;
@Schema(description = "项目标识")
private Long projectId;
@Schema(description = "PV数据数量")
private Integer pvdataNum;
@Schema(description = "保存点路径")
private String savePointPath;
@Schema(description = "保存点策略")
private String savePointStrategy;
@Schema(description = "SQL数据库类型")
private String sqlDbType;
@Schema(description = "语句")
private String statement;
@Schema(description = "语句集")
private String statementSet;
@Schema(description = "步骤")
private String step;
@Schema(description = "类型")
private String type;
@Schema(description = "是否使用自动取消")
private String useAutoCancel;
@Schema(description = "是否使用变更日志")
private String useChangeLog;
@Schema(description = "是否使用结果")
private String useResult;
@Schema(description = "版本标识")
private Long versionId;
@Schema(description = "版本")
private Integer version;
@Schema(description = "已删除")
private String deleted;
@Schema(description = "创建者")
private Integer creator;
@Schema(description = "创建时间")
@JsonFormat(pattern = DateUtils.DATE_TIME_PATTERN)
private Date createTime;
@Schema(description = "更新者")
private Integer updater;
@Schema(description = "更新时间")
@JsonFormat(pattern = DateUtils.DATE_TIME_PATTERN)
private Date updateTime;
}

View File

@@ -0,0 +1,9 @@
package net.srt.disposition.vo;
import lombok.Data;
@Data
public class LogVo {
private String log;
private boolean end;
}

View File

@@ -0,0 +1,23 @@
package net.srt.disposition.vo;
import lombok.Data;
import java.util.List;
import java.util.Map;
@Data
public class Result {
private List<Result> results;
private boolean ifQuery;
private String sql;
private Long time;
private boolean success;
private String errorMsg;
private Integer count;
private List<String> columns;
private List<Map<String, Object>> rowData;
private Long jobId;
}