jpz最新3.0

dev
jpz 2023-12-28 22:30:42 +08:00
parent 079bf4a08e
commit aa2230ee46
34 changed files with 1274 additions and 17 deletions

View File

@ -15,7 +15,7 @@ spring:
discovery:
server-addr: 101.34.77.101:8848
# 命名空间默认public
namespace: 7e34f104-f333-4828-b36a-02146e521c9a
namespace: 7e1e997d-5fa4-4f84-9f48-3e0adf830a37
service: ${spring.application.name}
group: srt2.0
config:

View File

@ -15,7 +15,7 @@ spring:
discovery:
server-addr: 101.34.77.101:8848
# 命名空间默认public
namespace: 7e34f104-f333-4828-b36a-02146e521c9a
namespace: 7e1e997d-5fa4-4f84-9f48-3e0adf830a37
service: ${spring.application.name}
group: srt2.0
config:

View File

@ -109,6 +109,9 @@
<groupId>net.srt</groupId>
<artifactId>flink-function</artifactId>
</dependency>
</dependencies>
<profiles>

View File

@ -84,18 +84,12 @@ spring:
- Path=/srt-cloud-datax-service/** # Adjust the path as needed
filters:
- StripPrefix=1
- id: srt-data-development # New Gateway
uri: lb://srt-data-development # Update with the correct URI for your new service
predicates:
- Path=/data-development/** # Adjust the path as needed
filters:
- StripPrefix=1
nacos:
discovery:
server-addr: 101.34.77.101:8848
# 命名空间默认public
namespace: 7e34f104-f333-4828-b36a-02146e521c9a
namespace: 7e1e997d-5fa4-4f84-9f48-3e0adf830a37
service: ${spring.application.name}
group: srt2.0

View File

@ -11,7 +11,7 @@ spring:
discovery:
server-addr: 101.34.77.101:8848
# 命名空间默认public
namespace: 7e34f104-f333-4828-b36a-02146e521c9a
namespace: 7e1e997d-5fa4-4f84-9f48-3e0adf830a37
service: ${spring.application.name}
group: srt2.0
config:

View File

@ -11,7 +11,7 @@ spring:
discovery:
server-addr: 101.34.77.101:8848
# 命名空间默认public
namespace: 7e34f104-f333-4828-b36a-02146e521c9a
namespace: 7e1e997d-5fa4-4f84-9f48-3e0adf830a37
service: ${spring.application.name}
group: srt2.0
config:

View File

@ -11,7 +11,7 @@ spring:
discovery:
server-addr: 101.34.77.101:8848
# 命名空间默认public
namespace: 7e34f104-f333-4828-b36a-02146e521c9a
namespace: 7e1e997d-5fa4-4f84-9f48-3e0adf830a37
service: ${spring.application.name}
group: srt2.0
config:

View File

@ -11,7 +11,7 @@ spring:
discovery:
server-addr: 101.34.77.101:8848
# 命名空间默认public
namespace: 7e34f104-f333-4828-b36a-02146e521c9a
namespace: 7e1e997d-5fa4-4f84-9f48-3e0adf830a37
service: ${spring.application.name}
group: srt2.0
config:

View File

@ -14,7 +14,7 @@ spring:
discovery:
server-addr: 101.34.77.101:8848
# 命名空间默认public
namespace: 7e34f104-f333-4828-b36a-02146e521c9a
namespace: 7e1e997d-5fa4-4f84-9f48-3e0adf830a37
service: ${spring.application.name}
group: srt2.0
config:

View File

@ -79,6 +79,12 @@
<groupId>io.minio</groupId>
<artifactId>minio</artifactId>
</dependency>
<dependency>
<groupId>net.srt</groupId>
<artifactId>flink-core-all</artifactId>
<version>2.0.0</version>
</dependency>
</dependencies>
<build>

View File

@ -0,0 +1,79 @@
package net.srt.disposition.controller;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.AllArgsConstructor;
import net.srt.disposition.dto.Flow;
import net.srt.disposition.query.DataProductionsScheduleQuery;
import net.srt.disposition.service.DataProductionScheduleService;
import net.srt.disposition.vo.DataProductionScheduleVo;
import net.srt.framework.common.page.PageResult;
import net.srt.framework.common.utils.Result;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.security.core.parameters.P;
import org.springframework.web.bind.annotation.*;
import javax.validation.Valid;
import java.util.List;
/**
 * REST endpoints for data-production job scheduling ("数据生产-作业调度").
 * Provides paging, detail, save, manual run, delete and release operations;
 * each endpoint is guarded by its own data-development:schedule:* authority.
 *
 * @author jpz
 * @since 2023/12/28
 */
@RestController
@RequestMapping("schedule")
@Tag(name = "数据生产-作业调度")
@AllArgsConstructor
public class DataProductionScheduleController {
// Business service handling schedule persistence and execution.
private final DataProductionScheduleService dataProductionScheduleService;
/** Paged list of schedules, filtered by the validated query (name/status). */
@GetMapping("page")
@Operation(summary = "分页")
@PreAuthorize("hasAuthority('data-development:schedule:page')")
public Result<PageResult<DataProductionScheduleVo>> page(@Valid DataProductionsScheduleQuery query){
PageResult<DataProductionScheduleVo> pageResult = dataProductionScheduleService.page(query);
return Result.ok(pageResult);
}
/** Full flow graph (metadata + nodes + edges) of one schedule. */
@GetMapping("{id}")
@Operation(summary = "信息")
@PreAuthorize("hasAuthority('data-development:schedule:info')")
public Result<Flow> get(@PathVariable("id") Long id){
return Result.ok(dataProductionScheduleService.get(id));
}
/** Creates or updates a schedule from the submitted flow graph. */
@PostMapping
@Operation(summary = "保存")
@PreAuthorize("hasAuthority('data-development:schedule:save')")
public Result<String> save(@RequestBody Flow flow){
dataProductionScheduleService.save(flow);
return Result.ok();
}
/** Manually triggers a run; returns an id used to fetch the run log. */
@PostMapping("/run/{id}")
@Operation(summary = "执行(返回log的id)")
@PreAuthorize("hasAuthority('data-development:schedule:run')")
public Result<String> run(@PathVariable Integer id){
return Result.ok(dataProductionScheduleService.run(id));
}
/** Deletes the given schedules together with their node rows. */
@DeleteMapping
@Operation(summary = "删除")
@PreAuthorize("hasAuthority('data-development:schedule:delete')")
public Result<String> delete(@RequestBody List<Long> idList){
dataProductionScheduleService.delete(idList);
return Result.ok();
}
/** Publishes the schedule to the quartz module and stamps release info. */
@PostMapping("/release/{id}")
@Operation(summary = "发布")
@PreAuthorize("hasAuthority('data-development:schedule:release')")
public Result<String> release(@PathVariable Integer id) {
dataProductionScheduleService.release(id);
return Result.ok();
}
}

View File

@ -0,0 +1,22 @@
package net.srt.disposition.convert;
import net.srt.disposition.entity.DataProductionScheduleEntity;
import net.srt.disposition.vo.DataProductionScheduleVo;
import org.mapstruct.Mapper;
import org.mapstruct.factory.Mappers;
import java.util.List;
/**
 * MapStruct converter between schedule persistence entities and their
 * view objects (VOs) returned by the paging API.
 *
 * @author jpz
 * @since 2023/12/28
 */
@Mapper
public interface DataProductionConvert {
// Shared singleton; implementation is generated by MapStruct at build time.
DataProductionConvert INSTANCE = Mappers.getMapper(DataProductionConvert.class);
// Maps a list of entities to VOs (field-by-field, same-name mapping).
List<DataProductionScheduleVo> convertList(List<DataProductionScheduleEntity> list);
}

View File

@ -0,0 +1,31 @@
package net.srt.disposition.dto;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.Date;
import java.util.List;
/**
 * Front-end flow graph of one schedule: schedule metadata plus its node and
 * edge lists as drawn in the designer.
 *
 * @author jpz
 * @since 2023/12/28
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Flow {
// Schedule id; null when the flow is being created for the first time.
private Long id;
// Id of the schedule-run record this snapshot belongs to (set when a run is archived).
private Integer recordId;
// 1 = cron-cycled schedule; cron must then be a valid expression.
private Integer ifCycle;
// Schedule name.
private String name;
// Cron expression; validated on save when ifCycle == 1.
private String cron;
// Free-form remark.
private String note;
// Schedule status code — value semantics not visible here; TODO confirm.
private Integer status;
// Release timestamp and releasing user.
private Date releaseTime;
private Integer releaseUserId;
// Graph vertices (one per task node).
private List<FlowNode> nodes;
// Directed edges (dependencies between nodes).
private List<FlowEdge> edges;
}

View File

@ -0,0 +1,28 @@
package net.srt.disposition.dto;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.List;
import java.util.Map;
/**
 * One directed edge of the designer flow graph: source/target node ids plus
 * the polyline geometry the front end needs to redraw it.
 *
 * @author jpz
 * @since 2023/12/28
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class FlowEdge {
// Front-end edge id.
private String id;
// Edge render type (designer-specific).
private String type;
// Geometry: start/end coordinates as loosely-typed maps from the designer.
private Map<String, Object> startPoint;
private Map<String, Object> endPoint;
// Intermediate polyline points.
private List<Map<String, Object>> pointsList;
// Id of the upstream (source) FlowNode.
private String sourceNodeId;
// Id of the downstream (target) FlowNode.
private String targetNodeId;
// Arbitrary extra designer properties.
private Map<String, Object> properties;
}

View File

@ -0,0 +1,22 @@
package net.srt.disposition.dto;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
 * One node of the designer flow graph: canvas position plus the task-binding
 * properties used when the schedule runs.
 *
 * @author jpz
 * @since 2023/12/28
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class FlowNode {
// Front-end node id; persisted as data_production_schedule_node.no.
private String id;
// Node render type (designer-specific).
private String type;
// Canvas coordinates.
private Integer x;
private Integer y;
// Task binding and per-run state details.
private FlowNodeProperties properties;
}

View File

@ -0,0 +1,37 @@
package net.srt.disposition.dto;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
/**
 * Properties payload of a FlowNode: task binding, scheduling weight and
 * per-run presentation state sent back to the flow designer.
 *
 * @author jpz
 * @since 2023/12/28
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class FlowNodeProperties {
    /**
     * Style pushed to the front end for a successfully finished node (green border).
     * Immutable shared constant. Previously a double-brace-initialised HashMap,
     * which creates a needless anonymous subclass and lets any caller mutate
     * the shared static map.
     */
    public static final Map<String, Object> SUCCESS_STYLE =
            Collections.singletonMap("border", "3px solid #06c733");
    // Persisted node id (data_production_schedule_node.id); null for new nodes.
    private Long id;
    // Id of this node's run record for the current execution.
    private Integer nodeRecordId;
    // Node display name.
    private String name;
    // Bound task id — presumably data_production_task.id; TODO confirm.
    private Integer taskId;
    // Scheduling weight: siblings are ordered by descending weight.
    private Integer weight;
    // Task dialect/type code and its display value.
    private Integer taskType;
    private String taskTypeVal;
    // Free-form remark.
    private String note;
    // Whether the flow continues when this node fails (flag).
    private Integer failGoOn;
    // Inline style map rendered by the designer.
    private Map<String, Object> style;
    // CommonRunStatus code; defaults to 1.
    private Integer runStatus = 1;
}

View File

@ -0,0 +1,38 @@
package net.srt.disposition.entity;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import net.srt.framework.mybatis.entity.BaseEntity;
import java.util.Date;
/**
* @BelongsProject: srt_cloud
* @BelongsPackage: net.srt.disposition.entity
* @Author: jpz
* @CreateTime: 2023/12/28 9:35
*/
@EqualsAndHashCode(callSuper = false)
@Data
@SuperBuilder
@AllArgsConstructor
@NoArgsConstructor
@TableName(value = "data_production_schedule",autoResultMap = true)
public class DataProductionScheduleEntity extends BaseEntity {
private Long projectId;
private String name;
private Integer ifCycle;
private String cron;
private String note;
private Integer status;
private Date releaseTime;
private Integer releaseStatus;
private String edges;
private Integer releaseUserId;
}

View File

@ -0,0 +1,76 @@
package net.srt.disposition.entity;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import net.srt.framework.mybatis.entity.BaseEntity;
/**
 * One node of a schedule flow (table data_production_schedule_node).
 * Matched against the designer's FlowNode via the {@code no} column.
 *
 * @author jpz
 * @since 2023/12/28
 */
@Data
@TableName("data_production_schedule_node")
public class DataProductionScheduleNodeEntity extends BaseEntity {
// Owning project id.
private Long projectId;
/**
 * Owning schedule id (data_production_schedule.id).
 */
private Long taskScheduleId;
/**
 * Front-end node id (FlowNode.id); used to match client and DB nodes.
 */
private String no;
/**
 * Execution order within the schedule (1-based, ascending).
 */
private Integer sort;
/**
 * Node display name.
 */
private String name;
/**
 * Node render type (designer-specific).
 */
private String type;
/**
 * Canvas X coordinate.
 */
private Integer x;
/**
 * Canvas Y coordinate.
 */
private Integer y;
/**
 * Free-form remark.
 */
private String note;
/**
 * Bound task id.
 */
private Integer taskId;
/**
 * Task dialect/type code.
 */
private Integer taskType;
/**
 * Whether the flow continues when this node fails (flag).
 */
private Integer failGoOn;
/**
 * Scheduling weight: siblings are ordered by descending weight.
 */
private Integer weight;
}

View File

@ -0,0 +1,71 @@
package net.srt.disposition.entity;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import lombok.EqualsAndHashCode;
import net.srt.framework.mybatis.entity.BaseEntity;
import java.util.Date;
/**
 * Run record of a single schedule node
 * (table data_production_schedule_node_record).
 *
 * @author jpz
 * @since 2023/12/28
 */
@Data
@EqualsAndHashCode(callSuper = false)
@TableName("data_production_schedule_node_record")
public class DataProductionScheduleNodeRecordEntity extends BaseEntity {
// Marker constant — presumably a per-node console/process-id suffix; its
// usage is not visible in this chunk. TODO confirm.
public static final String SCHEDULE_NODE_RECORD = "SCHEDULE_NODE_RECORD";
/**
 * Owning schedule id.
 */
private Integer taskScheduleId;
/**
 * Schedule node id (data_production_schedule_node.id).
 */
private Integer scheduleNodeId;
/**
 * Parent schedule-run record id.
 */
private Integer scheduleRecordId;
// Front-end node id (node.no) this record belongs to.
private String scheduleNodeNo;
/**
 * Bound task id.
 */
private Integer taskId;
/**
 * Owning project id.
 */
private Long projectId;
/**
 * Run status code (CommonRunStatus).
 */
private Integer runStatus;
/**
 * Node start time.
 */
private Date startTime;
/**
 * Node end time.
 */
private Date endTime;
/**
 * Captured execution log.
 */
private String log;
// How the run was triggered (ExecuteType, e.g. HAND for manual).
private Integer executeType;
}

View File

@ -0,0 +1,58 @@
package net.srt.disposition.entity;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import lombok.EqualsAndHashCode;
import net.srt.framework.mybatis.entity.BaseEntity;
import java.util.Date;
/**
 * Run record of a whole schedule (table data_production_schedule_record).
 * One row per execution; the flow snapshot is archived in configJson.
 *
 * @author jpz
 * @since 2023/12/28
 */
@EqualsAndHashCode(callSuper = false)
@Data
@TableName("data_production_schedule_record")
public class DataProductionScheduleRecordEntity extends BaseEntity {
// Suffix appended to the record id to form the console/process id of a run.
public static final String SCHEDULE_RECORD = "SCHEDULE_RECORD";
//程序异常中断重新执行的情况 -> marker for re-running after an abnormal interruption.
public static final String SCHEDULE_RESTART_RECORD = "SCHEDULE_RESTART_RECORD";
// Schedule name at the time of the run.
private String name;
/**
 * Owning schedule id.
 */
private Integer taskScheduleId;
/**
 * Owning project id.
 */
private Long projectId;
/**
 * Run status code (CommonRunStatus: RUNNING/SUCCESS/FAILED).
 */
private Integer runStatus;
/**
 * Run start time.
 */
private Date startTime;
/**
 * Run end time.
 */
private Date endTime;
/**
 * Full console log of the run.
 */
private String log;
// How the run was triggered (ExecuteType, e.g. HAND for manual).
private Integer executeType;
// JSON snapshot of the executed Flow (nodes + edges + per-node status).
private String configJson;
}

View File

@ -0,0 +1,227 @@
package net.srt.disposition.entity;
import com.baomidou.mybatisplus.annotation.FieldStrategy;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableName;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.Data;
import lombok.EqualsAndHashCode;
import net.srt.flink.common.assertion.Asserts;
import net.srt.flink.core.job.JobConfig;
import net.srt.flink.gateway.GatewayType;
import net.srt.framework.mybatis.entity.BaseEntity;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* @BelongsProject: srt_cloud
* @BelongsPackage: net.srt.disposition.entity
* @Author: jpz
* @CreateTime: 2023/12/28 19:36
*/
@EqualsAndHashCode(callSuper = false)
@Data
@TableName(value = "data_production_task", autoResultMap = true)
public class DataProductionTaskEntity extends BaseEntity {
/**
* id
*/
private Long catalogueId;
/**
*
*/
private String name;
/**
* id
*/
private Long projectId;
/**
*
*/
private String alias;
/**
*
*/
private Integer dialect;
/**
*
*/
private Integer type;
/**
* CheckPoint trigger seconds
*/
private Integer checkPoint;
/**
* SavePoint strategy
*/
private Integer savePointStrategy;
/**
* SavePointPath
*/
private String savePointPath;
/**
*
*/
private Integer parallelism;
/**
*
*/
private Boolean fragment;
/**
* insert
*/
private Boolean statementSet;
/**
*
*/
private Boolean batchModel;
/**
* flinkid
*/
@TableField(updateStrategy = FieldStrategy.IGNORED)
private Long clusterId;
/**
* id
*/
@TableField(updateStrategy = FieldStrategy.IGNORED)
private Long clusterConfigurationId;
/**
* 1- 2-sql
*/
private Integer sqlDbType;
/**
* idsql
*/
@TableField(updateStrategy = FieldStrategy.IGNORED)
private Long databaseId;
private Integer openTrans;
/**
* Jar ID
*/
@TableField(updateStrategy = FieldStrategy.IGNORED)
private Long jarId;
/**
* env id
*/
@TableField(updateStrategy = FieldStrategy.IGNORED)
private Long envId;
/**
* alert group id
*/
private Long alertGroupId;
/**
* configuration json
*/
private String configJson;
/**
* Job Note
*/
private String note;
/**
* Job lifecycle
*/
private Integer step;
/**
* job instance id
*/
@TableField(updateStrategy = FieldStrategy.IGNORED)
private Long jobInstanceId;
/**
*
*/
private Boolean useAutoCancel;
/**
*
*/
private Boolean useChangeLog;
/**
*
*/
private Boolean useResult;
/**
*
*/
private Integer pvdataNum;
/**
* is enable
*/
private Boolean enabled;
/**
* version id
*/
private Integer versionId;
@TableField(exist = false)
private String statement;
@TableField(exist = false)
private String clusterName;
@TableField(exist = false)
private List<Map<String, String>> config = new ArrayList<>();
public List<Map<String, String>> parseConfig() {
ObjectMapper objectMapper = new ObjectMapper();
try {
if (Asserts.isNotNullString(configJson)) {
config = objectMapper.readValue(configJson, ArrayList.class);
}
} catch (JsonProcessingException e) {
e.printStackTrace();
}
return config;
}
public JobConfig buildSubmitConfig() {
boolean useRemote = true;
if (clusterId == null || clusterId == 0) {
useRemote = false;
}
Map<String, String> map = new HashMap<>();
for (Map<String, String> item : config) {
if (Asserts.isNotNull(item)) {
map.put(item.get("key"), item.get("value"));
}
}
int jid = Asserts.isNull(jarId) ? 0 : jarId.intValue();
boolean fg = Asserts.isNull(fragment) ? false : fragment;
boolean sts = Asserts.isNull(statementSet) ? false : statementSet;
return new JobConfig(GatewayType.getByCode(type.toString()).getLongValue(), step, false, false, useRemote, clusterId == null ? null : clusterId.intValue(), clusterConfigurationId == null ? null : clusterConfigurationId.intValue(), jid, getId().intValue(),
alias, fg, sts, batchModel, checkPoint, parallelism, savePointStrategy, savePointPath, map);
}
}

View File

@ -0,0 +1,17 @@
package net.srt.disposition.mapper;
import net.srt.disposition.entity.DataProductionScheduleEntity;
import net.srt.framework.mybatis.dao.BaseDao;
import org.apache.ibatis.annotations.Mapper;
/**
 * MyBatis mapper for data_production_schedule.
 *
 * @author jpz
 * @since 2023/12/28
 */
@Mapper
public interface DataProductionScheduleDao extends BaseDao<DataProductionScheduleEntity> {
/**
 * Updates status, release user and release time of one schedule (SQL lives
 * in the mapper XML). NOTE(review): the name is a typo of "changeStatus";
 * renaming requires updating the XML statement id and all callers together.
 */
void changeStutus(DataProductionScheduleEntity dbEntity);
}

View File

@ -0,0 +1,15 @@
package net.srt.disposition.mapper;
import net.srt.disposition.entity.DataProductionScheduleNodeEntity;
import net.srt.framework.mybatis.dao.BaseDao;
import org.apache.ibatis.annotations.Mapper;
/**
 * MyBatis mapper for data_production_schedule_node (CRUD inherited from BaseDao).
 *
 * @author jpz
 * @since 2023/12/28
 */
@Mapper
public interface DataProductionScheduleNodeDao extends BaseDao<DataProductionScheduleNodeEntity> {
}

View File

@ -0,0 +1,15 @@
package net.srt.disposition.mapper;
import net.srt.disposition.entity.DataProductionScheduleNodeRecordEntity;
import net.srt.framework.mybatis.dao.BaseDao;
import org.apache.ibatis.annotations.Mapper;
/**
 * MyBatis mapper for data_production_schedule_node_record (CRUD inherited from BaseDao).
 *
 * @author jpz
 * @since 2023/12/28
 */
@Mapper
public interface DataProductionScheduleNodeRecordDao extends BaseDao<DataProductionScheduleNodeRecordEntity> {
}

View File

@ -0,0 +1,15 @@
package net.srt.disposition.mapper;
import net.srt.disposition.entity.DataProductionScheduleRecordEntity;
import net.srt.framework.mybatis.dao.BaseDao;
import org.apache.ibatis.annotations.Mapper;
/**
 * MyBatis mapper for data_production_schedule_record (CRUD inherited from BaseDao).
 *
 * @author jpz
 * @since 2023/12/28
 */
@Mapper
public interface DataProductionScheduleRecordDao extends BaseDao<DataProductionScheduleRecordEntity> {
}

View File

@ -0,0 +1,20 @@
package net.srt.disposition.query;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
import lombok.EqualsAndHashCode;
import net.srt.framework.common.query.Query;
/**
 * Query object for the schedule paging endpoint ("数据生成-任务调度查询").
 * Extends Query for the common page/limit/sort parameters.
 *
 * @author jpz
 * @since 2023/12/28
 */
@Data
@EqualsAndHashCode(callSuper = false)
@Schema(description = "数据生成-任务调度查询")
public class DataProductionsScheduleQuery extends Query {
// Filter: schedule name — exact vs. fuzzy match is decided by the service's wrapper.
private String name;
// Filter: schedule status code.
private Integer status;
}

View File

@ -0,0 +1,32 @@
package net.srt.disposition.service;
import net.srt.disposition.dto.Flow;
import net.srt.disposition.entity.DataProductionScheduleEntity;
import net.srt.disposition.query.DataProductionsScheduleQuery;
import net.srt.disposition.vo.DataProductionScheduleVo;
import net.srt.framework.common.page.PageResult;
import net.srt.framework.mybatis.service.BaseService;
import java.util.List;
/**
 * Business operations on data-production schedules: querying, graph
 * persistence, manual execution and release.
 *
 * @author jpz
 * @since 2023/12/28
 */
public interface DataProductionScheduleService extends BaseService<DataProductionScheduleEntity> {
/** Paged list of schedules matching the query. */
PageResult<DataProductionScheduleVo> page(DataProductionsScheduleQuery query);
/** Full flow (metadata + nodes + edges) of one schedule. */
Flow get(Long id);
/** Creates or updates a schedule together with its node graph. */
void save(Flow flow);
/** Runs the schedule manually; returns an id used to fetch the run log. */
String run(Integer id);
/** Deletes schedules and their node rows. */
void delete(List<Long> idList);
/** Publishes the schedule to quartz and stamps release info. */
void release(Integer id);
}

View File

@ -0,0 +1,17 @@
package net.srt.disposition.service;
import net.srt.disposition.entity.DataProductionScheduleNodeRecordEntity;
import net.srt.disposition.entity.DataProductionTaskEntity;
import net.srt.flink.core.job.JobResult;
import net.srt.framework.mybatis.service.BaseService;
/**
 * Task-level execution service used by the schedule runner.
 *
 * @author jpz
 * @since 2023/12/28
 */
public interface DataProductionTaskService extends BaseService<DataProductionTaskEntity> {
/**
 * Executes the task referenced by the given node-run record and returns
 * the flink job result.
 */
JobResult scheduleTask(DataProductionScheduleNodeRecordEntity nodeRecordEntity);
}

View File

@ -0,0 +1,365 @@
package net.srt.disposition.service.impl;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.fasterxml.jackson.core.type.TypeReference;
import lombok.AllArgsConstructor;
import net.srt.api.module.data.development.constant.ExecuteType;
import net.srt.api.module.data.integrate.constant.CommonRunStatus;
import net.srt.api.module.quartz.QuartzDataProductionScheduleApi;
import net.srt.disposition.convert.DataProductionConvert;
import net.srt.disposition.mapper.DataProductionScheduleNodeDao;
import net.srt.disposition.mapper.DataProductionScheduleNodeRecordDao;
import net.srt.disposition.mapper.DataProductionScheduleRecordDao;
import net.srt.disposition.dto.Flow;
import net.srt.disposition.dto.FlowEdge;
import net.srt.disposition.dto.FlowNode;
import net.srt.disposition.dto.FlowNodeProperties;
import net.srt.disposition.entity.DataProductionScheduleEntity;
import net.srt.disposition.entity.DataProductionScheduleNodeEntity;
import net.srt.disposition.entity.DataProductionScheduleNodeRecordEntity;
import net.srt.disposition.entity.DataProductionScheduleRecordEntity;
import net.srt.disposition.mapper.DataProductionScheduleDao;
import net.srt.disposition.query.DataProductionsScheduleQuery;
import net.srt.disposition.service.DataProductionScheduleService;
import net.srt.disposition.vo.DataProductionScheduleVo;
import net.srt.flink.common.config.Dialect;
import net.srt.flink.common.utils.JSONUtil;
import net.srt.flink.common.utils.ThreadUtil;
import net.srt.flink.core.job.JobResult;
import net.srt.flink.process.context.ProcessContextHolder;
import net.srt.flink.process.model.ProcessEntity;
import net.srt.flink.process.model.ProcessType;
import net.srt.flink.process.pool.ConsolePool;
import net.srt.framework.common.exception.ServerException;
import net.srt.framework.common.page.PageResult;
import net.srt.framework.mybatis.service.impl.BaseServiceImpl;
import net.srt.framework.security.user.SecurityUser;
import org.apache.logging.log4j.core.util.CronExpression;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import srt.cloud.framework.dbswitch.common.util.StringUtil;
import java.util.*;
import java.util.stream.Collectors;
/**
* @BelongsProject: srt_cloud
* @BelongsPackage: net.srt.disposition.service.impl
* @Author: jpz
* @CreateTime: 2023/12/28 9:42
*/
@Service
@AllArgsConstructor
public class DataProductionScheduleServiceimpl extends BaseServiceImpl<DataProductionScheduleDao, DataProductionScheduleEntity> implements DataProductionScheduleService {
private final DataProductionScheduleNodeDao nodeDao;
private final DataProductionScheduleRecordDao recordDao;
private final DataProductionScheduleNodeRecordDao nodeRecordDao;
private final QuartzDataProductionScheduleApi scheduleApi;
@Override
public PageResult<DataProductionScheduleVo> page(DataProductionsScheduleQuery query) {
    // Run the paged query, then map the entity page onto view objects.
    IPage<DataProductionScheduleEntity> result = baseMapper.selectPage(getPage(query), getWrapper(query));
    List<DataProductionScheduleVo> voList = DataProductionConvert.INSTANCE.convertList(result.getRecords());
    return new PageResult<>(voList, result.getTotal());
}
@Override
public Flow get(Long id) {
    // Load the schedule row and copy its metadata onto the flow DTO.
    DataProductionScheduleEntity entity = baseMapper.selectById(id);
    Flow flow = new Flow();
    flow.setId(entity.getId());
    flow.setName(entity.getName());
    flow.setCron(entity.getCron());
    flow.setStatus(entity.getStatus());
    // BUGFIX: was entity.getReleaseStatus(), which copied the release *status*
    // flag into the release *user id* field of the DTO.
    flow.setReleaseUserId(entity.getReleaseUserId());
    flow.setReleaseTime(entity.getReleaseTime());
    flow.setIfCycle(entity.getIfCycle());
    flow.setNote(entity.getNote());
    // Edges are persisted on the schedule row as a JSON string.
    flow.setEdges(JSONUtil.parseObject(entity.getEdges(), new TypeReference<List<FlowEdge>>() {
    }));
    ArrayList<FlowNode> nodes = new ArrayList<>(6);
    flow.setNodes(nodes);
    // Nodes live in their own table, keyed by task_schedule_id.
    HashMap<String, Object> queryMap = new HashMap<>();
    queryMap.put("task_schedule_id", id);
    List<DataProductionScheduleNodeEntity> dbNodes = nodeDao.selectByMap(queryMap);
    for (DataProductionScheduleNodeEntity dbNode : dbNodes) {
        nodes.add(getFlowNode(dbNode));
    }
    return flow;
}
@Override
@Transactional(rollbackFor = Exception.class)
public void save(Flow flow) {
// Create or update the schedule together with its node graph, atomically.
insertOrUpdate(flow);
}
@Override
public String run(Integer id) {
// Manual trigger from the UI (ExecuteType.HAND), as opposed to a quartz-driven run.
return scheduleRun(id, ExecuteType.HAND);
}
@Override
@Transactional(rollbackFor = Exception.class)
public void delete(List<Long> idList) {
    // Remove the schedule rows first.
    removeByIds(idList);
    // Then drop every node row that belonged to each deleted schedule.
    for (Long scheduleId : idList) {
        Map<String, Object> criteria = new HashMap<>(1);
        criteria.put("task_schedule_id", scheduleId);
        nodeDao.deleteByMap(criteria);
    }
}
@Override
public void release(Integer id) {
// Hand the schedule to the quartz module so its cron job gets registered.
scheduleApi.release(id.longValue());
// Update status, release user and release time on the schedule row.
DataProductionScheduleEntity dbEntity = new DataProductionScheduleEntity();
dbEntity.setId(id.longValue());
// 1 = released.
dbEntity.setStatus(1);
dbEntity.setReleaseUserId(SecurityUser.getUserId().intValue());
dbEntity.setReleaseTime(new Date());
// NOTE(review): dao method name is a typo of "changeStatus"; renaming it
// requires touching the mapper XML as well.
baseMapper.changeStutus(dbEntity);
}
/**
 * Starts one run of a schedule: records the run, then executes the node
 * chain asynchronously.
 *
 * @param id          schedule id
 * @param executeType how the run was triggered (manual / scheduled)
 * @return id of the newly inserted run record, used to fetch the run log
 */
private String scheduleRun(Integer id, ExecuteType executeType) {
    DataProductionScheduleEntity scheduleEntity = baseMapper.selectById(id);
    // Nodes of this schedule in execution order (sort ascending).
    LambdaQueryWrapper<DataProductionScheduleNodeEntity> wrapper = new LambdaQueryWrapper<>();
    wrapper.eq(DataProductionScheduleNodeEntity::getTaskScheduleId, id)
            .orderByAsc(DataProductionScheduleNodeEntity::getSort);
    List<DataProductionScheduleNodeEntity> dbNodes = nodeDao.selectList(wrapper);
    // Insert the run record up front so its id exists before execution starts.
    DataProductionScheduleRecordEntity scheduleRecordEntity = new DataProductionScheduleRecordEntity();
    // BUGFIX: was scheduleRecordEntity.getProjectId() — a self-assignment that
    // always stored null; the project id must come from the schedule being run.
    scheduleRecordEntity.setProjectId(scheduleEntity.getProjectId());
    scheduleRecordEntity.setName(scheduleEntity.getName());
    scheduleRecordEntity.setTaskScheduleId(id);
    scheduleRecordEntity.setRunStatus(CommonRunStatus.RUNNING.getCode());
    scheduleRecordEntity.setStartTime(new Date());
    scheduleRecordEntity.setExecuteType(executeType.getValue());
    recordDao.insert(scheduleRecordEntity);
    // Execute the flow asynchronously; this HTTP-facing call returns at once.
    ThreadUtil.threadPool.execute(() -> runNode(scheduleEntity, executeType, dbNodes, scheduleRecordEntity));
    // BUGFIX: previously returned scheduleEntity.getId() — i.e. the input id the
    // caller already has. The API contract ("返回log的id") wants the run-record id.
    return scheduleRecordEntity.getId().toString();
}
/**
 * Executes the node chain of one schedule run on a worker thread, collecting
 * all output under a per-run console/process id, then finalises the run record
 * with status, log and an archived JSON snapshot of the flow.
 */
private void runNode(DataProductionScheduleEntity scheduleEntity, ExecuteType executeType, List<DataProductionScheduleNodeEntity> dbNodes, DataProductionScheduleRecordEntity scheduleRecordEntity) {
// Console/process id under which all log lines of this run are collected.
String processId=scheduleRecordEntity.getId()+DataProductionScheduleRecordEntity.SCHEDULE_RECORD;
ProcessEntity flowProcess= ProcessContextHolder.registerFlowProcess(ProcessEntity.init(ProcessType.FLINKEXECUTE,processId));
flowProcess.info("Start run flow...");
// NOTE(review): recordSuccess is never set to false below, so the run record
// is always marked SUCCESS — looks unfinished; confirm intent.
boolean recordSuccess = true;
// Snapshot of the nodes in the shape the front end expects.
List<FlowNode> flowNodes = dbNodes.stream().map(this::getFlowNode).collect(Collectors.toList());
for (DataProductionScheduleNodeEntity dbNode : dbNodes) {
// The node snapshot returned to the front end (matched by node.no).
FlowNode flowNode = flowNodes.stream().filter(item -> item.getId().equals(dbNode.getNo())).findFirst().get();
// NOTE(review): status/style are set to SUCCESS *before* the task runs,
// and are never downgraded on failure — confirm intent.
flowNode.getProperties().setRunStatus(CommonRunStatus.SUCCESS.getCode());
flowNode.getProperties().setStyle(FlowNodeProperties.SUCCESS_STYLE);
flowProcess.info(String.format("Start run node %s-%s", dbNode.getName(), Dialect.getByCode(dbNode.getTaskType().toString()).getValue()));
// Insert the per-node run record before execution so it gets an id.
DataProductionScheduleNodeRecordEntity nodeRecordEntity = new DataProductionScheduleNodeRecordEntity();
nodeRecordEntity.setProjectId(scheduleRecordEntity.getProjectId());
nodeRecordEntity.setTaskScheduleId(scheduleEntity.getId().intValue());
nodeRecordEntity.setScheduleNodeId(dbNode.getId().intValue());
nodeRecordEntity.setScheduleNodeNo(dbNode.getNo());
nodeRecordEntity.setScheduleRecordId(scheduleRecordEntity.getId().intValue());
nodeRecordEntity.setTaskId(dbNode.getTaskId());
nodeRecordEntity.setRunStatus(CommonRunStatus.RUNNING.getCode());
nodeRecordEntity.setStartTime(new Date());
nodeRecordEntity.setExecuteType(executeType.getValue());
nodeRecordDao.insert(nodeRecordEntity);
flowNode.getProperties().setNodeRecordId(nodeRecordEntity.getId().intValue());
// NOTE(review): jobResult is always null here — the actual task submission
// (presumably taskService.scheduleTask(nodeRecordEntity)) appears to be
// missing, leaving the branch below dead code. TODO confirm and wire in.
JobResult jobResult = null;
if (jobResult != null) {
flowProcess.info(jobResult.getLog());
}
flowProcess.info(String.format("Node %s-%s run end", dbNode.getName(), Dialect.getByCode(dbNode.getTaskType().toString()).getValue()));
}
// Finalise the run record: end time, overall status and the collected log.
scheduleRecordEntity.setEndTime(new Date());
scheduleRecordEntity.setRunStatus(recordSuccess ? CommonRunStatus.SUCCESS.getCode() : CommonRunStatus.FAILED.getCode());
scheduleRecordEntity.setLog(ConsolePool.getInstance().get(processId).toString());
// Archive the executed flow (with per-node status) as JSON on the record.
Flow flow = new Flow();
flow.setId(scheduleEntity.getId());
flow.setIfCycle(scheduleEntity.getIfCycle());
flow.setRecordId(scheduleRecordEntity.getId().intValue());
flow.setName(scheduleEntity.getName());
flow.setNote(scheduleEntity.getNote());
flow.setCron(scheduleEntity.getCron());
flow.setNodes(flowNodes);
flow.setEdges(JSONUtil.parseObject(scheduleEntity.getEdges(), new TypeReference<List<FlowEdge>>() {
}));
scheduleRecordEntity.setConfigJson(JSONUtil.toJsonString(flow));
recordDao.updateById(scheduleRecordEntity);
flowProcess.infoEnd();
// Release the process context and its console buffer.
ProcessContextHolder.clearFlow();
ConsolePool.getInstance().remove(processId);
}
/**
 * Persists a flow: validates the cron, rejects cyclic graphs, flattens the
 * nodes into execution order, then inserts (new schedule) or diff-updates
 * (existing schedule) the schedule row and its node rows.
 *
 * @throws ServerException on an invalid cron or a cyclic graph
 */
private void insertOrUpdate(Flow flow) {
    // A cycled schedule must carry a valid cron expression.
    if (flow.getIfCycle() == 1 && !CronExpression.isValidExpression(flow.getCron())) {
        throw new ServerException("cron表达式有误,请检查后重新填写");
    }
    DataProductionScheduleEntity entity = DataProductionScheduleEntity.builder().id(flow.getId())
            .projectId(getProjectId()).ifCycle(flow.getIfCycle())
            .name(flow.getName()).cron(flow.getCron()).note(flow.getNote()).status(flow.getStatus()).releaseTime(flow.getReleaseTime()).releaseUserId(flow.getReleaseUserId()).build();
    List<FlowNode> nodes = flow.getNodes();
    List<FlowEdge> edges = flow.getEdges();
    // Start nodes = nodes with in-degree 0.
    List<FlowNode> starNodes = getStarNodes(nodes, edges);
    // Reject graphs containing cycles.
    checkClosedLoop(starNodes, nodes, edges);
    // Flatten the graph into a weight-aware execution order.
    Set<FlowNode> runNodeSet = new LinkedHashSet<>();
    buildRunNodes(runNodeSet, starNodes, nodes, edges);
    if (entity.getId() == null) {
        entity.setEdges(JSONUtil.toJsonString(edges));
        baseMapper.insert(entity);
        // Convert the submitted nodes to entities (needs the generated schedule id).
        List<DataProductionScheduleNodeEntity> clientNodes = getNodesByNodeSet(entity, runNodeSet);
        clientNodes.forEach(nodeDao::insert);
    } else {
        List<DataProductionScheduleNodeEntity> clientNodes = getNodesByNodeSet(entity, runNodeSet);
        entity.setEdges(JSONUtil.toJsonString(edges));
        baseMapper.updateById(entity);
        // Load the node rows currently stored for this schedule.
        // BUGFIX: the column is task_schedule_id — it was "task-schedule_id"
        // (hyphen), which is not a valid column and would break the generated SQL.
        HashMap<String, Object> queryMap = new HashMap<>();
        queryMap.put("task_schedule_id", entity.getId());
        List<DataProductionScheduleNodeEntity> dbNode = nodeDao.selectByMap(queryMap);
        // Nodes without a persisted id are new — insert them.
        List<DataProductionScheduleNodeEntity> insertNodes = clientNodes.stream().filter(item -> item.getId() == null).collect(Collectors.toList());
        insertNodes.forEach(nodeDao::insert);
        // Re-convert (recomputes sort), then update nodes that already exist.
        clientNodes = getNodesByNodeSet(entity, runNodeSet);
        List<DataProductionScheduleNodeEntity> updateNodes = clientNodes.stream().filter(item -> item.getId() != null).collect(Collectors.toList());
        updateNodes.forEach(nodeDao::updateById);
        // Rows present in the DB but absent from the client graph get deleted.
        for (DataProductionScheduleNodeEntity dbNodes : dbNode) {
            if (clientNodes.stream().noneMatch(item -> dbNodes.getNo().equals(item.getNo()))) {
                nodeDao.deleteById(dbNodes.getId());
            }
        }
    }
}
/**
 * Converts the ordered run-node set into node entities for persistence,
 * assigning a 1-based sort index in iteration (execution) order.
 *
 * @param entity     the owning schedule (id must already be set)
 * @param runNodeSet nodes in execution order (LinkedHashSet)
 * @return node entities ready for insert/update
 */
private List<DataProductionScheduleNodeEntity> getNodesByNodeSet(DataProductionScheduleEntity entity, Set<FlowNode> runNodeSet) {
    List<DataProductionScheduleNodeEntity> clientNodes = new ArrayList<>(10);
    int sort = 0;
    for (FlowNode flowNode : runNodeSet) {
        sort++;
        DataProductionScheduleNodeEntity nodeEntity = new DataProductionScheduleNodeEntity();
        nodeEntity.setId(flowNode.getProperties().getId());
        // BUGFIX: was entity.getProjectId() — the node's task_schedule_id FK must
        // reference the schedule id, not the project id.
        nodeEntity.setTaskScheduleId(entity.getId());
        nodeEntity.setProjectId(entity.getProjectId());
        nodeEntity.setNo(flowNode.getId());
        nodeEntity.setSort(sort);
        nodeEntity.setName(flowNode.getProperties().getName());
        nodeEntity.setType(flowNode.getType());
        nodeEntity.setX(flowNode.getX());
        nodeEntity.setY(flowNode.getY());
        nodeEntity.setNote(flowNode.getProperties().getNote());
        nodeEntity.setTaskId(flowNode.getProperties().getTaskId());
        nodeEntity.setTaskType(flowNode.getProperties().getTaskType());
        nodeEntity.setFailGoOn(flowNode.getProperties().getFailGoOn());
        nodeEntity.setWeight(flowNode.getProperties().getWeight());
        clientNodes.add(nodeEntity);
    }
    return clientNodes;
}
    /**
     * Recursively flattens the flow graph into an execution-ordered node set.
     * <p>
     * Each recursion level takes the current "layer" of start nodes, sorts them by
     * weight (highest first), appends them to {@code runNodeSet}, then recurses with
     * all of their direct children until no layer remains.
     *
     * @param runNodeSet accumulator of nodes in run order; re-inserting an
     *                   already-present node first removes it, which pushes the node
     *                   to a later position — assumes the set preserves insertion
     *                   order (e.g. LinkedHashSet); TODO confirm at the call site
     * @param starNodes  nodes forming the current layer to place next
     * @param nodes      all nodes of the flow
     * @param edges      all edges of the flow
     */
    private void buildRunNodes(Set<FlowNode> runNodeSet, List<FlowNode> starNodes, List<FlowNode> nodes, List<FlowEdge> edges) {
        if (starNodes.isEmpty()){
            return;
        }
        // Sort by weight in descending order: higher weight runs earlier within the layer.
        starNodes.sort((item1, item2)->item2.getProperties().getWeight().compareTo(item1.getProperties().getWeight()));
        for (FlowNode starNode : starNodes) {
            if (nodes.contains(starNode)){
                // Remove before re-adding so a node reached again via a longer path
                // moves to its latest (deepest) position in the run order.
                runNodeSet.remove(starNode);
            }
            runNodeSet.add(starNode);
        }
        // Collect the direct children of every node in this layer.
        ArrayList<FlowNode> childNodes = new ArrayList<>(2);
        for (FlowNode starNode : starNodes) {
            // Edges whose source is the current node identify its children.
            List<FlowEdge> collect=edges.stream().filter(item->starNode.getId().equals(item.getSourceNodeId())).collect(Collectors.toList());
            for (FlowEdge flowEdge : collect) {
                FlowNode flowNode=nodes.stream().filter(item->item.getId().equals(flowEdge.getTargetNodeId())).findFirst().get();
                childNodes.add(flowNode);
            }
        }
        buildRunNodes(runNodeSet,childNodes,nodes,edges);
    }
private void checkClosedLoop(List<FlowNode> starNodes, List<FlowNode> nodes, List<FlowEdge> edges) {
if (starNodes.isEmpty()){
throw new ServerException("流程不允许存在闭环,请检查");
}
for (FlowNode starNode : starNodes) {
Set<FlowNode> nodeSet = new HashSet<>();
//遍历检查闭环
dfs(nodeSet,starNode,nodes,edges);
}
}
private void dfs(Set<FlowNode> nodeSet, FlowNode starNode, List<FlowNode> nodes, List<FlowEdge> edges) {
List<FlowEdge> collect=edges.stream().filter(item-> starNode.getId().equals(item.getSourceNodeId())).collect(Collectors.toList());
if (collect.isEmpty()){
return;
}
for (FlowEdge edge : collect) {
FlowNode targetNode=nodes.stream().filter(item->item.getId().equals(edge.getTargetNodeId())).findFirst().get();
if (nodeSet.contains(targetNode)){
throw new ServerException("流程不允许存在闭环,请检查");
}
nodeSet.add(targetNode);
dfs(nodeSet,targetNode,nodes,edges);
}
}
private List<FlowNode> getStarNodes(List<FlowNode> nodes, List<FlowEdge> edges) {
if (nodes.isEmpty()){
throw new ServerException("流程不能为空");
}
ArrayList<FlowNode> startNode = new ArrayList<>(1);
for (FlowNode node : nodes) {
if (edges.stream().noneMatch(item->node.getId().equals(item.getTargetNodeId()))){
startNode.add(node);
}
}
return startNode;
}
private FlowNode getFlowNode(DataProductionScheduleNodeEntity dbNode) {
FlowNode flowNode = new FlowNode();
flowNode.setId(dbNode.getNo());
flowNode.setType(dbNode.getType());
flowNode.setX(dbNode.getX());
flowNode.setY(dbNode.getY());
FlowNodeProperties properties = new FlowNodeProperties();
properties.setId(dbNode.getId());
properties.setName(dbNode.getName());
properties.setTaskId(dbNode.getTaskId());
properties.setWeight(dbNode.getWeight());
properties.setTaskType(dbNode.getTaskType());
properties.setTaskTypeVal(Dialect.getByCode(dbNode.getTaskType().toString()).getValue());
properties.setNote(dbNode.getNote());
properties.setFailGoOn(dbNode.getFailGoOn());
flowNode.setProperties(properties);
return flowNode;
}
private LambdaQueryWrapper<DataProductionScheduleEntity> getWrapper(DataProductionsScheduleQuery query) {
LambdaQueryWrapper<DataProductionScheduleEntity> wrapper = Wrappers.lambdaQuery();
wrapper.like(StringUtil.isNotBlank(query.getName()),DataProductionScheduleEntity::getName,query.getName())
.eq(query.getStatus()!=null,DataProductionScheduleEntity::getStatus,query.getStatus())
.orderByDesc(DataProductionScheduleEntity::getUpdateTime).orderByDesc(DataProductionScheduleEntity::getId);
dataScopeWithOrgId(wrapper);
return wrapper;
}
}

View File

@ -0,0 +1,69 @@
package net.srt.disposition.vo;
import com.fasterxml.jackson.annotation.JsonFormat;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
import net.srt.framework.common.utils.DateUtils;
import java.io.Serializable;
import java.util.Date;
/**
* @BelongsProject: srt_cloud
* @BelongsPackage: net.srt.disposition.vo
* @Author: jpz
* @CreateTime: 2023/12/28 8:57
*/
/**
 * View object for the data-production job schedule (数据生产-作业调度).
 * Serialized to the front end; field docs drive the OpenAPI schema.
 */
@Data
@Schema(description = "数据生产-作业调度")
public class DataProductionScheduleVo implements Serializable {
    private static final long serialVersionUID = 1L;

    @Schema(description = "主键")
    private Long id;

    @Schema(description = "项目id")
    private Long projectId;

    @Schema(description = "调度名称")
    private String name;

    @Schema(description = "是否周期执行")
    private Integer isCycle;

    @Schema(description = "cron表达式")
    private String cron;

    @Schema(description = "描述")
    private String note;

    @Schema(description = "节点关系json")
    private String edges;

    @Schema(description = "0-未发布 1-已发布")
    private Integer status;

    @Schema(description = "发布时间")
    private Date releaseTime;

    // NOTE(review): Integer while creator/updater are Long — confirm against the column type.
    @Schema(description = "发布人id")
    private Integer releaseUserId;

    @Schema(description = "版本号")
    private Integer version;

    @Schema(description = "删除标识 0:正常 1:已删除")
    private Integer deleted;

    @Schema(description = "创建者")
    private Long creator;

    @Schema(description = "创建时间")
    @JsonFormat(pattern = DateUtils.DATE_TIME_PATTERN)
    private Date createTime;

    @Schema(description = "更新者")
    private Long updater;

    @Schema(description = "更新时间")
    @JsonFormat(pattern = DateUtils.DATE_TIME_PATTERN)
    private Date updateTime;
}

View File

@ -6,7 +6,7 @@ spring:
servlet:
load-on-startup: 1
application:
name: srt-data-development
name: srt-cloud-data-development
profiles:
active: dev
cloud:
@ -14,9 +14,9 @@ spring:
discovery:
server-addr: 101.34.77.101:8848
# 命名空间默认public
namespace: c5d32e76-b83c-4254-8176-1c6a2cee8e3b
namespace: 7e1e997d-5fa4-4f84-9f48-3e0adf830a37
service: ${spring.application.name}
group: srt2.1
group: srt2.0
config:
server-addr: ${spring.cloud.nacos.discovery.server-addr}
namespace: ${spring.cloud.nacos.discovery.namespace}