jpz latest 4.0

dev
jpz 2023-12-29 10:54:16 +08:00
parent d11d2b3b72
commit 308e870dff
18 changed files with 1301 additions and 0 deletions

View File

@@ -0,0 +1,288 @@
package net.srt.quartz.task;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import net.srt.api.module.data.governance.DataMetadataCollectApi;
import net.srt.api.module.data.governance.constant.*;
import net.srt.api.module.data.governance.dto.DataGovernanceMetadataCollectRecordDto;
import net.srt.api.module.data.governance.dto.DataGovernanceMetadataDto;
import net.srt.api.module.data.governance.dto.DataGovernanceMetadataPropertyDto;
import net.srt.api.module.data.governance.dto.DataGovernanceMetadataCollectDto;
import net.srt.api.module.data.integrate.DataDatabaseApi;
import net.srt.api.module.data.integrate.DataProjectApi;
import net.srt.api.module.data.integrate.dto.DataDatabaseDto;
import net.srt.flink.common.utils.LogUtil;
import net.srt.framework.common.cache.bean.DataProjectCacheBean;
import net.srt.framework.common.utils.DateUtils;
import org.springframework.stereotype.Component;
import srt.cloud.framework.dbswitch.common.type.ProductTypeEnum;
import srt.cloud.framework.dbswitch.common.util.StringUtil;
import srt.cloud.framework.dbswitch.core.model.ColumnDescription;
import srt.cloud.framework.dbswitch.core.model.TableDescription;
import srt.cloud.framework.dbswitch.core.service.IMetaDataByJdbcService;
import srt.cloud.framework.dbswitch.core.service.impl.MetaDataByJdbcServiceImpl;
import srt.cloud.framework.dbswitch.core.util.JdbcUrlUtils;
import java.util.Date;
import java.util.List;
import java.util.Map;
/**
* @ClassName DataGovernanceMetadataCollectTask
* @Author zrx
* @Date 2023/04/02 13:12
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class DataGovernanceMetadataCollectTask {
private final DataMetadataCollectApi dataMetadataCollectApi;
private final DataDatabaseApi dataDatabaseApi;
private final DataProjectApi dataProjectApi;
public void run(String metadataCollectId, Thread currentThread) {
log.info("metadata collect task start to run...");
Long collectTaskId = Long.parseLong(metadataCollectId);
DataGovernanceMetadataCollectDto metadataCollectDto = dataMetadataCollectApi.getById(collectTaskId).getData();
if (metadataCollectDto == null) {
log.error("metadata collect task not found...");
return;
}
Long projectId = metadataCollectDto.getProjectId();
// create the collect run record
DataGovernanceMetadataCollectRecordDto collectRecord = new DataGovernanceMetadataCollectRecordDto();
collectRecord.setProjectId(projectId);
collectRecord.setMetadataCollectId(collectTaskId);
collectRecord.setStatus(MetadataCollectRunStatus.RUNNING.getCode());
collectRecord.setStartTime(new Date());
collectRecord.setCreateTime(new Date());
collectRecord = dataMetadataCollectApi.addCollectRecord(collectRecord);
StringBuilder realTimeLog = new StringBuilder();
try {
realTimeLog.append(DateUtils.formatDateTime(new Date())).append(" ").append("Start to collect database info...\r\n");
Integer dbType = metadataCollectDto.getDbType();
// if the source is an external database
DataDatabaseDto databaseDto;
if (DbType.DATABASE.getValue().equals(dbType)) {
databaseDto = dataDatabaseApi.getById(metadataCollectDto.getDatabaseId()).getData();
if (databaseDto == null) {
realTimeLog.append(DateUtils.formatDateTime(new Date())).append(" ").append("Database does not exist or failed to get database info, metadata collection failed\r\n");
collectRecord.setRealTimeLog(realTimeLog.toString());
collectRecord.setErrorLog(DateUtils.formatDateTime(new Date()) + " Database does not exist or failed to get database info, metadata collection failed\r\n");
collectRecord.setEndTime(new Date());
collectRecord.setStatus(MetadataCollectRunStatus.FAILED.getCode());
dataMetadataCollectApi.upCollectRecord(collectRecord);
return;
}
} else {
// fetch the middle-platform (project) database info
databaseDto = new DataDatabaseDto();
DataProjectCacheBean project = dataProjectApi.getById(projectId).getData();
databaseDto.setId(-1L);
Map<String, String> jdbcParams = JdbcUrlUtils.findParamsByMySqlJdbcUrl(project.getDbUrl());
if (jdbcParams != null) {
databaseDto.setDatabaseIp(jdbcParams.get("addresss")); // key spelling must match what JdbcUrlUtils.findParamsByMySqlJdbcUrl returns
databaseDto.setDatabasePort(jdbcParams.get("port"));
}
databaseDto.setName(project.getName() + "<中台库>");
databaseDto.setDatabaseName(project.getDbName());
databaseDto.setJdbcUrl(project.getDbUrl());
databaseDto.setUserName(project.getDbUsername());
databaseDto.setPassword(project.getDbPassword());
databaseDto.setDatabaseType(project.getDbType());
}
DataGovernanceMetadataDto rootMetadata = dataMetadataCollectApi.getMetadataById(metadataCollectDto.getMetadataId()).getData();
// get the ingestion strategy
Integer strategy = metadataCollectDto.getStrategy();
// build schema-level metadata
DataGovernanceMetadataDto databaseMetadata = new DataGovernanceMetadataDto();
databaseMetadata.setProjectId(projectId);
databaseMetadata.setCollectTaskId(collectTaskId);
databaseMetadata.setParentId(metadataCollectDto.getMetadataId());
databaseMetadata.setName(databaseDto.getName());
databaseMetadata.setPath(rootMetadata.getPath() + "/" + databaseMetadata.getName());
databaseMetadata.setCode(databaseDto.getDatabaseName());
// mark as schema type
databaseMetadata.setMetamodelId(BuiltInMetamodel.SCHEMA.getId());
databaseMetadata.setIcon(BuiltInMetamodel.SCHEMA.getIcon());
databaseMetadata.setIfLeaf(0);
databaseMetadata.setDbType(dbType);
databaseMetadata.setDatasourceId(databaseDto.getId());
// check whether metadata for this schema already exists
DataGovernanceMetadataDto dbDatabaseMetadata = dataMetadataCollectApi.getByParentIdAndDatasourceId(databaseMetadata.getParentId(), databaseMetadata.getDatasourceId()).getData();
if (dbDatabaseMetadata != null) {
databaseMetadata.setId(dbDatabaseMetadata.getId());
}
databaseMetadata = dataMetadataCollectApi.addOrUpdateMetadata(databaseMetadata);
// add or update properties
addOrUpdateProperty(databaseMetadata, BuiltInMetamodelProperty.buildProerty(BuiltInMetamodelProperty.SCHEMA_COMMENT, projectId, databaseMetadata.getId(), databaseDto.getName()));
addOrUpdateProperty(databaseMetadata, BuiltInMetamodelProperty.buildProerty(BuiltInMetamodelProperty.SCHEMA_TYPE, projectId, databaseMetadata.getId(), ProductTypeEnum.getByIndex(databaseDto.getDatabaseType()).name()));
addOrUpdateProperty(databaseMetadata, BuiltInMetamodelProperty.buildProerty(BuiltInMetamodelProperty.SCHEMA_IP, projectId, databaseMetadata.getId(), databaseDto.getDatabaseIp()));
addOrUpdateProperty(databaseMetadata, BuiltInMetamodelProperty.buildProerty(BuiltInMetamodelProperty.SCHEMA_PORT, projectId, databaseMetadata.getId(), databaseDto.getDatabasePort()));
addOrUpdateProperty(databaseMetadata, BuiltInMetamodelProperty.buildProerty(BuiltInMetamodelProperty.SCHEMA_DATABASE, projectId, databaseMetadata.getId(), databaseDto.getDatabaseName()));
addOrUpdateProperty(databaseMetadata, BuiltInMetamodelProperty.buildProerty(BuiltInMetamodelProperty.SCHEMA_USERNAME, projectId, databaseMetadata.getId(), databaseDto.getUserName()));
addOrUpdateProperty(databaseMetadata, BuiltInMetamodelProperty.buildProerty(BuiltInMetamodelProperty.SCHEMA_PASSWORD, projectId, databaseMetadata.getId(), databaseDto.getPassword()));
addOrUpdateProperty(databaseMetadata, BuiltInMetamodelProperty.buildProerty(BuiltInMetamodelProperty.SCHEMA_JDBC_URL, projectId, databaseMetadata.getId(), databaseDto.getJdbcUrl()));
realTimeLog.append(DateUtils.formatDateTime(new Date())).append(" ").append("Start to collect tables info...\r\n");
updateRealTimeLog(collectRecord, realTimeLog);
// query table metadata from the source
ProductTypeEnum productTypeEnum = ProductTypeEnum.getByIndex(databaseDto.getDatabaseType());
IMetaDataByJdbcService metaDataService = new MetaDataByJdbcServiceImpl(productTypeEnum);
List<TableDescription> tables = metaDataService.queryTableList(databaseDto.getJdbcUrl(), databaseDto.getUserName(), databaseDto.getPassword(),
ProductTypeEnum.ORACLE.equals(productTypeEnum) ? databaseDto.getUserName() : databaseDto.getDatabaseName());
if (currentThread.isInterrupted()) {
interrupt(collectRecord, realTimeLog);
return;
}
List<DataGovernanceMetadataDto> dbTables = dataMetadataCollectApi.listParentIdAndDatasourceId(databaseMetadata.getId(), databaseDto.getId(), BuiltInMetamodel.TABLE.getId()).getData();
// full-sync strategy: compare against what is already stored
if (MetadataStrategyType.ALL.getValue().equals(strategy)) {
// delete stored tables that no longer exist in the source
for (DataGovernanceMetadataDto dbTable : dbTables) {
if (tables.stream().noneMatch(item -> item.getTableName().equals(dbTable.getCode()))) {
dataMetadataCollectApi.deleteMetadata(dbTable.getId());
}
}
}
// collect metadata for each table
for (TableDescription table : tables) {
if (currentThread.isInterrupted()) {
interrupt(collectRecord, realTimeLog);
return;
}
realTimeLog.append(DateUtils.formatDateTime(new Date())).append(" ").append(String.format("Start to collect table [%s] info...\r\n", table.getTableName()));
updateRealTimeLog(collectRecord, realTimeLog);
// build table-level metadata
DataGovernanceMetadataDto tableMetadata = new DataGovernanceMetadataDto();
tableMetadata.setProjectId(projectId);
tableMetadata.setCollectTaskId(collectTaskId);
tableMetadata.setParentId(databaseMetadata.getId());
tableMetadata.setName(StringUtil.isNotBlank(table.getRemarks()) ? table.getRemarks() : table.getTableName());
tableMetadata.setPath(databaseMetadata.getPath() + "/" + tableMetadata.getName());
tableMetadata.setCode(table.getTableName());
// mark as table type
tableMetadata.setMetamodelId(BuiltInMetamodel.TABLE.getId());
tableMetadata.setIcon(BuiltInMetamodel.TABLE.getIcon());
tableMetadata.setIfLeaf(0);
tableMetadata.setDbType(dbType);
tableMetadata.setDatasourceId(databaseDto.getId());
// check whether metadata for this table already exists
DataGovernanceMetadataDto finalTableMetadata = tableMetadata;
DataGovernanceMetadataDto dbTableMetadata = dbTables.stream().filter(item -> item.getCode().equals(finalTableMetadata.getCode())).findFirst().orElse(null);
if (dbTableMetadata != null) {
tableMetadata.setId(dbTableMetadata.getId());
}
tableMetadata = dataMetadataCollectApi.addOrUpdateMetadata(tableMetadata);
// add or update properties
addOrUpdateProperty(tableMetadata, BuiltInMetamodelProperty.buildProerty(BuiltInMetamodelProperty.TABLE_COMMENT, projectId, tableMetadata.getId(), table.getRemarks()));
addOrUpdateProperty(tableMetadata, BuiltInMetamodelProperty.buildProerty(BuiltInMetamodelProperty.TABLE_SPACE, projectId, tableMetadata.getId(), null));
// fetch the table's columns
List<ColumnDescription> columns = metaDataService.queryTableColumnMeta(databaseDto.getJdbcUrl(), databaseDto.getUserName(), databaseDto.getPassword(), ProductTypeEnum.ORACLE.equals(productTypeEnum) ? databaseDto.getUserName() : databaseDto.getDatabaseName(), table.getTableName());
List<String> pks = metaDataService.queryTablePrimaryKeys(databaseDto.getJdbcUrl(), databaseDto.getUserName(), databaseDto.getPassword(), ProductTypeEnum.ORACLE.equals(productTypeEnum) ? databaseDto.getUserName() : databaseDto.getDatabaseName(), table.getTableName());
List<DataGovernanceMetadataDto> dbColumns = dataMetadataCollectApi.listParentIdAndDatasourceId(tableMetadata.getId(), databaseDto.getId(), BuiltInMetamodel.COLUMN.getId()).getData();
// full-sync strategy
if (MetadataStrategyType.ALL.getValue().equals(strategy)) {
// delete stored columns that no longer exist in the source
for (DataGovernanceMetadataDto dbColumn : dbColumns) {
if (columns.stream().noneMatch(item -> item.getFieldName().equals(dbColumn.getCode()))) {
dataMetadataCollectApi.deleteMetadata(dbColumn.getId());
}
}
}
for (ColumnDescription column : columns) {
if (pks.contains(column.getFieldName())) {
column.setPk(true);
}
DataGovernanceMetadataDto columnMetadata = new DataGovernanceMetadataDto();
columnMetadata.setProjectId(projectId);
columnMetadata.setCollectTaskId(collectTaskId);
columnMetadata.setParentId(tableMetadata.getId());
columnMetadata.setName(StringUtil.isNotBlank(column.getRemarks()) ? column.getRemarks() : column.getFieldName());
columnMetadata.setPath(tableMetadata.getPath() + "/" + columnMetadata.getName());
columnMetadata.setCode(column.getFieldName());
// mark as column type
columnMetadata.setMetamodelId(BuiltInMetamodel.COLUMN.getId());
columnMetadata.setIcon(BuiltInMetamodel.COLUMN.getIcon());
columnMetadata.setIfLeaf(0);
columnMetadata.setDbType(dbType);
columnMetadata.setDatasourceId(databaseDto.getId());
// check whether this column's metadata already exists
DataGovernanceMetadataDto finalColumnMetadata = columnMetadata;
DataGovernanceMetadataDto dbColumnMetadata = dbColumns.stream().filter(item -> item.getCode().equals(finalColumnMetadata.getCode())).findFirst().orElse(null);
//DataGovernanceMetadataDto dbColumnMetadata = dataMetadataApi.getByParentIdAndOtherInfo(columnMetadata.getParentId(), columnMetadata.getDatasourceId(), columnMetadata.getCode(), columnMetadata.getMetamodelId()).getData();
if (dbColumnMetadata != null) {
columnMetadata.setId(dbColumnMetadata.getId());
}
columnMetadata = dataMetadataCollectApi.addOrUpdateMetadata(columnMetadata);
// add or update properties
addOrUpdateProperty(columnMetadata, BuiltInMetamodelProperty.buildProerty(BuiltInMetamodelProperty.COLUMN_COMMENT, projectId, columnMetadata.getId(), column.getRemarks()));
addOrUpdateProperty(columnMetadata, BuiltInMetamodelProperty.buildProerty(BuiltInMetamodelProperty.COLUMN_DATA_TYPE, projectId, columnMetadata.getId(), column.getFieldTypeName()));
addOrUpdateProperty(columnMetadata, BuiltInMetamodelProperty.buildProerty(BuiltInMetamodelProperty.COLUMN_DATA_LENGTH, projectId, columnMetadata.getId(), String.valueOf(column.getDisplaySize())));
addOrUpdateProperty(columnMetadata, BuiltInMetamodelProperty.buildProerty(BuiltInMetamodelProperty.COLUMN_DATA_PRECISION, projectId, columnMetadata.getId(), String.valueOf(column.getPrecisionSize())));
addOrUpdateProperty(columnMetadata, BuiltInMetamodelProperty.buildProerty(BuiltInMetamodelProperty.COLUMN_DATA_SCALE, projectId, columnMetadata.getId(), String.valueOf(column.getScaleSize())));
addOrUpdateProperty(columnMetadata, BuiltInMetamodelProperty.buildProerty(BuiltInMetamodelProperty.COLUMN_COL_KEY, projectId, columnMetadata.getId(), column.isPk() ? "是" : "否"));
addOrUpdateProperty(columnMetadata, BuiltInMetamodelProperty.buildProerty(BuiltInMetamodelProperty.COLUMN_UNI_KEY, projectId, columnMetadata.getId(), column.isNonIndexUnique() ? "否" : "是"));
addOrUpdateProperty(columnMetadata, BuiltInMetamodelProperty.buildProerty(BuiltInMetamodelProperty.COLUMN_NULLABLE, projectId, columnMetadata.getId(), column.isNullable() ? "是" : "否"));
addOrUpdateProperty(columnMetadata, BuiltInMetamodelProperty.buildProerty(BuiltInMetamodelProperty.COLUMN_AUTO_INCREMENT, projectId, columnMetadata.getId(), column.isAutoIncrement() ? "是" : "否"));
addOrUpdateProperty(columnMetadata, BuiltInMetamodelProperty.buildProerty(BuiltInMetamodelProperty.COLUMN_DATA_DEFAULT, projectId, columnMetadata.getId(), column.getDefaultValue()));
}
realTimeLog.append(DateUtils.formatDateTime(new Date())).append(" ").append(String.format("Collect table [%s] info succeed\r\n", table.getTableName()));
updateRealTimeLog(collectRecord, realTimeLog);
}
realTimeLog.append(DateUtils.formatDateTime(new Date())).append(" ").append("Collect database info succeed\r\n");
realTimeLog.append(DateUtils.formatDateTime(new Date())).append(" ").append("All metadata collect succeed\r\n");
collectRecord.setEndTime(new Date());
collectRecord.setStatus(MetadataCollectRunStatus.SUCCESS.getCode());
collectRecord.setRealTimeLog(realTimeLog.toString());
dataMetadataCollectApi.upCollectRecord(collectRecord);
log.info("metadata collect task success end");
} catch (Exception e) {
realTimeLog.append(LogUtil.getError(e));
collectRecord.setStatus(MetadataCollectRunStatus.FAILED.getCode());
collectRecord.setRealTimeLog(realTimeLog.toString());
collectRecord.setErrorLog(LogUtil.getError(e));
collectRecord.setEndTime(new Date());
dataMetadataCollectApi.upCollectRecord(collectRecord);
log.info("metadata collect task failed end");
}
}
private void interrupt(DataGovernanceMetadataCollectRecordDto collectRecord, StringBuilder realTimeLog) {
collectRecord.setStatus(MetadataCollectRunStatus.FAILED.getCode());
realTimeLog.append(DateUtils.formatDateTime(new Date())).append(" ").append("The collect task was canceled while running\r\n");
collectRecord.setRealTimeLog(realTimeLog.toString());
collectRecord.setErrorLog(DateUtils.formatDateTime(new Date()) + " The collect task was canceled while running");
collectRecord.setEndTime(new Date());
dataMetadataCollectApi.upCollectRecord(collectRecord);
}
/**
* Insert the metadata property if absent, otherwise update the existing record.
*/
private void addOrUpdateProperty(DataGovernanceMetadataDto metadataDto, DataGovernanceMetadataPropertyDto metadataPropertyDto) {
DataGovernanceMetadataPropertyDto dbProperty = dataMetadataCollectApi.getByPropertyIdAndMetadataId(metadataPropertyDto.getMetamodelPropertyId(), metadataDto.getId()).getData();
if (dbProperty != null) {
metadataPropertyDto.setId(dbProperty.getId());
}
dataMetadataCollectApi.addOrUpdateMetadataProperty(metadataPropertyDto);
}
/**
* Flush the accumulated real-time log to the collect record.
*
* @param collectRecord the running collect record
* @param realTimeLog   the log buffer to persist
*/
private void updateRealTimeLog(DataGovernanceMetadataCollectRecordDto collectRecord, StringBuilder realTimeLog) {
collectRecord.setRealTimeLog(realTimeLog.toString());
dataMetadataCollectApi.upCollectRecord(collectRecord);
}
}
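
For orientation, a minimal sketch of the Quartz side this task implies: an InterruptableJob that records its worker thread so a cancel request can flip the interrupted flag polled by run() between tables. The job class, the job-data key and SpringContextHolder are assumptions, not part of this commit; only DataGovernanceMetadataCollectTask.run(String, Thread) comes from the code above.

public class MetadataCollectJob implements org.quartz.InterruptableJob {

    private volatile Thread workerThread;

    @Override
    public void execute(org.quartz.JobExecutionContext context) {
        workerThread = Thread.currentThread();
        // "metadataCollectId" is a hypothetical job-data key
        String metadataCollectId = context.getMergedJobDataMap().getString("metadataCollectId");
        // SpringContextHolder stands in for whatever bean lookup the scheduler uses
        DataGovernanceMetadataCollectTask task = SpringContextHolder.getBean(DataGovernanceMetadataCollectTask.class);
        task.run(metadataCollectId, workerThread);
    }

    @Override
    public void interrupt() {
        if (workerThread != null) {
            workerThread.interrupt(); // observed via currentThread.isInterrupted() in run()
        }
    }
}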

View File

@@ -0,0 +1,33 @@
package net.srt.quartz.task;
import cn.hutool.core.thread.ThreadUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import net.srt.api.module.data.development.DataProductionScheduleApi;
import org.springframework.stereotype.Component;
/**
* @ClassName DataProductionScheduleTask
* @Author zrx
* @Date 2023/1/19 14:39
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class DataProductionScheduleTask {
private final DataProductionScheduleApi dataProductionScheduleApi;
public void run(String id, Thread currentThread) {
log.info("start run data-production-schedule");
String recordId = dataProductionScheduleApi.scheduleRun(Long.parseLong(id)).getData();
// poll the schedule status until it completes
do {
ThreadUtil.sleep(5000);
/*if (currentThread.isInterrupted()) {
return;
}*/
} while (!dataProductionScheduleApi.scheduleComplete(Integer.parseInt(recordId)).getData());
log.info("run data-production-schedule success");
}
}
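
The interruption check inside the polling loop above is commented out, so a cancelled job cannot break out of the wait. A minimal sketch of the loop with the check restored, assuming that is the intended cancellation path:

do {
    ThreadUtil.sleep(5000);
    if (currentThread.isInterrupted()) {
        // stop polling; the remote schedule run keeps its own terminal state
        log.warn("data-production-schedule polling interrupted, recordId={}", recordId);
        return;
    }
} while (!dataProductionScheduleApi.scheduleComplete(Integer.parseInt(recordId)).getData());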

View File

@@ -0,0 +1,128 @@
package net.srt.quartz.task;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import net.srt.api.module.data.governance.DataMetadataApi;
import net.srt.api.module.data.governance.DataQualityApi;
import net.srt.api.module.data.governance.constant.DbType;
import net.srt.api.module.data.governance.dto.DataGovernanceMetadataDto;
import net.srt.api.module.data.governance.dto.DataGovernanceQualityConfigDto;
import net.srt.api.module.data.governance.dto.DataGovernanceQualityTaskDto;
import net.srt.api.module.data.governance.dto.quality.QualityCheck;
import net.srt.api.module.data.governance.dto.quality.QulaityColumn;
import net.srt.api.module.data.integrate.DataDatabaseApi;
import net.srt.api.module.data.integrate.constant.CommonRunStatus;
import net.srt.api.module.data.integrate.dto.DataDatabaseDto;
import net.srt.flink.common.utils.LogUtil;
import net.srt.framework.common.cache.bean.DataProjectCacheBean;
import net.srt.framework.common.utils.DateUtils;
import net.srt.framework.security.cache.TokenStoreCache;
import net.srt.quartz.task.quality.AbstractQualityAdapter;
import net.srt.quartz.task.quality.QualityFactory;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
/**
* @ClassName DataQualityTask
* @Author zrx
* @Date 2023/1/19 14:39
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class DataQualityTask {
private final DataQualityApi dataQualityApi;
private final DataMetadataApi dataMetadataApi;
private final DataDatabaseApi dataDatabaseApi;
private final TokenStoreCache tokenStoreCache;
public void run(String id, Thread currentThread) {
log.info("DataQualityTask run start");
Long configId = Long.parseLong(id);
DataGovernanceQualityConfigDto configDto = dataQualityApi.getById(configId).getData();
Long projectId = configDto.getProjectId();
List<Integer> metadataIds = configDto.getMetadataIds();
Integer ruleId = configDto.getRuleId();
List<QualityCheck> qualityChecks = new ArrayList<>();
// create the quality-check task record
DataGovernanceQualityTaskDto qualityTaskDto = new DataGovernanceQualityTaskDto();
try {
qualityTaskDto.setQualityConfigId(configId);
qualityTaskDto.setName(configDto.getName() + " - " + DateUtils.formatDateTime(new Date()));
qualityTaskDto.setStatus(CommonRunStatus.RUNNING.getCode());
qualityTaskDto.setStartTime(new Date());
qualityTaskDto.setProjectId(configDto.getProjectId());
qualityTaskDto = dataQualityApi.addQualityTask(qualityTaskDto).getData();
for (Integer metadataId : metadataIds) {
DataGovernanceMetadataDto columnMeta = dataMetadataApi.getById(metadataId).getData();
DataGovernanceMetadataDto tableMeta = dataMetadataApi.getById(columnMeta.getParentId().intValue()).getData();
QualityCheck qualityCheck = new QualityCheck();
qualityCheck.setRuleId(ruleId);
qualityCheck.setParam(configDto.getParam());
qualityCheck.setTableMetadataId(tableMeta.getId());
qualityCheck.setTableName(tableMeta.getCode());
QulaityColumn qulaityColumn = new QulaityColumn(columnMeta.getId().intValue(), columnMeta.getCode());
if (DbType.MIDDLE_DB.getValue().equals(tableMeta.getDbType())) {
DataProjectCacheBean project = tokenStoreCache.getProject(projectId);
qualityCheck.setDatabaseName(project.getDbName());
qualityCheck.setJdbcUrl(project.getDbUrl());
qualityCheck.setUserName(project.getDbUsername());
qualityCheck.setPassword(project.getDbPassword());
qualityCheck.setDatabaseType(project.getDbType());
} else {
DataDatabaseDto database = dataDatabaseApi.getById(tableMeta.getDatasourceId()).getData();
qualityCheck.setDatabaseName(database.getDatabaseName());
qualityCheck.setJdbcUrl(database.getJdbcUrl());
qualityCheck.setUserName(database.getUserName());
qualityCheck.setPassword(database.getPassword());
qualityCheck.setDatabaseType(database.getDatabaseType());
}
if (qualityChecks.contains(qualityCheck)) {
qualityCheck = qualityChecks.get(qualityChecks.indexOf(qualityCheck));
qualityCheck.getQulaityColumns().add(qulaityColumn);
} else {
qualityChecks.add(qualityCheck);
qualityCheck.getQulaityColumns().add(qulaityColumn);
}
}
// run the quality check table by table
for (QualityCheck qualityCheck : qualityChecks) {
if (currentThread.isInterrupted()) {
log.error("DataQualityTask run interrupted");
qualityTaskDto.setEndTime(new Date());
qualityTaskDto.setStatus(CommonRunStatus.FAILED.getCode());
qualityTaskDto.setErrorLog("DataQualityTask run interrupted");
dataQualityApi.updateQualityTask(qualityTaskDto);
return;
}
// get the adapter for this quality check
AbstractQualityAdapter qualityAdapter = QualityFactory.createQualityAdapter(qualityCheck);
qualityAdapter.prepare(dataQualityApi, qualityTaskDto);
qualityAdapter.check();
}
qualityTaskDto.setEndTime(new Date());
qualityTaskDto.setStatus(CommonRunStatus.SUCCESS.getCode());
dataQualityApi.updateQualityTask(qualityTaskDto);
log.info("DataQualityTask run end");
} catch (Exception e) {
log.error("DataQualityTask run error", e);
qualityTaskDto.setEndTime(new Date());
qualityTaskDto.setStatus(CommonRunStatus.FAILED.getCode());
qualityTaskDto.setErrorLog(LogUtil.getError(e));
dataQualityApi.updateQualityTask(qualityTaskDto);
}
}
}

View File

@@ -0,0 +1,150 @@
// Copyright tang. All rights reserved.
// https://gitee.com/inrgihc/dbswitch
//
// Use of this source code is governed by a BSD-style license
//
// Author: tang (inrgihc@126.com)
// Date : 2020/1/2
// Location: beijing , china
/////////////////////////////////////////////////////////////
package net.srt.quartz.task.quality;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.zaxxer.hikari.HikariDataSource;
import lombok.extern.slf4j.Slf4j;
import net.srt.api.module.data.governance.DataQualityApi;
import net.srt.api.module.data.governance.dto.DataGovernanceQualityTaskColumnDto;
import net.srt.api.module.data.governance.dto.DataGovernanceQualityTaskDto;
import net.srt.api.module.data.governance.dto.DataGovernanceQualityTaskTableDto;
import net.srt.api.module.data.governance.dto.quality.QualityCheck;
import net.srt.api.module.data.governance.dto.quality.QulaityColumn;
import net.srt.api.module.data.integrate.constant.CommonRunStatus;
import srt.cloud.framework.dbswitch.common.type.ProductTypeEnum;
import srt.cloud.framework.dbswitch.common.util.DbswitchStrUtils;
import srt.cloud.framework.dbswitch.common.util.SingletonObject;
import srt.cloud.framework.dbswitch.common.util.TypeConvertUtils;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
* Base class for quality-check adapters: creates the pooled data source,
* opens the per-table task record and persists per-row check results in batches.
*
* @author jrl
*/
@Slf4j
public abstract class AbstractQualityAdapter {
protected QualityCheck qualityCheck;
protected DataQualityApi dataQualityApi;
protected DataGovernanceQualityTaskDto qualityTaskDto;
protected String regx;
public AbstractQualityAdapter(QualityCheck qualityCheck) {
this.qualityCheck = qualityCheck;
}
public void prepare(DataQualityApi dataQualityApi, DataGovernanceQualityTaskDto qualityTaskDto) {
this.dataQualityApi = dataQualityApi;
this.qualityTaskDto = qualityTaskDto;
}
public void setRegx(String regx) {
this.regx = regx;
}
public static HikariDataSource createDataSource(QualityCheck qualityCheck) {
ProductTypeEnum productTypeEnum = ProductTypeEnum.getByIndex(qualityCheck.getDatabaseType());
HikariDataSource ds = new HikariDataSource();
ds.setJdbcUrl(qualityCheck.getJdbcUrl());
ds.setDriverClassName(productTypeEnum.getDriveClassName());
ds.setUsername(qualityCheck.getUserName());
ds.setPassword(qualityCheck.getPassword());
if (ds.getDriverClassName().contains("oracle")) {
ds.setConnectionTestQuery("SELECT 'Hello' from DUAL");
// https://blog.csdn.net/qq_20960159/article/details/78593936
System.getProperties().setProperty("oracle.jdbc.J2EE13Compliant", "true");
} else if (ds.getDriverClassName().contains("db2")) {
ds.setConnectionTestQuery("SELECT 1 FROM SYSIBM.SYSDUMMY1");
} else {
ds.setConnectionTestQuery("SELECT 1");
}
ds.setMaximumPoolSize(8);
ds.setMinimumIdle(5);
ds.setMaxLifetime(TimeUnit.MINUTES.toMillis(60));
ds.setConnectionTimeout(TimeUnit.MINUTES.toMillis(60));
ds.setIdleTimeout(60000);
return ds;
}
protected DataGovernanceQualityTaskTableDto addTaskTable(List<QulaityColumn> columns) {
DataGovernanceQualityTaskTableDto qualityTaskTableDto = new DataGovernanceQualityTaskTableDto();
qualityTaskTableDto.setQualityTaskId(qualityTaskDto.getId());
qualityTaskTableDto.setTableMetadataId(qualityCheck.getTableMetadataId());
qualityTaskTableDto.setTableName(qualityCheck.getTableName());
qualityTaskTableDto.setStatus(CommonRunStatus.RUNNING.getCode());
qualityTaskTableDto.setColumnInfo(columns);
qualityTaskTableDto.setProjectId(qualityTaskDto.getProjectId());
qualityTaskTableDto.setCheckTime(new Date());
qualityTaskTableDto.setStartTime(new Date());
qualityTaskTableDto = dataQualityApi.addTaskTable(qualityTaskTableDto).getData();
return qualityTaskTableDto;
}
protected Map<String, Object> buildRowMap(List<String> columns, ResultSet rs) throws SQLException {
Map<String, Object> map = new HashMap<>();
// convert JDBC values to serializable representations
for (int i = 1; i <= columns.size(); i++) {
Object value = rs.getObject(i);
String key = columns.get(i - 1);
if (value instanceof byte[]) {
map.put(key, DbswitchStrUtils.toHexString((byte[]) value));
} else if (value instanceof java.sql.Clob) {
map.put(key, TypeConvertUtils.castToString(value));
} else if (value instanceof java.sql.Blob) {
map.put(key, DbswitchStrUtils.toHexString(TypeConvertUtils.castToByteArray(value)));
} else {
map.put(key, null == value ? null : value.toString());
}
}
return map;
}
protected DataGovernanceQualityTaskColumnDto buildTaskColumn(DataGovernanceQualityTaskTableDto taskTable, Map<String, Object> map, boolean pass, String notPassInfo) throws JsonProcessingException {
DataGovernanceQualityTaskColumnDto columnDto = new DataGovernanceQualityTaskColumnDto();
columnDto.setQualityTaskId(taskTable.getQualityTaskId());
columnDto.setQualityTaskTableId(taskTable.getId());
columnDto.setCheckRow(SingletonObject.OBJECT_MAPPER.writeValueAsString(map));
columnDto.setCheckTime(new Date());
columnDto.setProjectId(taskTable.getProjectId());
qualityTaskDto.setCheckCount(qualityTaskDto.getCheckCount() + 1);
taskTable.setCheckCount(taskTable.getCheckCount() + 1);
if (pass) {
qualityTaskDto.setPassCount(qualityTaskDto.getPassCount() + 1);
taskTable.setPassCount(taskTable.getPassCount() + 1);
columnDto.setCheckResult(1);
} else {
qualityTaskDto.setNotPassCount(qualityTaskDto.getNotPassCount() + 1);
columnDto.setNotPassInfo(notPassInfo);
taskTable.setNotPassCount(taskTable.getNotPassCount() + 1);
columnDto.setCheckResult(0);
}
return columnDto;
}
protected void updateTask(DataGovernanceQualityTaskTableDto taskTable, List<DataGovernanceQualityTaskColumnDto> columnDtos) {
dataQualityApi.updateQualityTask(qualityTaskDto);
dataQualityApi.updateQualityTaskTable(taskTable);
// persist the per-row check records
dataQualityApi.addQualityTaskColumns(columnDtos);
columnDtos.clear();
}
public abstract void check();
}
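
A new rule implementation only needs a constructor and check(); the base class supplies the pooled data source, the per-table task record and batched result persistence. A skeletal sketch of a hypothetical adapter (the class itself is an assumption; the helpers it calls are the ones defined above):

package net.srt.quartz.task.quality.impl;

import net.srt.api.module.data.governance.dto.DataGovernanceQualityTaskTableDto;
import net.srt.api.module.data.governance.dto.quality.QualityCheck;
import net.srt.api.module.data.integrate.constant.CommonRunStatus;
import net.srt.quartz.task.quality.AbstractQualityAdapter;
import java.util.Date;

public class NoOpCheckImpl extends AbstractQualityAdapter {

    public NoOpCheckImpl(QualityCheck qualityCheck) {
        super(qualityCheck);
    }

    @Override
    public void check() {
        // open the per-table record, exactly as DateFormatImpl/RegxImpl do
        DataGovernanceQualityTaskTableDto taskTable = addTaskTable(qualityCheck.getQulaityColumns());
        // a real adapter would stream rows via createDataSource(qualityCheck),
        // convert them with buildRowMap(...)/buildTaskColumn(...) and flush with updateTask(...)
        taskTable.setEndTime(new Date());
        taskTable.setStatus(CommonRunStatus.SUCCESS.getCode());
        dataQualityApi.updateQualityTaskTable(taskTable);
    }
}

To take effect, such an adapter would also need an entry in QualityFactory's QUALITY_RULE_MAPPER below.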

View File

@@ -0,0 +1,79 @@
// Copyright tang. All rights reserved.
// https://gitee.com/inrgihc/dbswitch
//
// Use of this source code is governed by a BSD-style license
//
// Author: tang (inrgihc@126.com)
// Date : 2020/1/2
// Location: beijing , china
/////////////////////////////////////////////////////////////
package net.srt.quartz.task.quality;
import net.srt.api.module.data.governance.constant.BuiltInQualityRule;
import net.srt.api.module.data.governance.dto.quality.QualityCheck;
import net.srt.quartz.task.quality.impl.DateFormatImpl;
import net.srt.quartz.task.quality.impl.RegxImpl;
import net.srt.quartz.task.quality.impl.UniquenessImpl;
import srt.cloud.framework.dbswitch.common.util.StringUtil;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
/**
* Maps each built-in quality rule to the adapter implementation that executes it.
*
* @author jrl
*/
public class QualityFactory {
private static final Map<BuiltInQualityRule, Function<QualityCheck, AbstractQualityAdapter>> QUALITY_RULE_MAPPER
= new HashMap<BuiltInQualityRule, Function<QualityCheck, AbstractQualityAdapter>>() {
private static final long serialVersionUID = 3365136872693503697L;
{
put(BuiltInQualityRule.UNIQUENESS, UniquenessImpl::new);
put(BuiltInQualityRule.PHONE_NUMBER, RegxImpl::new);
put(BuiltInQualityRule.ID_CARD, RegxImpl::new);
put(BuiltInQualityRule.MAIL, RegxImpl::new);
put(BuiltInQualityRule.NUMBER_FORMAT, RegxImpl::new);
put(BuiltInQualityRule.LENGTH_CHECK, RegxImpl::new);
put(BuiltInQualityRule.NON_NULL_CHECK, RegxImpl::new);
put(BuiltInQualityRule.DATE_FORMAT, DateFormatImpl::new);
}
};
public static AbstractQualityAdapter createQualityAdapter(QualityCheck qualityCheck) {
Integer ruleId = qualityCheck.getRuleId();
BuiltInQualityRule rule = BuiltInQualityRule.getById(ruleId);
if (rule == null) {
throw new RuntimeException(
String.format("Unsupported ruleId (%s)", ruleId));
}
if (!QUALITY_RULE_MAPPER.containsKey(rule)) {
throw new RuntimeException(
String.format("Unsupported rule type (%s)", rule));
}
AbstractQualityAdapter qualityAdapter = QUALITY_RULE_MAPPER.get(rule).apply(qualityCheck);
if (qualityAdapter instanceof RegxImpl) {
if (BuiltInQualityRule.PHONE_NUMBER.equals(rule)) {
qualityAdapter.setRegx(StringUtil.REGEX_PHONE);
} else if (BuiltInQualityRule.ID_CARD.equals(rule)) {
qualityAdapter.setRegx(StringUtil.REGEX_IDCARD);
} else if (BuiltInQualityRule.MAIL.equals(rule)) {
qualityAdapter.setRegx(StringUtil.REGEX_EMAIL);
} else if (BuiltInQualityRule.NON_NULL_CHECK.equals(rule)) {
qualityAdapter.setRegx(StringUtil.NOT_BLANK);
} else if (BuiltInQualityRule.NUMBER_FORMAT.equals(rule)) {
qualityAdapter.setRegx(StringUtil.REGEX_NUMBER);
} else if (BuiltInQualityRule.LENGTH_CHECK.equals(rule)) {
qualityAdapter.setRegx(String.format(".{%s}", qualityCheck.getParam().getColumnLength()));
}
}
return qualityAdapter;
}
}
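
A usage sketch of the factory, mirroring the wiring DataQualityTask performs above; BuiltInQualityRule.MAIL.getId() and ProductTypeEnum.MYSQL.getIndex() are assumed accessors (the code above only shows their getById/getByIndex counterparts):

void demoEmailCheck(DataQualityApi dataQualityApi, DataGovernanceQualityTaskDto qualityTaskDto) {
    QualityCheck check = new QualityCheck();
    check.setRuleId(BuiltInQualityRule.MAIL.getId());        // assumed accessor
    check.setDatabaseType(ProductTypeEnum.MYSQL.getIndex()); // assumed accessor
    check.setJdbcUrl("jdbc:mysql://127.0.0.1:3306/demo");
    check.setUserName("demo");
    check.setPassword("demo");
    check.setDatabaseName("demo");
    check.setTableName("t_user");

    // resolves to RegxImpl preloaded with StringUtil.REGEX_EMAIL
    AbstractQualityAdapter adapter = QualityFactory.createQualityAdapter(check);
    adapter.prepare(dataQualityApi, qualityTaskDto); // as DataQualityTask does before check()
    adapter.check();
}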

View File

@@ -0,0 +1,108 @@
package net.srt.quartz.task.quality.impl;
import lombok.extern.slf4j.Slf4j;
import net.srt.api.module.data.governance.dto.DataGovernanceQualityTaskColumnDto;
import net.srt.api.module.data.governance.dto.DataGovernanceQualityTaskTableDto;
import net.srt.api.module.data.governance.dto.quality.QualityCheck;
import net.srt.api.module.data.governance.dto.quality.QulaityColumn;
import net.srt.api.module.data.integrate.constant.CommonRunStatus;
import net.srt.flink.common.utils.LogUtil;
import net.srt.quartz.task.quality.AbstractQualityAdapter;
import srt.cloud.framework.dbswitch.common.type.ProductTypeEnum;
import srt.cloud.framework.dbswitch.common.util.StringUtil;
import srt.cloud.framework.dbswitch.dbcommon.database.DatabaseOperatorFactory;
import srt.cloud.framework.dbswitch.dbcommon.database.IDatabaseOperator;
import srt.cloud.framework.dbswitch.dbcommon.domain.StatementResultSet;
import java.sql.ResultSet;
import java.time.LocalDateTime;
import java.util.*;
import java.util.stream.Collectors;
/**
* @ClassName DateFormatImpl
* @Author zrx
* @Date 2023/6/24 13:13
*/
@Slf4j
public class DateFormatImpl extends AbstractQualityAdapter {
private final static Integer BATCH_SIZE = 5000;
public DateFormatImpl(QualityCheck qualityCheck) {
super(qualityCheck);
}
@Override
public void check() {
log.info(String.format("DateFormatImpl start check,jdbcUrl:%s,tableName:%s", qualityCheck.getJdbcUrl(), qualityCheck.getTableName()));
// connect to the source database
ProductTypeEnum productTypeEnum = ProductTypeEnum.getByIndex(qualityCheck.getDatabaseType());
List<QulaityColumn> qulaityColumns = qualityCheck.getQulaityColumns();
addTaskColumns(productTypeEnum, qulaityColumns);
}
private void addTaskColumns(ProductTypeEnum productTypeEnum, List<QulaityColumn> columnInfos) {
DataGovernanceQualityTaskTableDto taskTable = addTaskTable(columnInfos);
try {
List<String> columns = columnInfos.stream().map(QulaityColumn::getColumnName).collect(Collectors.toList());
IDatabaseOperator sourceOperator = DatabaseOperatorFactory
.createDatabaseOperator(createDataSource(qualityCheck), productTypeEnum);
sourceOperator.setFetchSize(BATCH_SIZE);
try (StatementResultSet srs = sourceOperator.queryTableData(ProductTypeEnum.ORACLE.equals(productTypeEnum) ? qualityCheck.getUserName() : qualityCheck.getDatabaseName(), qualityCheck.getTableName(), columns); ResultSet rs = srs.getResultset()) {
int size = 0;
List<DataGovernanceQualityTaskColumnDto> columnDtos = new ArrayList<>();
while (rs.next()) {
size++;
Map<String, Object> map = new HashMap<>();
// read the row values
for (int i = 1; i <= columns.size(); i++) {
Object value = rs.getObject(i);
String key = columns.get(i - 1);
map.put(key, value);
}
// check each value against the date format
StringBuilder notPassInfo = new StringBuilder();
boolean pass = true;
for (Map.Entry<String, Object> entry : map.entrySet()) {
String key = entry.getKey();
Object value = entry.getValue();
if (!(value instanceof java.sql.Time || value instanceof java.sql.Timestamp || value instanceof java.sql.Date || value instanceof LocalDateTime)) {
String vStr = String.valueOf(value);
if (!vStr.matches(StringUtil.DATE_FORAMT)) {
pass = false;
notPassInfo.append(String.format("【%s】字段不合规，", key)); // trailing separator, stripped below
}
}
}
// build the taskColumn record
DataGovernanceQualityTaskColumnDto columnDto = buildTaskColumn(taskTable, map, pass, notPassInfo.length() > 0 ? notPassInfo.deleteCharAt(notPassInfo.length() - 1).toString() : null);
columnDtos.add(columnDto);
// flush every 5000 rows
if (size % BATCH_SIZE == 0) {
// update the task and table-task check counters
updateTask(taskTable, columnDtos);
}
}
// flush any remaining unflushed rows
if (!columnDtos.isEmpty()) {
updateTask(taskTable, columnDtos);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
taskTable.setEndTime(new Date());
taskTable.setStatus(CommonRunStatus.SUCCESS.getCode());
dataQualityApi.updateQualityTaskTable(taskTable);
} catch (Exception e) {
taskTable.setEndTime(new Date());
taskTable.setErrorLog(LogUtil.getError(e));
taskTable.setStatus(CommonRunStatus.FAILED.getCode());
dataQualityApi.updateQualityTaskTable(taskTable);
throw new RuntimeException(e);
}
}
}

View File

@@ -0,0 +1,106 @@
package net.srt.quartz.task.quality.impl;
import lombok.extern.slf4j.Slf4j;
import net.srt.api.module.data.governance.dto.DataGovernanceQualityTaskColumnDto;
import net.srt.api.module.data.governance.dto.DataGovernanceQualityTaskTableDto;
import net.srt.api.module.data.governance.dto.quality.QualityCheck;
import net.srt.api.module.data.governance.dto.quality.QulaityColumn;
import net.srt.api.module.data.integrate.constant.CommonRunStatus;
import net.srt.flink.common.utils.LogUtil;
import net.srt.quartz.task.quality.AbstractQualityAdapter;
import srt.cloud.framework.dbswitch.common.type.ProductTypeEnum;
import srt.cloud.framework.dbswitch.dbcommon.database.DatabaseOperatorFactory;
import srt.cloud.framework.dbswitch.dbcommon.database.IDatabaseOperator;
import srt.cloud.framework.dbswitch.dbcommon.domain.StatementResultSet;
import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
* @ClassName RegxImpl
* @Author zrx
* @Date 2023/6/24 13:13
*/
@Slf4j
public class RegxImpl extends AbstractQualityAdapter {
private final static Integer BATCH_SIZE = 5000;
public RegxImpl(QualityCheck qualityCheck) {
super(qualityCheck);
}
@Override
public void check() {
log.info(String.format("RegxImpl start check,jdbcUrl:%s,tableName:%s", qualityCheck.getJdbcUrl(), qualityCheck.getTableName()));
// connect to the source database
ProductTypeEnum productTypeEnum = ProductTypeEnum.getByIndex(qualityCheck.getDatabaseType());
List<QulaityColumn> qulaityColumns = qualityCheck.getQulaityColumns();
addTaskColumns(productTypeEnum, qulaityColumns);
}
private void addTaskColumns(ProductTypeEnum productTypeEnum, List<QulaityColumn> columnInfos) {
DataGovernanceQualityTaskTableDto taskTable = addTaskTable(columnInfos);
try {
List<String> columns = columnInfos.stream().map(QulaityColumn::getColumnName).collect(Collectors.toList());
IDatabaseOperator sourceOperator = DatabaseOperatorFactory
.createDatabaseOperator(createDataSource(qualityCheck), productTypeEnum);
sourceOperator.setFetchSize(BATCH_SIZE);
try (StatementResultSet srs = sourceOperator.queryTableData(ProductTypeEnum.ORACLE.equals(productTypeEnum) ? qualityCheck.getUserName() : qualityCheck.getDatabaseName(), qualityCheck.getTableName(), columns); ResultSet rs = srs.getResultset()) {
int size = 0;
List<DataGovernanceQualityTaskColumnDto> columnDtos = new ArrayList<>();
while (rs.next()) {
size++;
Map<String, Object> map = buildRowMap(columns, rs);
// check each value against the regex
StringBuilder notPassInfo = new StringBuilder();
boolean pass = true;
for (Map.Entry<String, Object> entry : map.entrySet()) {
String key = entry.getKey();
Object value = entry.getValue();
if (value == null) {
pass = false;
notPassInfo.append(String.format("【%s】字段不合规，", key)); // trailing separator, stripped below
} else {
String vStr = String.valueOf(value);
if (!vStr.matches(regx)) {
pass = false;
notPassInfo.append(String.format("【%s】字段不合规，", key)); // trailing separator, stripped below
}
}
}
// build the taskColumn record
DataGovernanceQualityTaskColumnDto columnDto = buildTaskColumn(taskTable, map, pass, notPassInfo.length() > 0 ? notPassInfo.deleteCharAt(notPassInfo.length() - 1).toString() : null);
columnDtos.add(columnDto);
// flush every 5000 rows
if (size % BATCH_SIZE == 0) {
// update the task and table-task check counters
updateTask(taskTable, columnDtos);
}
}
// flush any remaining unflushed rows
if (!columnDtos.isEmpty()) {
updateTask(taskTable, columnDtos);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
taskTable.setEndTime(new Date());
taskTable.setStatus(CommonRunStatus.SUCCESS.getCode());
dataQualityApi.updateQualityTaskTable(taskTable);
} catch (Exception e) {
taskTable.setEndTime(new Date());
taskTable.setErrorLog(LogUtil.getError(e));
taskTable.setStatus(CommonRunStatus.FAILED.getCode());
dataQualityApi.updateQualityTaskTable(taskTable);
throw new RuntimeException(e);
}
}
}

View File

@@ -0,0 +1,112 @@
package net.srt.quartz.task.quality.impl;
import lombok.extern.slf4j.Slf4j;
import net.srt.api.module.data.governance.dto.DataGovernanceQualityTaskColumnDto;
import net.srt.api.module.data.governance.dto.DataGovernanceQualityTaskTableDto;
import net.srt.api.module.data.governance.dto.quality.QualityCheck;
import net.srt.api.module.data.governance.dto.quality.QualityConfigParam;
import net.srt.api.module.data.governance.dto.quality.QulaityColumn;
import net.srt.api.module.data.integrate.constant.CommonRunStatus;
import net.srt.flink.common.utils.LogUtil;
import net.srt.quartz.task.quality.AbstractQualityAdapter;
import srt.cloud.framework.dbswitch.common.type.ProductTypeEnum;
import srt.cloud.framework.dbswitch.core.service.IMetaDataByJdbcService;
import srt.cloud.framework.dbswitch.core.service.impl.MetaDataByJdbcServiceImpl;
import srt.cloud.framework.dbswitch.dbcommon.database.DatabaseOperatorFactory;
import srt.cloud.framework.dbswitch.dbcommon.database.IDatabaseOperator;
import srt.cloud.framework.dbswitch.dbcommon.domain.StatementResultSet;
import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
* @ClassName UniquenessImpl
* @Author zrx
* @Date 2023/6/24 13:13
*/
@Slf4j
public class UniquenessImpl extends AbstractQualityAdapter {
private final static Integer BATCH_SIZE = 5000;
public UniquenessImpl(QualityCheck qualityCheck) {
super(qualityCheck);
}
@Override
public void check() {
log.info(String.format("UniquenessImpl start check,jdbcUrl:%s,tableName:%s", qualityCheck.getJdbcUrl(), qualityCheck.getTableName()));
// connect to the source database
ProductTypeEnum productTypeEnum = ProductTypeEnum.getByIndex(qualityCheck.getDatabaseType());
List<QulaityColumn> qulaityColumns = qualityCheck.getQulaityColumns();
// decide between combined-column and single-column uniqueness
QualityConfigParam param = qualityCheck.getParam();
IMetaDataByJdbcService metaDataService = new MetaDataByJdbcServiceImpl(productTypeEnum);
if (param.getUniqueType() == 1) {
// single-column uniqueness: run one check per column
for (QulaityColumn qulaityColumn : qulaityColumns) {
List<QulaityColumn> columnInfos = new ArrayList<>();
columnInfos.add(qulaityColumn);
addTaskColumns(productTypeEnum, metaDataService, columnInfos);
}
} else {
// combined-column uniqueness
addTaskColumns(productTypeEnum, metaDataService, qulaityColumns);
}
log.info(String.format("UniquenessImpl check end,jdbcUrl:%s,tableName:%s", qualityCheck.getJdbcUrl(), qualityCheck.getTableName()));
}
private void addTaskColumns(ProductTypeEnum productTypeEnum, IMetaDataByJdbcService metaDataService, List<QulaityColumn> columnInfos) {
DataGovernanceQualityTaskTableDto taskTable = addTaskTable(columnInfos);
try {
List<String> columns = columnInfos.stream().map(QulaityColumn::getColumnName).collect(Collectors.toList());
String countMoreThanOneSql = metaDataService.getCountMoreThanOneSql(ProductTypeEnum.ORACLE.equals(productTypeEnum) ? qualityCheck.getUserName() : qualityCheck.getDatabaseName(), qualityCheck.getTableName(), columns);
String countOneSql = metaDataService.getCountOneSql(ProductTypeEnum.ORACLE.equals(productTypeEnum) ? qualityCheck.getUserName() : qualityCheck.getDatabaseName(), qualityCheck.getTableName(), columns);
// rows occurring more than once fail the check, rows occurring exactly once pass
executeSql(productTypeEnum, countMoreThanOneSql, columns, taskTable, false);
executeSql(productTypeEnum, countOneSql, columns, taskTable, true);
taskTable.setEndTime(new Date());
taskTable.setStatus(CommonRunStatus.SUCCESS.getCode());
dataQualityApi.updateQualityTaskTable(taskTable);
} catch (Exception e) {
taskTable.setEndTime(new Date());
taskTable.setErrorLog(LogUtil.getError(e));
taskTable.setStatus(CommonRunStatus.FAILED.getCode());
dataQualityApi.updateQualityTaskTable(taskTable);
throw new RuntimeException(e);
}
}
private void executeSql(ProductTypeEnum productTypeEnum, String sql, List<String> columns, DataGovernanceQualityTaskTableDto taskTable, Boolean pass) {
IDatabaseOperator sourceOperator = DatabaseOperatorFactory
.createDatabaseOperator(createDataSource(qualityCheck), productTypeEnum);
sourceOperator.setFetchSize(BATCH_SIZE);
try (StatementResultSet srs = sourceOperator.queryTableData(sql); ResultSet rs = srs.getResultset()) {
int size = 0;
List<DataGovernanceQualityTaskColumnDto> columnDtos = new ArrayList<>();
while (rs.next()) {
size++;
Map<String, Object> map = buildRowMap(columns, rs);
DataGovernanceQualityTaskColumnDto columnDto = buildTaskColumn(taskTable, map, pass, String.join(",", columns) + "不唯一");
columnDtos.add(columnDto);
// flush every 5000 rows
if (size % BATCH_SIZE == 0) {
updateTask(taskTable, columnDtos);
}
}
// flush any remaining unflushed rows
if (!columnDtos.isEmpty()) {
updateTask(taskTable, columnDtos);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}

View File

@@ -1,11 +1,15 @@
package net.srt.disposition.controller;
import cn.hutool.db.Page;
import net.srt.disposition.dto.ConsoleLog;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.AllArgsConstructor;
import net.srt.disposition.dto.Flow;
import net.srt.disposition.query.DataProductionScheduleRecordQuery;
import net.srt.disposition.query.DataProductionsScheduleQuery;
import net.srt.disposition.service.DataProductionScheduleService;
import net.srt.disposition.vo.DataProductionScheduleRecordVO;
import net.srt.disposition.vo.DataProductionScheduleVo;
import net.srt.framework.common.page.PageResult;
import net.srt.framework.common.utils.Result;
@@ -74,6 +78,30 @@ public class DataProductionScheduleController {
return Result.ok();
}
@GetMapping("record/page")
@Operation(summary = "调度日记记录分页")
@PreAuthorize("hasAuthority('data-development:schedule:record:page')")
public Result<PageResult<DataProductionScheduleRecordVO>> pageRecord(@Valid DataProductionScheduleRecordQuery query){
PageResult<DataProductionScheduleRecordVO> page=dataProductionScheduleService.pageRecord(query);
return Result.ok(page);
}
@DeleteMapping("/record")
@Operation(summary = "删除调度日志记录")
@PreAuthorize("hasAuthority('data-development:schedule:record:delete')")
public Result<String> delRecord(@RequestBody List<Long> idList) {
dataProductionScheduleService.delRecord(idList);
return Result.ok();
}
@GetMapping("/log/{recordId}")
@Operation(summary = "获取流程图执行的日志")
public Result<ConsoleLog> getLog(@PathVariable Integer recordId) {
return Result.ok(dataProductionScheduleService.getLog(recordId));
}
}

View File

@@ -0,0 +1,22 @@
package net.srt.disposition.convert;
import net.srt.disposition.entity.DataProductionScheduleRecordEntity;
import net.srt.disposition.vo.DataProductionScheduleRecordVO;
import org.mapstruct.Mapper;
import org.mapstruct.factory.Mappers;
import java.util.List;
/**
* @BelongsProject: srt_cloud
* @BelongsPackage: net.srt.disposition.convert
* @Author: jpz
* @CreateTime: 2023/12/29 9:08
*/
@Mapper
public interface DataProductionScheduleRecordConvert {
DataProductionScheduleRecordConvert INSTANCE = Mappers.getMapper(DataProductionScheduleRecordConvert.class);
List<DataProductionScheduleRecordVO> convertList(List<DataProductionScheduleRecordEntity> list);
}

View File

@@ -0,0 +1,16 @@
package net.srt.disposition.dto;
import lombok.Data;
/**
* @BelongsProject: srt_cloud
* @BelongsPackage: net.srt.disposition.dto
* @Author: jpz
* @CreateTime: 2023/12/29 9:50
*/
@Data
public class ConsoleLog {
private String log = "";
private boolean end = false;
}

View File

@@ -0,0 +1,29 @@
package net.srt.disposition.query;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
import lombok.EqualsAndHashCode;
import net.srt.framework.common.query.Query;
import net.srt.framework.common.utils.DateUtils;
import org.springframework.format.annotation.DateTimeFormat;
import java.util.Date;
/**
* @BelongsProject: srt_cloud
* @BelongsPackage: net.srt.disposition.query
* @Author: jpz
* @CreateTime: 2023/12/29 8:50
*/
@Data
@EqualsAndHashCode(callSuper = false)
@Schema(description = "数据生产-作业调度记录查询")
public class DataProductionScheduleRecordQuery extends Query {
private String name;
private Integer runStatus;
private Integer executeType;
@DateTimeFormat(pattern = DateUtils.DATE_TIME_PATTERN)
private Date startTime;
@DateTimeFormat(pattern = DateUtils.DATE_TIME_PATTERN)
private Date endTime;
}

View File

@@ -0,0 +1,22 @@
package net.srt.disposition.service;
import net.srt.disposition.entity.DataProductionScheduleRecordEntity;
import net.srt.disposition.query.DataProductionScheduleRecordQuery;
import net.srt.disposition.vo.DataProductionScheduleRecordVO;
import net.srt.framework.common.page.PageResult;
import net.srt.framework.mybatis.service.BaseService;
import java.util.List;
/**
* @BelongsProject: srt_cloud
* @BelongsPackage: net.srt.disposition.service
* @Author: jpz
* @CreateTime: 2023/12/29 8:55
*/
public interface DataProductionScheduleRecordService extends BaseService<DataProductionScheduleRecordEntity> {
PageResult<DataProductionScheduleRecordVO> page(DataProductionScheduleRecordQuery query);
void delete(List<Long> idList);
}

View File

@@ -1,8 +1,11 @@
package net.srt.disposition.service;
import net.srt.disposition.dto.ConsoleLog;
import net.srt.disposition.dto.Flow;
import net.srt.disposition.entity.DataProductionScheduleEntity;
import net.srt.disposition.query.DataProductionScheduleRecordQuery;
import net.srt.disposition.query.DataProductionsScheduleQuery;
import net.srt.disposition.vo.DataProductionScheduleRecordVO;
import net.srt.disposition.vo.DataProductionScheduleVo;
import net.srt.framework.common.page.PageResult;
import net.srt.framework.mybatis.service.BaseService;
@@ -29,4 +32,10 @@ public interface DataProductionScheduleService extends BaseService<DataProductio
void delete(List<Long> idList);
void release(Integer id);
PageResult<DataProductionScheduleRecordVO> pageRecord(DataProductionScheduleRecordQuery query);
void delRecord(List<Long> idList);
ConsoleLog getLog(Integer recordId);
}

View File

@@ -0,0 +1,51 @@
package net.srt.disposition.service.impl;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import lombok.AllArgsConstructor;
import net.srt.disposition.convert.DataProductionScheduleRecordConvert;
import net.srt.disposition.entity.DataProductionScheduleRecordEntity;
import net.srt.disposition.mapper.DataProductionScheduleRecordDao;
import net.srt.disposition.query.DataProductionScheduleRecordQuery;
import net.srt.disposition.service.DataProductionScheduleRecordService;
import net.srt.disposition.vo.DataProductionScheduleRecordVO;
import net.srt.framework.common.page.PageResult;
import net.srt.framework.mybatis.service.impl.BaseServiceImpl;
import org.springframework.stereotype.Service;
import srt.cloud.framework.dbswitch.common.util.StringUtil;
import java.util.List;
/**
* @BelongsProject: srt_cloud
* @BelongsPackage: net.srt.disposition.service.impl
* @Author: jpz
* @CreateTime: 2023/12/29 8:58
*/
@Service
@AllArgsConstructor
public class DataProductionScheduleRecordServiceimpl extends BaseServiceImpl<DataProductionScheduleRecordDao, DataProductionScheduleRecordEntity> implements DataProductionScheduleRecordService {
@Override
public PageResult<DataProductionScheduleRecordVO> page(DataProductionScheduleRecordQuery query) {
IPage<DataProductionScheduleRecordEntity> page = baseMapper.selectPage(getPage(query),getWrapper(query));
return new PageResult<>(DataProductionScheduleRecordConvert.INSTANCE.convertList(page.getRecords()), page.getTotal());
}
@Override
public void delete(List<Long> idList) {
removeByIds(idList);
}
private LambdaQueryWrapper<DataProductionScheduleRecordEntity> getWrapper(DataProductionScheduleRecordQuery query) {
LambdaQueryWrapper<DataProductionScheduleRecordEntity> wrapper = Wrappers.lambdaQuery();
wrapper.like(StringUtil.isNotBlank(query.getName()), DataProductionScheduleRecordEntity::getName, query.getName())
.eq(query.getRunStatus()!=null, DataProductionScheduleRecordEntity::getRunStatus, query.getRunStatus())
.eq(query.getExecuteType()!=null, DataProductionScheduleRecordEntity::getExecuteType, query.getExecuteType())
.ge(query.getStartTime()!=null, DataProductionScheduleRecordEntity::getStartTime, query.getStartTime())
.le(query.getEndTime()!=null, DataProductionScheduleRecordEntity::getEndTime, query.getEndTime())
.orderByDesc(DataProductionScheduleRecordEntity::getCreateTime)
.orderByDesc(DataProductionScheduleRecordEntity::getId);
return wrapper;
}
}

View File

@@ -1,5 +1,6 @@
package net.srt.disposition.service.impl;
import net.srt.disposition.dto.ConsoleLog;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
@@ -21,8 +22,11 @@ import net.srt.disposition.entity.DataProductionScheduleNodeEntity;
import net.srt.disposition.entity.DataProductionScheduleNodeRecordEntity;
import net.srt.disposition.entity.DataProductionScheduleRecordEntity;
import net.srt.disposition.mapper.DataProductionScheduleDao;
import net.srt.disposition.query.DataProductionScheduleRecordQuery;
import net.srt.disposition.query.DataProductionsScheduleQuery;
import net.srt.disposition.service.DataProductionScheduleRecordService;
import net.srt.disposition.service.DataProductionScheduleService;
import net.srt.disposition.vo.DataProductionScheduleRecordVO;
import net.srt.disposition.vo.DataProductionScheduleVo;
import net.srt.flink.common.config.Dialect;
import net.srt.flink.common.utils.JSONUtil;
@@ -57,6 +61,7 @@ public class DataProductionScheduleServiceimpl extends BaseServiceImpl<DataProdu
private final DataProductionScheduleRecordDao recordDao;
private final DataProductionScheduleNodeRecordDao nodeRecordDao;
private final QuartzDataProductionScheduleApi scheduleApi;
private final DataProductionScheduleRecordService recordService;
@Override
public PageResult<DataProductionScheduleVo> page(DataProductionsScheduleQuery query) {
IPage<DataProductionScheduleEntity> page=baseMapper.selectPage(getPage(query),getWrapper(query));
@@ -125,6 +130,21 @@ public class DataProductionScheduleServiceimpl extends BaseServiceImpl<DataProdu
baseMapper.changeStutus(dbEntity);
}
@Override
public PageResult<DataProductionScheduleRecordVO> pageRecord(DataProductionScheduleRecordQuery query) {
return recordService.page(query);
}
@Override
public void delRecord(List<Long> idList) {
recordService.delete(idList);
}
@Override
public ConsoleLog getLog(Integer recordId) {
// stub in this commit: log retrieval is not implemented yet
return null;
}
private String scheduleRun(Integer id, ExecuteType executeType) {
DataProductionScheduleEntity scheduleEntity=baseMapper.selectById(id);

View File

@@ -0,0 +1,70 @@
package net.srt.disposition.vo;
import com.fasterxml.jackson.annotation.JsonFormat;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
import net.srt.framework.common.utils.DateUtils;
import java.io.Serializable;
import java.util.Date;
/**
* @BelongsProject: srt_cloud
* @BelongsPackage: net.srt.disposition.vo
* @Author: jpz
* @CreateTime: 2023/12/29 8:47
*/
@Data
@Schema(description = "数据生产-作业调度记录")
public class DataProductionScheduleRecordVO implements Serializable {
private static final long serialVersionUID = 1L;
@Schema(description = "主键id")
private Integer id;
private String name;
@Schema(description = "调度id")
private Integer taskScheduleId;
@Schema(description = "项目租户id")
private Long projectId;
@Schema(description = "当前状态 字典 run_status")
private Integer runStatus;
@Schema(description = "开始时间")
@JsonFormat(pattern = DateUtils.DATE_TIME_PATTERN)
private Date startTime;
@Schema(description = "结束时间")
@JsonFormat(pattern = DateUtils.DATE_TIME_PATTERN)
private Date endTime;
@Schema(description = "运行日志")
private String log;
private Integer executeType;
private String configJson;
@Schema(description = "版本号")
private Integer version;
@Schema(description = "删除标识 0正常 1已删除")
private Integer deleted;
@Schema(description = "创建者")
private Long creator;
@Schema(description = "创建时间")
@JsonFormat(pattern = DateUtils.DATE_TIME_PATTERN)
private Date createTime;
@Schema(description = "更新者")
private Long updater;
@Schema(description = "更新时间")
@JsonFormat(pattern = DateUtils.DATE_TIME_PATTERN)
private Date updateTime;
}

View File

@@ -0,0 +1,30 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="net.srt.disposition.mapper.DataProductionScheduleDao">
<resultMap type="net.srt.disposition.entity.DataProductionScheduleEntity" id="dataProductionScheduleMap">
<result property="id" column="id"/>
<result property="projectId" column="project_id"/>
<result property="name" column="name"/>
<result property="ifCycle" column="if_cycle"/>
<result property="cron" column="cron"/>
<result property="note" column="note"/>
<result property="status" column="status"/>
<result property="releaseTime" column="release_time"/>
<result property="releaseUserId" column="release_user_id"/>
<result property="edges" column="edges"/>
<result property="version" column="version"/>
<result property="deleted" column="deleted"/>
<result property="creator" column="creator"/>
<result property="createTime" column="create_time"/>
<result property="updater" column="updater"/>
<result property="updateTime" column="update_time"/>
</resultMap>
<update id="changeStutus" parameterType="net.srt.disposition.entity.DataProductionScheduleEntity">
UPDATE data_production_schedule SET status=#{status},release_time=#{releaseTime},release_user_id=#{releaseUserId}
WHERE id=#{id}
</update>
</mapper>
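
For reference, a sketch of the Dao method the <update> above binds to. Only the statement id changeStutus and the parameterType come from this XML (plus the call site baseMapper.changeStutus(dbEntity) earlier in the diff); the rest of the interface shape is an assumption consistent with the MyBatis-Plus daos used in this module:

package net.srt.disposition.mapper;

import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import net.srt.disposition.entity.DataProductionScheduleEntity;
import org.apache.ibatis.annotations.Mapper;

@Mapper
public interface DataProductionScheduleDao extends BaseMapper<DataProductionScheduleEntity> {

    // bound by id to <update id="changeStutus">; the spelling (sic) matches the
    // existing Java call site, so both must stay in sync if ever renamed
    void changeStutus(DataProductionScheduleEntity entity);
}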