diff --git a/srt-cloud-module/srt-cloud-quartz/src/main/java/net/srt/quartz/task/DataGovernanceMetadataCollectTask.java b/srt-cloud-module/srt-cloud-quartz/src/main/java/net/srt/quartz/task/DataGovernanceMetadataCollectTask.java
new file mode 100644
index 0000000..9861e7f
--- /dev/null
+++ b/srt-cloud-module/srt-cloud-quartz/src/main/java/net/srt/quartz/task/DataGovernanceMetadataCollectTask.java
@@ -0,0 +1,288 @@
+package net.srt.quartz.task;
+
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import net.srt.api.module.data.governance.DataMetadataCollectApi;
+import net.srt.api.module.data.governance.constant.*;
+import net.srt.api.module.data.governance.dto.DataGovernanceMetadataCollectRecordDto;
+import net.srt.api.module.data.governance.dto.DataGovernanceMetadataDto;
+import net.srt.api.module.data.governance.dto.DataGovernanceMetadataPropertyDto;
+import net.srt.api.module.data.governance.dto.DataGovernanceMetadataCollectDto;
+import net.srt.api.module.data.integrate.DataDatabaseApi;
+import net.srt.api.module.data.integrate.DataProjectApi;
+import net.srt.api.module.data.integrate.dto.DataDatabaseDto;
+import net.srt.flink.common.utils.LogUtil;
+import net.srt.framework.common.cache.bean.DataProjectCacheBean;
+import net.srt.framework.common.utils.DateUtils;
+import org.springframework.stereotype.Component;
+import srt.cloud.framework.dbswitch.common.type.ProductTypeEnum;
+import srt.cloud.framework.dbswitch.common.util.StringUtil;
+import srt.cloud.framework.dbswitch.core.model.ColumnDescription;
+import srt.cloud.framework.dbswitch.core.model.TableDescription;
+import srt.cloud.framework.dbswitch.core.service.IMetaDataByJdbcService;
+import srt.cloud.framework.dbswitch.core.service.impl.MetaDataByJdbcServiceImpl;
+import srt.cloud.framework.dbswitch.core.util.JdbcUrlUtils;
+
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * @ClassName DataGovernanceMetadataCollectTask
+ * @Author zrx
+ * @Date 2023/04/02 13:12
+ */
+@Slf4j
+@Component
+@RequiredArgsConstructor
+public class DataGovernanceMetadataCollectTask {
+
+    private final DataMetadataCollectApi dataMetadataCollectApi;
+    private final DataDatabaseApi dataDatabaseApi;
+    private final DataProjectApi dataProjectApi;
+
+    public void run(String metadataCollectId, Thread currentThread) {
+        log.info("metadata collect task start to run...");
+        Long collectTaskId = Long.parseLong(metadataCollectId);
+        DataGovernanceMetadataCollectDto metadataCollectDto = dataMetadataCollectApi.getById(collectTaskId).getData();
+        if (metadataCollectDto == null) {
+            log.error("metadata collect task not found...");
+            return;
+        }
+        Long projectId = metadataCollectDto.getProjectId();
+        // create the collect-run record
+        DataGovernanceMetadataCollectRecordDto collectRecord = new DataGovernanceMetadataCollectRecordDto();
+        collectRecord.setProjectId(projectId);
+        collectRecord.setMetadataCollectId(collectTaskId);
+        collectRecord.setStatus(MetadataCollectRunStatus.RUNNING.getCode());
+        collectRecord.setStartTime(new Date());
+        collectRecord.setCreateTime(new Date());
+        collectRecord = dataMetadataCollectApi.addCollectRecord(collectRecord);
+        StringBuilder realTimeLog = new StringBuilder();
+        try {
+            realTimeLog.append(DateUtils.formatDateTime(new Date())).append(" ").append("Start to collect database info...\r\n");
+            Integer dbType = metadataCollectDto.getDbType();
+            DataDatabaseDto databaseDto;
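+            // Two metadata sources are handled below (summary added for clarity):
+            //  1. DbType.DATABASE - an external data source registered in data-integrate; its
+            //     connection info is loaded through DataDatabaseApi by the configured databaseId.
+            //  2. otherwise       - the project's own middle-platform warehouse; its connection
+            //     info comes from the project cache and a virtual datasource id of -1 is used.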
+            // if the source is a registered database
+            if (DbType.DATABASE.getValue().equals(dbType)) {
+                databaseDto = dataDatabaseApi.getById(metadataCollectDto.getDatabaseId()).getData();
+                if (databaseDto == null) {
+                    realTimeLog.append(DateUtils.formatDateTime(new Date())).append(" ").append("Database does not exist or getting database info failed, metadata collection failed\r\n");
+                    collectRecord.setRealTimeLog(realTimeLog.toString());
+                    collectRecord.setErrorLog(DateUtils.formatDateTime(new Date()) + " Database does not exist or getting database info failed, metadata collection failed\r\n");
+                    collectRecord.setEndTime(new Date());
+                    collectRecord.setStatus(MetadataCollectRunStatus.FAILED.getCode());
+                    dataMetadataCollectApi.upCollectRecord(collectRecord);
+                    return;
+                }
+            } else {
+                // build the connection info of the project's middle-platform database
+                databaseDto = new DataDatabaseDto();
+                DataProjectCacheBean project = dataProjectApi.getById(projectId).getData();
+                databaseDto.setId(-1L);
+                Map<String, String> jdbcParams = JdbcUrlUtils.findParamsByMySqlJdbcUrl(project.getDbUrl());
+                if (jdbcParams != null) {
+                    databaseDto.setDatabaseIp(jdbcParams.get("address"));
+                    databaseDto.setDatabasePort(jdbcParams.get("port"));
+                }
+                databaseDto.setName(project.getName() + "<中台库>");
+                databaseDto.setDatabaseName(project.getDbName());
+                databaseDto.setJdbcUrl(project.getDbUrl());
+                databaseDto.setUserName(project.getDbUsername());
+                databaseDto.setPassword(project.getDbPassword());
+                databaseDto.setDatabaseType(project.getDbType());
+            }
+            DataGovernanceMetadataDto rootMetadata = dataMetadataCollectApi.getMetadataById(metadataCollectDto.getMetadataId()).getData();
+            // the persistence strategy (full or incremental)
+            Integer strategy = metadataCollectDto.getStrategy();
+            // build the metadata of the database (schema) node
+            DataGovernanceMetadataDto databaseMetadata = new DataGovernanceMetadataDto();
+            databaseMetadata.setProjectId(projectId);
+            databaseMetadata.setCollectTaskId(collectTaskId);
+            databaseMetadata.setParentId(metadataCollectDto.getMetadataId());
+            databaseMetadata.setName(databaseDto.getName());
+            databaseMetadata.setPath(rootMetadata.getPath() + "/" + databaseMetadata.getName());
+            databaseMetadata.setCode(databaseDto.getDatabaseName());
+            // mark the node as schema type
+            databaseMetadata.setMetamodelId(BuiltInMetamodel.SCHEMA.getId());
+            databaseMetadata.setIcon(BuiltInMetamodel.SCHEMA.getIcon());
+            databaseMetadata.setIfLeaf(0);
+            databaseMetadata.setDbType(dbType);
+            databaseMetadata.setDatasourceId(databaseDto.getId());
+            // reuse the id if metadata of this schema already exists
+            DataGovernanceMetadataDto dbDatabaseMetadata = dataMetadataCollectApi.getByParentIdAndDatasourceId(databaseMetadata.getParentId(), databaseMetadata.getDatasourceId()).getData();
+            if (dbDatabaseMetadata != null) {
+                databaseMetadata.setId(dbDatabaseMetadata.getId());
+            }
+            databaseMetadata = dataMetadataCollectApi.addOrUpdateMetadata(databaseMetadata);
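+            // Note (added): addOrUpdateProperty below performs an upsert keyed on
+            // (metamodelPropertyId, metadataId) - see the helper at the bottom of this class -
+            // so re-running the task overwrites property values instead of duplicating them.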
+            // add the schema properties
+            addOrUpdateProperty(databaseMetadata, BuiltInMetamodelProperty.buildProerty(BuiltInMetamodelProperty.SCHEMA_COMMENT, projectId, databaseMetadata.getId(), databaseDto.getName()));
+            addOrUpdateProperty(databaseMetadata, BuiltInMetamodelProperty.buildProerty(BuiltInMetamodelProperty.SCHEMA_TYPE, projectId, databaseMetadata.getId(), ProductTypeEnum.getByIndex(databaseDto.getDatabaseType()).name()));
+            addOrUpdateProperty(databaseMetadata, BuiltInMetamodelProperty.buildProerty(BuiltInMetamodelProperty.SCHEMA_IP, projectId, databaseMetadata.getId(), databaseDto.getDatabaseIp()));
+            addOrUpdateProperty(databaseMetadata, BuiltInMetamodelProperty.buildProerty(BuiltInMetamodelProperty.SCHEMA_PORT, projectId, databaseMetadata.getId(), databaseDto.getDatabasePort()));
+            addOrUpdateProperty(databaseMetadata, BuiltInMetamodelProperty.buildProerty(BuiltInMetamodelProperty.SCHEMA_DATABASE, projectId, databaseMetadata.getId(), databaseDto.getDatabaseName()));
+            addOrUpdateProperty(databaseMetadata, BuiltInMetamodelProperty.buildProerty(BuiltInMetamodelProperty.SCHEMA_USERNAME, projectId, databaseMetadata.getId(), databaseDto.getUserName()));
+            addOrUpdateProperty(databaseMetadata, BuiltInMetamodelProperty.buildProerty(BuiltInMetamodelProperty.SCHEMA_PASSWORD, projectId, databaseMetadata.getId(), databaseDto.getPassword()));
+            addOrUpdateProperty(databaseMetadata, BuiltInMetamodelProperty.buildProerty(BuiltInMetamodelProperty.SCHEMA_JDBC_URL, projectId, databaseMetadata.getId(), databaseDto.getJdbcUrl()));
+            realTimeLog.append(DateUtils.formatDateTime(new Date())).append(" ").append("Start to collect tables info...\r\n");
+            updateRealTimeLog(collectRecord, realTimeLog);
+            // query the table metadata
+            ProductTypeEnum productTypeEnum = ProductTypeEnum.getByIndex(databaseDto.getDatabaseType());
+            IMetaDataByJdbcService metaDataService = new MetaDataByJdbcServiceImpl(productTypeEnum);
+
+            List<TableDescription> tables = metaDataService.queryTableList(databaseDto.getJdbcUrl(), databaseDto.getUserName(), databaseDto.getPassword(),
+                    ProductTypeEnum.ORACLE.equals(productTypeEnum) ? databaseDto.getUserName() : databaseDto.getDatabaseName());
+
+            if (currentThread.isInterrupted()) {
+                interrupt(collectRecord, realTimeLog);
+                return;
+            }
+            List<DataGovernanceMetadataDto> dbTables = dataMetadataCollectApi.listParentIdAndDatasourceId(databaseMetadata.getId(), databaseDto.getId(), BuiltInMetamodel.TABLE.getId()).getData();
+            // full collection: compare with what is already stored
+            if (MetadataStrategyType.ALL.getValue().equals(strategy)) {
+                // delete stored tables that no longer exist in the source
+                for (DataGovernanceMetadataDto dbTable : dbTables) {
+                    if (tables.stream().noneMatch(item -> item.getTableName().equals(dbTable.getCode()))) {
+                        dataMetadataCollectApi.deleteMetadata(dbTable.getId());
+                    }
+                }
+            }
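+            // Strategy note (added): MetadataStrategyType.ALL re-syncs everything and prunes
+            // metadata whose table/column has disappeared from the source; any other strategy
+            // only inserts or updates what the current scan returns and never deletes.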
+            // collect the metadata of each table
+            for (TableDescription table : tables) {
+                if (currentThread.isInterrupted()) {
+                    interrupt(collectRecord, realTimeLog);
+                    return;
+                }
+                realTimeLog.append(DateUtils.formatDateTime(new Date())).append(" ").append(String.format("Start to collect table [%s] info...\r\n", table.getTableName()));
+                updateRealTimeLog(collectRecord, realTimeLog);
+                // build the table metadata
+                DataGovernanceMetadataDto tableMetadata = new DataGovernanceMetadataDto();
+                tableMetadata.setProjectId(projectId);
+                tableMetadata.setCollectTaskId(collectTaskId);
+                tableMetadata.setParentId(databaseMetadata.getId());
+                tableMetadata.setName(StringUtil.isNotBlank(table.getRemarks()) ? table.getRemarks() : table.getTableName());
+                tableMetadata.setPath(databaseMetadata.getPath() + "/" + tableMetadata.getName());
+                tableMetadata.setCode(table.getTableName());
+                // mark the node as table type
+                tableMetadata.setMetamodelId(BuiltInMetamodel.TABLE.getId());
+                tableMetadata.setIcon(BuiltInMetamodel.TABLE.getIcon());
+                tableMetadata.setIfLeaf(0);
+                tableMetadata.setDbType(dbType);
+                tableMetadata.setDatasourceId(databaseDto.getId());
+                // reuse the id if metadata of this table already exists
+                DataGovernanceMetadataDto finalTableMetadata = tableMetadata;
+                DataGovernanceMetadataDto dbTableMetadata = dbTables.stream().filter(item -> item.getCode().equals(finalTableMetadata.getCode())).findFirst().orElse(null);
+                if (dbTableMetadata != null) {
+                    tableMetadata.setId(dbTableMetadata.getId());
+                }
+                tableMetadata = dataMetadataCollectApi.addOrUpdateMetadata(tableMetadata);
+                // add the table properties
+                addOrUpdateProperty(tableMetadata, BuiltInMetamodelProperty.buildProerty(BuiltInMetamodelProperty.TABLE_COMMENT, projectId, tableMetadata.getId(), table.getRemarks()));
+                addOrUpdateProperty(tableMetadata, BuiltInMetamodelProperty.buildProerty(BuiltInMetamodelProperty.TABLE_SPACE, projectId, tableMetadata.getId(), null));
+                // query the columns and primary keys of the table
+                List<ColumnDescription> columns = metaDataService.queryTableColumnMeta(databaseDto.getJdbcUrl(), databaseDto.getUserName(), databaseDto.getPassword(), ProductTypeEnum.ORACLE.equals(productTypeEnum) ? databaseDto.getUserName() : databaseDto.getDatabaseName(), table.getTableName());
+                List<String> pks = metaDataService.queryTablePrimaryKeys(databaseDto.getJdbcUrl(), databaseDto.getUserName(), databaseDto.getPassword(), ProductTypeEnum.ORACLE.equals(productTypeEnum) ? databaseDto.getUserName() : databaseDto.getDatabaseName(), table.getTableName());
+
+                List<DataGovernanceMetadataDto> dbColumns = dataMetadataCollectApi.listParentIdAndDatasourceId(tableMetadata.getId(), databaseDto.getId(), BuiltInMetamodel.COLUMN.getId()).getData();
+                // full collection
+                if (MetadataStrategyType.ALL.getValue().equals(strategy)) {
+                    // delete stored columns that no longer exist in the source
+                    for (DataGovernanceMetadataDto dbColumn : dbColumns) {
+                        if (columns.stream().noneMatch(item -> item.getFieldName().equals(dbColumn.getCode()))) {
+                            dataMetadataCollectApi.deleteMetadata(dbColumn.getId());
+                        }
+                    }
+                }
+                for (ColumnDescription column : columns) {
+                    if (pks.contains(column.getFieldName())) {
+                        column.setPk(true);
+                    }
+                    DataGovernanceMetadataDto columnMetadata = new DataGovernanceMetadataDto();
+                    columnMetadata.setProjectId(projectId);
+                    columnMetadata.setCollectTaskId(collectTaskId);
+                    columnMetadata.setParentId(tableMetadata.getId());
+                    columnMetadata.setName(StringUtil.isNotBlank(column.getRemarks()) ? column.getRemarks() : column.getFieldName());
+                    columnMetadata.setPath(tableMetadata.getPath() + "/" + columnMetadata.getName());
+                    columnMetadata.setCode(column.getFieldName());
+                    // mark the node as column type
+                    columnMetadata.setMetamodelId(BuiltInMetamodel.COLUMN.getId());
+                    columnMetadata.setIcon(BuiltInMetamodel.COLUMN.getIcon());
+                    columnMetadata.setIfLeaf(0);
+                    columnMetadata.setDbType(dbType);
+                    columnMetadata.setDatasourceId(databaseDto.getId());
+                    // reuse the id if metadata of this column already exists
+                    DataGovernanceMetadataDto finalColumnMetadata = columnMetadata;
+                    DataGovernanceMetadataDto dbColumnMetadata = dbColumns.stream().filter(item -> item.getCode().equals(finalColumnMetadata.getCode())).findFirst().orElse(null);
+                    //DataGovernanceMetadataDto dbColumnMetadata = dataMetadataApi.getByParentIdAndOtherInfo(columnMetadata.getParentId(), columnMetadata.getDatasourceId(), columnMetadata.getCode(), columnMetadata.getMetamodelId()).getData();
+                    if (dbColumnMetadata != null) {
+                        columnMetadata.setId(dbColumnMetadata.getId());
+                    }
+                    columnMetadata = dataMetadataCollectApi.addOrUpdateMetadata(columnMetadata);
+                    // add the column properties
+                    addOrUpdateProperty(columnMetadata, BuiltInMetamodelProperty.buildProerty(BuiltInMetamodelProperty.COLUMN_COMMENT, projectId, columnMetadata.getId(), column.getRemarks()));
+                    addOrUpdateProperty(columnMetadata, BuiltInMetamodelProperty.buildProerty(BuiltInMetamodelProperty.COLUMN_DATA_TYPE, projectId, columnMetadata.getId(), column.getFieldTypeName()));
+                    addOrUpdateProperty(columnMetadata, BuiltInMetamodelProperty.buildProerty(BuiltInMetamodelProperty.COLUMN_DATA_LENGTH, projectId, columnMetadata.getId(), String.valueOf(column.getDisplaySize())));
+                    addOrUpdateProperty(columnMetadata, BuiltInMetamodelProperty.buildProerty(BuiltInMetamodelProperty.COLUMN_DATA_PRECISION, projectId, columnMetadata.getId(), String.valueOf(column.getPrecisionSize())));
+                    addOrUpdateProperty(columnMetadata, BuiltInMetamodelProperty.buildProerty(BuiltInMetamodelProperty.COLUMN_DATA_SCALE, projectId, columnMetadata.getId(), String.valueOf(column.getScaleSize())));
+                    addOrUpdateProperty(columnMetadata, BuiltInMetamodelProperty.buildProerty(BuiltInMetamodelProperty.COLUMN_COL_KEY, projectId, columnMetadata.getId(), column.isPk() ? "是" : "否"));
+                    addOrUpdateProperty(columnMetadata, BuiltInMetamodelProperty.buildProerty(BuiltInMetamodelProperty.COLUMN_UNI_KEY, projectId, columnMetadata.getId(), column.isNonIndexUnique() ? "否" : "是"));
+                    addOrUpdateProperty(columnMetadata, BuiltInMetamodelProperty.buildProerty(BuiltInMetamodelProperty.COLUMN_NULLABLE, projectId, columnMetadata.getId(), column.isNullable() ? "是" : "否"));
+                    addOrUpdateProperty(columnMetadata, BuiltInMetamodelProperty.buildProerty(BuiltInMetamodelProperty.COLUMN_AUTO_INCREMENT, projectId, columnMetadata.getId(), column.isAutoIncrement() ? "是" : "否"));
"是" : "否")); + addOrUpdateProperty(columnMetadata, BuiltInMetamodelProperty.buildProerty(BuiltInMetamodelProperty.COLUMN_DATA_DEFAULT, projectId, columnMetadata.getId(), column.getDefaultValue())); + } + realTimeLog.append(DateUtils.formatDateTime(new Date())).append(" ").append(String.format("Collect table [%s] info succeed\r\n", table.getTableName())); + updateRealTimeLog(collectRecord, realTimeLog); + } + realTimeLog.append(DateUtils.formatDateTime(new Date())).append(" ").append("Collect database info succeed\r\n"); + realTimeLog.append(DateUtils.formatDateTime(new Date())).append(" ").append("All metadata collect succeed\r\n"); + collectRecord.setEndTime(new Date()); + collectRecord.setStatus(MetadataCollectRunStatus.SUCCESS.getCode()); + collectRecord.setRealTimeLog(realTimeLog.toString()); + dataMetadataCollectApi.upCollectRecord(collectRecord); + log.info("metadata collect task success end"); + } catch (Exception e) { + realTimeLog.append(LogUtil.getError(e)); + collectRecord.setStatus(MetadataCollectRunStatus.FAILED.getCode()); + collectRecord.setRealTimeLog(realTimeLog.toString()); + collectRecord.setErrorLog(LogUtil.getError(e)); + collectRecord.setEndTime(new Date()); + dataMetadataCollectApi.upCollectRecord(collectRecord); + log.info("metadata collect task failed end"); + } + } + + private void interrupt(DataGovernanceMetadataCollectRecordDto collectRecord, StringBuilder realTimeLog) { + collectRecord.setStatus(MetadataCollectRunStatus.FAILED.getCode()); + realTimeLog.append(DateUtils.formatDateTime(new Date())).append(" ").append("The collect task has been canceled when is running\r\n"); + collectRecord.setRealTimeLog(realTimeLog.toString()); + collectRecord.setErrorLog(DateUtils.formatDateTime(new Date()) + " The collect task has been canceled when is running"); + collectRecord.setEndTime(new Date()); + dataMetadataCollectApi.upCollectRecord(collectRecord); + } + + /** + * 添加或删除属性 + */ + private void addOrUpdateProperty(DataGovernanceMetadataDto metadataDto, DataGovernanceMetadataPropertyDto metadataPropertyDto) { + DataGovernanceMetadataPropertyDto dbProperty = dataMetadataCollectApi.getByPropertyIdAndMetadataId(metadataPropertyDto.getMetamodelPropertyId(), metadataDto.getId()).getData(); + if (dbProperty != null) { + metadataPropertyDto.setId(dbProperty.getId()); + } + dataMetadataCollectApi.addOrUpdateMetadataProperty(metadataPropertyDto); + } + + /** + * 更新日志 + * + * @param collectRecord + * @param realTimeLog + */ + private void updateRealTimeLog(DataGovernanceMetadataCollectRecordDto collectRecord, StringBuilder realTimeLog) { + collectRecord.setRealTimeLog(realTimeLog.toString()); + dataMetadataCollectApi.upCollectRecord(collectRecord); + } + + +} + + diff --git a/srt-cloud-module/srt-cloud-quartz/src/main/java/net/srt/quartz/task/DataProductionScheduleTask.java b/srt-cloud-module/srt-cloud-quartz/src/main/java/net/srt/quartz/task/DataProductionScheduleTask.java new file mode 100644 index 0000000..f7fef81 --- /dev/null +++ b/srt-cloud-module/srt-cloud-quartz/src/main/java/net/srt/quartz/task/DataProductionScheduleTask.java @@ -0,0 +1,33 @@ +package net.srt.quartz.task; + +import cn.hutool.core.thread.ThreadUtil; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import net.srt.api.module.data.development.DataProductionScheduleApi; +import org.springframework.stereotype.Component; + +/** + * @ClassName DataProductionScheduleTask + * @Author zrx + * @Date 2023/1/19 14:39 + */ +@Slf4j +@Component +@RequiredArgsConstructor +public class 
+public class DataProductionScheduleTask {
+    private final DataProductionScheduleApi dataProductionScheduleApi;
+
+    public void run(String id, Thread currentThread) {
+        log.info("start run data-production-schedule");
+        String recordId = dataProductionScheduleApi.scheduleRun(Long.parseLong(id)).getData();
+        // poll the schedule status every five seconds until the run record completes
+        do {
+            ThreadUtil.sleep(5000);
+            /*if (currentThread.isInterrupted()) {
+                return;
+            }*/
+        } while (!dataProductionScheduleApi.scheduleComplete(Integer.parseInt(recordId)).getData());
+        log.info("run data-production-schedule success");
+    }
+
+}
diff --git a/srt-cloud-module/srt-cloud-quartz/src/main/java/net/srt/quartz/task/DataQualityTask.java b/srt-cloud-module/srt-cloud-quartz/src/main/java/net/srt/quartz/task/DataQualityTask.java
new file mode 100644
index 0000000..2052e73
--- /dev/null
+++ b/srt-cloud-module/srt-cloud-quartz/src/main/java/net/srt/quartz/task/DataQualityTask.java
@@ -0,0 +1,128 @@
+package net.srt.quartz.task;
+
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import net.srt.api.module.data.governance.DataMetadataApi;
+import net.srt.api.module.data.governance.DataQualityApi;
+import net.srt.api.module.data.governance.constant.DbType;
+import net.srt.api.module.data.governance.dto.DataGovernanceMetadataDto;
+import net.srt.api.module.data.governance.dto.DataGovernanceQualityConfigDto;
+import net.srt.api.module.data.governance.dto.DataGovernanceQualityTaskDto;
+import net.srt.api.module.data.governance.dto.quality.QualityCheck;
+import net.srt.api.module.data.governance.dto.quality.QulaityColumn;
+import net.srt.api.module.data.integrate.DataDatabaseApi;
+import net.srt.api.module.data.integrate.constant.CommonRunStatus;
+import net.srt.api.module.data.integrate.dto.DataDatabaseDto;
+import net.srt.flink.common.utils.LogUtil;
+import net.srt.framework.common.cache.bean.DataProjectCacheBean;
+import net.srt.framework.common.utils.DateUtils;
+import net.srt.framework.security.cache.TokenStoreCache;
+import net.srt.quartz.task.quality.AbstractQualityAdapter;
+import net.srt.quartz.task.quality.QualityFactory;
+import org.springframework.stereotype.Component;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+/**
+ * @ClassName DataQualityTask
+ * @Author zrx
+ * @Date 2023/1/19 14:39
+ */
+@Slf4j
+@Component
+@RequiredArgsConstructor
+public class DataQualityTask {
+
+    private final DataQualityApi dataQualityApi;
+    private final DataMetadataApi dataMetadataApi;
+    private final DataDatabaseApi dataDatabaseApi;
+    private final TokenStoreCache tokenStoreCache;
+
+    public void run(String id, Thread currentThread) {
+
+        log.info("DataQualityTask run start");
+
+        Long configId = Long.parseLong(id);
+        DataGovernanceQualityConfigDto configDto = dataQualityApi.getById(configId).getData();
+        Long projectId = configDto.getProjectId();
+        List<Integer> metadataIds = configDto.getMetadataIds();
+        Integer ruleId = configDto.getRuleId();
+        List<QualityCheck> qualityChecks = new ArrayList<>();
+
+        // create the quality-check task record
+        DataGovernanceQualityTaskDto qualityTaskDto = new DataGovernanceQualityTaskDto();
+        try {
+            qualityTaskDto.setQualityConfigId(configId);
+            qualityTaskDto.setName(configDto.getName() + " - " + DateUtils.formatDateTime(new Date()));
+            qualityTaskDto.setStatus(CommonRunStatus.RUNNING.getCode());
+            qualityTaskDto.setStartTime(new Date());
+            qualityTaskDto.setProjectId(configDto.getProjectId());
+            qualityTaskDto = dataQualityApi.addQualityTask(qualityTaskDto).getData();
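+            // Grouping note (added): each configured metadata id points at a column; the loop
+            // below resolves its parent table and groups columns of the same table into one
+            // QualityCheck (QualityCheck.equals is assumed to compare the table/connection
+            // fields), so every table is scanned only once.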
+            for (Integer metadataId : metadataIds) {
+                DataGovernanceMetadataDto columnMeta = dataMetadataApi.getById(metadataId).getData();
+                DataGovernanceMetadataDto tableMeta = dataMetadataApi.getById(columnMeta.getParentId().intValue()).getData();
+                QualityCheck qualityCheck = new QualityCheck();
+                qualityCheck.setRuleId(ruleId);
+                qualityCheck.setParam(configDto.getParam());
+                qualityCheck.setTableMetadataId(tableMeta.getId());
+                qualityCheck.setTableName(tableMeta.getCode());
+                QulaityColumn qulaityColumn = new QulaityColumn(columnMeta.getId().intValue(), columnMeta.getCode());
+                if (DbType.MIDDLE_DB.getValue().equals(tableMeta.getDbType())) {
+                    DataProjectCacheBean project = tokenStoreCache.getProject(projectId);
+                    qualityCheck.setDatabaseName(project.getDbName());
+                    qualityCheck.setJdbcUrl(project.getDbUrl());
+                    qualityCheck.setUserName(project.getDbUsername());
+                    qualityCheck.setPassword(project.getDbPassword());
+                    qualityCheck.setDatabaseType(project.getDbType());
+                } else {
+                    DataDatabaseDto database = dataDatabaseApi.getById(tableMeta.getDatasourceId()).getData();
+                    qualityCheck.setDatabaseName(database.getDatabaseName());
+                    qualityCheck.setJdbcUrl(database.getJdbcUrl());
+                    qualityCheck.setUserName(database.getUserName());
+                    qualityCheck.setPassword(database.getPassword());
+                    qualityCheck.setDatabaseType(database.getDatabaseType());
+                }
+                if (qualityChecks.contains(qualityCheck)) {
+                    qualityCheck = qualityChecks.get(qualityChecks.indexOf(qualityCheck));
+                    qualityCheck.getQulaityColumns().add(qulaityColumn);
+                } else {
+                    qualityChecks.add(qualityCheck);
+                    qualityCheck.getQulaityColumns().add(qulaityColumn);
+                }
+            }
+
+            // run the quality check table by table
+            for (QualityCheck qualityCheck : qualityChecks) {
+                if (currentThread.isInterrupted()) {
+                    log.error("DataQualityTask run interrupted");
+                    qualityTaskDto.setEndTime(new Date());
+                    qualityTaskDto.setStatus(CommonRunStatus.FAILED.getCode());
+                    qualityTaskDto.setErrorLog("DataQualityTask run interrupted");
+                    dataQualityApi.updateQualityTask(qualityTaskDto);
+                    return;
+                }
+                // get the matching quality-check adapter
+                AbstractQualityAdapter qualityAdapter = QualityFactory.createQualityAdapter(qualityCheck);
+                qualityAdapter.prepare(dataQualityApi, qualityTaskDto);
+                qualityAdapter.check();
+            }
+
+            qualityTaskDto.setEndTime(new Date());
+            qualityTaskDto.setStatus(CommonRunStatus.SUCCESS.getCode());
+            dataQualityApi.updateQualityTask(qualityTaskDto);
+            log.info("DataQualityTask run end");
+
+        } catch (Exception e) {
+            log.error("DataQualityTask run error", e);
+            qualityTaskDto.setEndTime(new Date());
+            qualityTaskDto.setStatus(CommonRunStatus.FAILED.getCode());
+            qualityTaskDto.setErrorLog(LogUtil.getError(e));
+            dataQualityApi.updateQualityTask(qualityTaskDto);
+        }
+
+    }
+
+}
diff --git a/srt-cloud-module/srt-cloud-quartz/src/main/java/net/srt/quartz/task/quality/AbstractQualityAdapter.java b/srt-cloud-module/srt-cloud-quartz/src/main/java/net/srt/quartz/task/quality/AbstractQualityAdapter.java
new file mode 100644
index 0000000..8829982
--- /dev/null
+++ b/srt-cloud-module/srt-cloud-quartz/src/main/java/net/srt/quartz/task/quality/AbstractQualityAdapter.java
@@ -0,0 +1,150 @@
+// Copyright tang. All rights reserved.
+// https://gitee.com/inrgihc/dbswitch
+//
+// Use of this source code is governed by a BSD-style license
+//
+// Author: tang (inrgihc@126.com)
+// Date : 2020/1/2
+// Location: beijing , china
+/////////////////////////////////////////////////////////////
+package net.srt.quartz.task.quality;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.zaxxer.hikari.HikariDataSource;
+import lombok.extern.slf4j.Slf4j;
+import net.srt.api.module.data.governance.DataQualityApi;
+import net.srt.api.module.data.governance.dto.DataGovernanceQualityTaskColumnDto;
+import net.srt.api.module.data.governance.dto.DataGovernanceQualityTaskDto;
+import net.srt.api.module.data.governance.dto.DataGovernanceQualityTaskTableDto;
+import net.srt.api.module.data.governance.dto.quality.QualityCheck;
+import net.srt.api.module.data.governance.dto.quality.QulaityColumn;
+import net.srt.api.module.data.integrate.constant.CommonRunStatus;
+import srt.cloud.framework.dbswitch.common.type.ProductTypeEnum;
+import srt.cloud.framework.dbswitch.common.util.DbswitchStrUtils;
+import srt.cloud.framework.dbswitch.common.util.SingletonObject;
+import srt.cloud.framework.dbswitch.common.util.TypeConvertUtils;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Abstract base class for quality-check adapters
+ *
+ * @author jrl
+ */
+@Slf4j
+public abstract class AbstractQualityAdapter {
+
+    protected QualityCheck qualityCheck;
+    protected DataQualityApi dataQualityApi;
+    protected DataGovernanceQualityTaskDto qualityTaskDto;
+    protected String regx;
+
+    public AbstractQualityAdapter(QualityCheck qualityCheck) {
+        this.qualityCheck = qualityCheck;
+    }
+
+    public void prepare(DataQualityApi dataQualityApi, DataGovernanceQualityTaskDto qualityTaskDto) {
+        this.dataQualityApi = dataQualityApi;
+        this.qualityTaskDto = qualityTaskDto;
+    }
+
+    public void setRegx(String regx) {
+        this.regx = regx;
+    }
+
+    public static HikariDataSource createDataSource(QualityCheck qualityCheck) {
+        ProductTypeEnum productTypeEnum = ProductTypeEnum.getByIndex(qualityCheck.getDatabaseType());
+        HikariDataSource ds = new HikariDataSource();
+        ds.setJdbcUrl(qualityCheck.getJdbcUrl());
+        ds.setDriverClassName(productTypeEnum.getDriveClassName());
+        ds.setUsername(qualityCheck.getUserName());
+        ds.setPassword(qualityCheck.getPassword());
+        if (ds.getDriverClassName().contains("oracle")) {
+            ds.setConnectionTestQuery("SELECT 'Hello' from DUAL");
+            // https://blog.csdn.net/qq_20960159/article/details/78593936
+            System.getProperties().setProperty("oracle.jdbc.J2EE13Compliant", "true");
+        } else if (ds.getDriverClassName().contains("db2")) {
+            ds.setConnectionTestQuery("SELECT 1 FROM SYSIBM.SYSDUMMY1");
+        } else {
+            ds.setConnectionTestQuery("SELECT 1");
+        }
+        ds.setMaximumPoolSize(8);
+        ds.setMinimumIdle(5);
+        ds.setMaxLifetime(TimeUnit.MINUTES.toMillis(60));
+        ds.setConnectionTimeout(TimeUnit.MINUTES.toMillis(60));
+        ds.setIdleTimeout(60000);
+
+        return ds;
+    }
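+
+    // Pool note (added): each check builds its own small Hikari pool (max 8 connections,
+    // 60-minute max lifetime and connection-acquisition timeout). HikariDataSource is
+    // Closeable, so a caller could scope it with try-with-resources, e.g.
+    //   try (HikariDataSource ds = createDataSource(qualityCheck)) { /* query */ }
+    // As written, the adapters hand the pool to DatabaseOperatorFactory without closing it.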
+
+    protected DataGovernanceQualityTaskTableDto addTaskTable(List<QulaityColumn> columns) {
+        DataGovernanceQualityTaskTableDto qualityTaskTableDto = new DataGovernanceQualityTaskTableDto();
+        qualityTaskTableDto.setQualityTaskId(qualityTaskDto.getId());
+        qualityTaskTableDto.setTableMetadataId(qualityCheck.getTableMetadataId());
+        qualityTaskTableDto.setTableName(qualityCheck.getTableName());
+        qualityTaskTableDto.setStatus(CommonRunStatus.RUNNING.getCode());
+        qualityTaskTableDto.setColumnInfo(columns);
+        qualityTaskTableDto.setProjectId(qualityTaskDto.getProjectId());
+        qualityTaskTableDto.setCheckTime(new Date());
+        qualityTaskTableDto.setStartTime(new Date());
+        qualityTaskTableDto = dataQualityApi.addTaskTable(qualityTaskTableDto).getData();
+        return qualityTaskTableDto;
+    }
+
+    protected Map<String, Object> buildRowMap(List<String> columns, ResultSet rs) throws SQLException {
+        Map<String, Object> map = new HashMap<>();
+        // convert each column value to a loggable representation
+        for (int i = 1; i <= columns.size(); i++) {
+            Object value = rs.getObject(i);
+            String key = columns.get(i - 1);
+            if (value instanceof byte[]) {
+                map.put(key, DbswitchStrUtils.toHexString((byte[]) value));
+            } else if (value instanceof java.sql.Clob) {
+                map.put(key, TypeConvertUtils.castToString(value));
+            } else if (value instanceof java.sql.Blob) {
+                map.put(key, DbswitchStrUtils.toHexString(TypeConvertUtils.castToByteArray(value)));
+            } else {
+                map.put(key, null == value ? null : value.toString());
+            }
+        }
+        return map;
+    }
+
+    protected DataGovernanceQualityTaskColumnDto buildTaskColumn(DataGovernanceQualityTaskTableDto taskTable, Map<String, Object> map, boolean pass, String notPassInfo) throws JsonProcessingException {
+        DataGovernanceQualityTaskColumnDto columnDto = new DataGovernanceQualityTaskColumnDto();
+        columnDto.setQualityTaskId(taskTable.getQualityTaskId());
+        columnDto.setQualityTaskTableId(taskTable.getId());
+        columnDto.setCheckRow(SingletonObject.OBJECT_MAPPER.writeValueAsString(map));
+        columnDto.setCheckTime(new Date());
+        columnDto.setProjectId(taskTable.getProjectId());
+        qualityTaskDto.setCheckCount(qualityTaskDto.getCheckCount() + 1);
+        taskTable.setCheckCount(taskTable.getCheckCount() + 1);
+        if (pass) {
+            qualityTaskDto.setPassCount(qualityTaskDto.getPassCount() + 1);
+            taskTable.setPassCount(taskTable.getPassCount() + 1);
+            columnDto.setCheckResult(1);
+        } else {
+            qualityTaskDto.setNotPassCount(qualityTaskDto.getNotPassCount() + 1);
+            columnDto.setNotPassInfo(notPassInfo);
+            taskTable.setNotPassCount(taskTable.getNotPassCount() + 1);
+            columnDto.setCheckResult(0);
+        }
+        return columnDto;
+    }
+
+    protected void updateTask(DataGovernanceQualityTaskTableDto taskTable, List<DataGovernanceQualityTaskColumnDto> columnDtos) {
+        dataQualityApi.updateQualityTask(qualityTaskDto);
+        dataQualityApi.updateQualityTaskTable(taskTable);
+        // persist the per-row check records of this batch
+        dataQualityApi.addQualityTaskColumns(columnDtos);
+        columnDtos.clear();
+    }
+
+    public abstract void check();
+}
diff --git a/srt-cloud-module/srt-cloud-quartz/src/main/java/net/srt/quartz/task/quality/QualityFactory.java b/srt-cloud-module/srt-cloud-quartz/src/main/java/net/srt/quartz/task/quality/QualityFactory.java
new file mode 100644
index 0000000..f4d9b27
--- /dev/null
+++ b/srt-cloud-module/srt-cloud-quartz/src/main/java/net/srt/quartz/task/quality/QualityFactory.java
@@ -0,0 +1,79 @@
+// Copyright tang. All rights reserved.
+// https://gitee.com/inrgihc/dbswitch
+//
+// Use of this source code is governed by a BSD-style license
+//
+// Author: tang (inrgihc@126.com)
+// Date : 2020/1/2
+// Location: beijing , china
+/////////////////////////////////////////////////////////////
+package net.srt.quartz.task.quality;
+
+import net.srt.api.module.data.governance.constant.BuiltInQualityRule;
+import net.srt.api.module.data.governance.dto.quality.QualityCheck;
+import net.srt.quartz.task.quality.impl.DateFormatImpl;
+import net.srt.quartz.task.quality.impl.RegxImpl;
+import net.srt.quartz.task.quality.impl.UniquenessImpl;
+import srt.cloud.framework.dbswitch.common.util.StringUtil;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Function;
+
+/**
+ * Factory that builds the quality-check adapter for a rule
+ *
+ * @author jrl
+ */
+public class QualityFactory {
+
+    private static final Map<BuiltInQualityRule, Function<QualityCheck, AbstractQualityAdapter>> QUALITY_RULE_MAPPER
+            = new HashMap<BuiltInQualityRule, Function<QualityCheck, AbstractQualityAdapter>>() {
+
+        private static final long serialVersionUID = 3365136872693503697L;
+
+        {
+            put(BuiltInQualityRule.UNIQUENESS, UniquenessImpl::new);
+            put(BuiltInQualityRule.PHONE_NUMBER, RegxImpl::new);
+            put(BuiltInQualityRule.ID_CARD, RegxImpl::new);
+            put(BuiltInQualityRule.MAIL, RegxImpl::new);
+            put(BuiltInQualityRule.NUMBER_FORMAT, RegxImpl::new);
+            put(BuiltInQualityRule.LENGTH_CHECK, RegxImpl::new);
+            put(BuiltInQualityRule.NON_NULL_CHECK, RegxImpl::new);
+            put(BuiltInQualityRule.DATE_FORMAT, DateFormatImpl::new);
+        }
+    };
+
+
+    public static AbstractQualityAdapter createQualityAdapter(QualityCheck qualityCheck) {
+        Integer ruleId = qualityCheck.getRuleId();
+        BuiltInQualityRule rule = BuiltInQualityRule.getById(ruleId);
+        if (rule == null) {
+            throw new RuntimeException(
+                    String.format("Unsupported ruleId (%s)", ruleId));
+        }
+        if (!QUALITY_RULE_MAPPER.containsKey(rule)) {
+            throw new RuntimeException(
+                    String.format("Unsupported rule type (%s)", rule));
+        }
+
+        AbstractQualityAdapter qualityAdapter = QUALITY_RULE_MAPPER.get(rule).apply(qualityCheck);
+        // regex-based rules share RegxImpl and only differ in the pattern injected here
+        if (qualityAdapter instanceof RegxImpl) {
+            if (BuiltInQualityRule.PHONE_NUMBER.equals(rule)) {
+                qualityAdapter.setRegx(StringUtil.REGEX_PHONE);
+            } else if (BuiltInQualityRule.ID_CARD.equals(rule)) {
+                qualityAdapter.setRegx(StringUtil.REGEX_IDCARD);
+            } else if (BuiltInQualityRule.MAIL.equals(rule)) {
+                qualityAdapter.setRegx(StringUtil.REGEX_EMAIL);
+            } else if (BuiltInQualityRule.NON_NULL_CHECK.equals(rule)) {
+                qualityAdapter.setRegx(StringUtil.NOT_BLANK);
+            } else if (BuiltInQualityRule.NUMBER_FORMAT.equals(rule)) {
+                qualityAdapter.setRegx(StringUtil.REGEX_NUMBER);
+            } else if (BuiltInQualityRule.LENGTH_CHECK.equals(rule)) {
+                qualityAdapter.setRegx(String.format(".{%s}", qualityCheck.getParam().getColumnLength()));
+            }
+        }
+        return qualityAdapter;
+    }
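+
+    // Example (added for illustration): a LENGTH_CHECK configured with columnLength 11
+    // yields the pattern ".{11}"; since String.matches anchors the whole input, values
+    // pass only when they are exactly 11 characters long.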
+
+}
diff --git a/srt-cloud-module/srt-cloud-quartz/src/main/java/net/srt/quartz/task/quality/impl/DateFormatImpl.java b/srt-cloud-module/srt-cloud-quartz/src/main/java/net/srt/quartz/task/quality/impl/DateFormatImpl.java
new file mode 100644
index 0000000..c3c86fa
--- /dev/null
+++ b/srt-cloud-module/srt-cloud-quartz/src/main/java/net/srt/quartz/task/quality/impl/DateFormatImpl.java
@@ -0,0 +1,108 @@
+package net.srt.quartz.task.quality.impl;
+
+import lombok.extern.slf4j.Slf4j;
+import net.srt.api.module.data.governance.dto.DataGovernanceQualityTaskColumnDto;
+import net.srt.api.module.data.governance.dto.DataGovernanceQualityTaskTableDto;
+import net.srt.api.module.data.governance.dto.quality.QualityCheck;
+import net.srt.api.module.data.governance.dto.quality.QulaityColumn;
+import net.srt.api.module.data.integrate.constant.CommonRunStatus;
+import net.srt.flink.common.utils.LogUtil;
+import net.srt.quartz.task.quality.AbstractQualityAdapter;
+import srt.cloud.framework.dbswitch.common.type.ProductTypeEnum;
+import srt.cloud.framework.dbswitch.common.util.StringUtil;
+import srt.cloud.framework.dbswitch.dbcommon.database.DatabaseOperatorFactory;
+import srt.cloud.framework.dbswitch.dbcommon.database.IDatabaseOperator;
+import srt.cloud.framework.dbswitch.dbcommon.domain.StatementResultSet;
+
+import java.sql.ResultSet;
+import java.time.LocalDateTime;
+import java.util.*;
+import java.util.stream.Collectors;
+
+/**
+ * @ClassName DateFormatImpl
+ * @Author zrx
+ * @Date 2023/6/24 13:13
+ */
+@Slf4j
+public class DateFormatImpl extends AbstractQualityAdapter {
+
+    private final static Integer BATCH_SIZE = 5000;
+
+    public DateFormatImpl(QualityCheck qualityCheck) {
+        super(qualityCheck);
+    }
+
+    @Override
+    public void check() {
+        log.info(String.format("DateFormatImpl start check,jdbcUrl:%s,tableName:%s", qualityCheck.getJdbcUrl(), qualityCheck.getTableName()));
+        // connect to the database and check the configured columns
+        ProductTypeEnum productTypeEnum = ProductTypeEnum.getByIndex(qualityCheck.getDatabaseType());
+        List<QulaityColumn> qulaityColumns = qualityCheck.getQulaityColumns();
+        addTaskColumns(productTypeEnum, qulaityColumns);
+
+    }
+
+    private void addTaskColumns(ProductTypeEnum productTypeEnum, List<QulaityColumn> columnInfos) {
+        DataGovernanceQualityTaskTableDto taskTable = addTaskTable(columnInfos);
+        try {
+            List<String> columns = columnInfos.stream().map(QulaityColumn::getColumnName).collect(Collectors.toList());
+            IDatabaseOperator sourceOperator = DatabaseOperatorFactory
+                    .createDatabaseOperator(createDataSource(qualityCheck), productTypeEnum);
+            sourceOperator.setFetchSize(BATCH_SIZE);
+            try (StatementResultSet srs = sourceOperator.queryTableData(ProductTypeEnum.ORACLE.equals(productTypeEnum) ? qualityCheck.getUserName() : qualityCheck.getDatabaseName(), qualityCheck.getTableName(), columns); ResultSet rs = srs.getResultset()) {
+                int size = 0;
+                List<DataGovernanceQualityTaskColumnDto> columnDtos = new ArrayList<>();
+                while (rs.next()) {
+                    size++;
+                    Map<String, Object> map = new HashMap<>();
+                    // keep the raw values so date/time types can be recognized below
+                    for (int i = 1; i <= columns.size(); i++) {
+                        Object value = rs.getObject(i);
+                        String key = columns.get(i - 1);
+                        map.put(key, value);
+                    }
+                    // check each value against the date-format pattern
+                    StringBuilder notPassInfo = new StringBuilder();
+                    boolean pass = true;
+                    for (Map.Entry<String, Object> entry : map.entrySet()) {
+                        String key = entry.getKey();
+                        Object value = entry.getValue();
+                        // values already typed as SQL date/time (or LocalDateTime) pass implicitly
+                        if (!(value instanceof java.sql.Time || value instanceof java.sql.Timestamp || value instanceof java.sql.Date || value instanceof LocalDateTime)) {
+                            String vStr = String.valueOf(value);
+                            if (!vStr.matches(StringUtil.DATE_FORAMT)) {
+                                pass = false;
+                                notPassInfo.append(String.format("[%s] field value is not compliant;", key));
+                            }
+                        }
+                    }
+                    // build the per-row check record
+                    DataGovernanceQualityTaskColumnDto columnDto = buildTaskColumn(taskTable, map, pass, notPassInfo.length() > 0 ? notPassInfo.deleteCharAt(notPassInfo.length() - 1).toString() : null);
+                    columnDtos.add(columnDto);
+                    // flush every 5000 rows
+                    if (size % BATCH_SIZE == 0) {
+                        // update the task and table-task counters
+                        updateTask(taskTable, columnDtos);
+                    }
+                }
+                // flush the remaining rows
+                if (!columnDtos.isEmpty()) {
+                    updateTask(taskTable, columnDtos);
+                }
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+            taskTable.setEndTime(new Date());
+            taskTable.setStatus(CommonRunStatus.SUCCESS.getCode());
+            dataQualityApi.updateQualityTaskTable(taskTable);
+        } catch (Exception e) {
+            taskTable.setEndTime(new Date());
+            taskTable.setErrorLog(LogUtil.getError(e));
+            taskTable.setStatus(CommonRunStatus.FAILED.getCode());
+            dataQualityApi.updateQualityTaskTable(taskTable);
+            throw new RuntimeException(e);
+        }
+    }
+
+
+}
diff --git a/srt-cloud-module/srt-cloud-quartz/src/main/java/net/srt/quartz/task/quality/impl/RegxImpl.java b/srt-cloud-module/srt-cloud-quartz/src/main/java/net/srt/quartz/task/quality/impl/RegxImpl.java
new file mode 100644
index 0000000..9628ff5
--- /dev/null
+++ b/srt-cloud-module/srt-cloud-quartz/src/main/java/net/srt/quartz/task/quality/impl/RegxImpl.java
@@ -0,0 +1,106 @@
+package net.srt.quartz.task.quality.impl;
+
+import lombok.extern.slf4j.Slf4j;
+import net.srt.api.module.data.governance.dto.DataGovernanceQualityTaskColumnDto;
+import net.srt.api.module.data.governance.dto.DataGovernanceQualityTaskTableDto;
+import net.srt.api.module.data.governance.dto.quality.QualityCheck;
+import net.srt.api.module.data.governance.dto.quality.QulaityColumn;
+import net.srt.api.module.data.integrate.constant.CommonRunStatus;
+import net.srt.flink.common.utils.LogUtil;
+import net.srt.quartz.task.quality.AbstractQualityAdapter;
+import srt.cloud.framework.dbswitch.common.type.ProductTypeEnum;
+import srt.cloud.framework.dbswitch.dbcommon.database.DatabaseOperatorFactory;
+import srt.cloud.framework.dbswitch.dbcommon.database.IDatabaseOperator;
+import srt.cloud.framework.dbswitch.dbcommon.domain.StatementResultSet;
+
+import java.sql.ResultSet;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * @ClassName RegxImpl
+ * @Author zrx
+ * @Date 2023/6/24 13:13
+ */
+@Slf4j
+public class RegxImpl extends AbstractQualityAdapter {
+
+    private final static Integer BATCH_SIZE = 5000;
+
+    public RegxImpl(QualityCheck qualityCheck) {
+        super(qualityCheck);
+    }
+
+    @Override
+    public void check() {
+        log.info(String.format("RegxImpl start check,jdbcUrl:%s,tableName:%s", qualityCheck.getJdbcUrl(), qualityCheck.getTableName()));
+        // connect to the database and check the configured columns
+        ProductTypeEnum productTypeEnum = ProductTypeEnum.getByIndex(qualityCheck.getDatabaseType());
+        List<QulaityColumn> qulaityColumns = qualityCheck.getQulaityColumns();
+        addTaskColumns(productTypeEnum, qulaityColumns);
+
+    }
+
+    private void addTaskColumns(ProductTypeEnum productTypeEnum, List<QulaityColumn> columnInfos) {
+        DataGovernanceQualityTaskTableDto taskTable = addTaskTable(columnInfos);
+        try {
+            List<String> columns = columnInfos.stream().map(QulaityColumn::getColumnName).collect(Collectors.toList());
+            IDatabaseOperator sourceOperator = DatabaseOperatorFactory
+                    .createDatabaseOperator(createDataSource(qualityCheck), productTypeEnum);
+            sourceOperator.setFetchSize(BATCH_SIZE);
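+            // Schema note (added): for Oracle the owner/schema is the login user name, so the
+            // user name is passed where other engines take the database name (assumed
+            // convention of IDatabaseOperator.queryTableData).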
+            try (StatementResultSet srs = sourceOperator.queryTableData(ProductTypeEnum.ORACLE.equals(productTypeEnum) ? qualityCheck.getUserName() : qualityCheck.getDatabaseName(), qualityCheck.getTableName(), columns); ResultSet rs = srs.getResultset()) {
+                int size = 0;
+                List<DataGovernanceQualityTaskColumnDto> columnDtos = new ArrayList<>();
+                while (rs.next()) {
+                    size++;
+                    Map<String, Object> map = buildRowMap(columns, rs);
+                    // check each value against the injected regular expression
+                    StringBuilder notPassInfo = new StringBuilder();
+                    boolean pass = true;
+                    for (Map.Entry<String, Object> entry : map.entrySet()) {
+                        String key = entry.getKey();
+                        Object value = entry.getValue();
+                        if (value == null) {
+                            pass = false;
+                            notPassInfo.append(String.format("[%s] field value is not compliant;", key));
+                        } else {
+                            String vStr = String.valueOf(value);
+                            if (!vStr.matches(regx)) {
+                                pass = false;
+                                notPassInfo.append(String.format("[%s] field value is not compliant;", key));
+                            }
+                        }
+                    }
+                    // build the per-row check record
+                    DataGovernanceQualityTaskColumnDto columnDto = buildTaskColumn(taskTable, map, pass, notPassInfo.length() > 0 ? notPassInfo.deleteCharAt(notPassInfo.length() - 1).toString() : null);
+                    columnDtos.add(columnDto);
+                    // flush every 5000 rows
+                    if (size % BATCH_SIZE == 0) {
+                        // update the task and table-task counters
+                        updateTask(taskTable, columnDtos);
+                    }
+                }
+                // flush the remaining rows
+                if (!columnDtos.isEmpty()) {
+                    updateTask(taskTable, columnDtos);
+                }
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+            taskTable.setEndTime(new Date());
+            taskTable.setStatus(CommonRunStatus.SUCCESS.getCode());
+            dataQualityApi.updateQualityTaskTable(taskTable);
+        } catch (Exception e) {
+            taskTable.setEndTime(new Date());
+            taskTable.setErrorLog(LogUtil.getError(e));
+            taskTable.setStatus(CommonRunStatus.FAILED.getCode());
+            dataQualityApi.updateQualityTaskTable(taskTable);
+            throw new RuntimeException(e);
+        }
+    }
+
+
+}
diff --git a/srt-cloud-module/srt-cloud-quartz/src/main/java/net/srt/quartz/task/quality/impl/UniquenessImpl.java b/srt-cloud-module/srt-cloud-quartz/src/main/java/net/srt/quartz/task/quality/impl/UniquenessImpl.java
new file mode 100644
index 0000000..19cef94
--- /dev/null
+++ b/srt-cloud-module/srt-cloud-quartz/src/main/java/net/srt/quartz/task/quality/impl/UniquenessImpl.java
@@ -0,0 +1,112 @@
+package net.srt.quartz.task.quality.impl;
+
+import lombok.extern.slf4j.Slf4j;
+import net.srt.api.module.data.governance.dto.DataGovernanceQualityTaskColumnDto;
+import net.srt.api.module.data.governance.dto.DataGovernanceQualityTaskTableDto;
+import net.srt.api.module.data.governance.dto.quality.QualityCheck;
+import net.srt.api.module.data.governance.dto.quality.QualityConfigParam;
+import net.srt.api.module.data.governance.dto.quality.QulaityColumn;
+import net.srt.api.module.data.integrate.constant.CommonRunStatus;
+import net.srt.flink.common.utils.LogUtil;
+import net.srt.quartz.task.quality.AbstractQualityAdapter;
+import srt.cloud.framework.dbswitch.common.type.ProductTypeEnum;
+import srt.cloud.framework.dbswitch.core.service.IMetaDataByJdbcService;
+import srt.cloud.framework.dbswitch.core.service.impl.MetaDataByJdbcServiceImpl;
+import srt.cloud.framework.dbswitch.dbcommon.database.DatabaseOperatorFactory;
+import srt.cloud.framework.dbswitch.dbcommon.database.IDatabaseOperator;
+import srt.cloud.framework.dbswitch.dbcommon.domain.StatementResultSet;
+
+import java.sql.ResultSet;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * @ClassName UniquenessImpl
+ * @Author zrx
+ * @Date 2023/6/24 13:13
+ */
+@Slf4j
+public class UniquenessImpl extends AbstractQualityAdapter {
+
+    private final static Integer BATCH_SIZE = 5000;
+
+    public UniquenessImpl(QualityCheck qualityCheck) {
+        super(qualityCheck);
+    }
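+
+    // Approach note (added): uniqueness is checked with two SQL statements supplied by
+    // IMetaDataByJdbcService - getCountMoreThanOneSql selects the rows whose key occurs
+    // more than once (recorded as failed), getCountOneSql selects the rows whose key
+    // occurs exactly once (recorded as passed).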
+
+    @Override
+    public void check() {
+        log.info(String.format("UniquenessImpl start check,jdbcUrl:%s,tableName:%s", qualityCheck.getJdbcUrl(), qualityCheck.getTableName()));
+        // connect to the database
+        ProductTypeEnum productTypeEnum = ProductTypeEnum.getByIndex(qualityCheck.getDatabaseType());
+        List<QulaityColumn> qulaityColumns = qualityCheck.getQulaityColumns();
+        // decide between combined uniqueness and per-column uniqueness
+        QualityConfigParam param = qualityCheck.getParam();
+        IMetaDataByJdbcService metaDataService = new MetaDataByJdbcServiceImpl(productTypeEnum);
+
+        if (param.getUniqueType() == 1) {
+            // single-column uniqueness: run one check per column
+            for (QulaityColumn qulaityColumn : qulaityColumns) {
+                List<QulaityColumn> columnInfos = new ArrayList<>();
+                columnInfos.add(qulaityColumn);
+                addTaskColumns(productTypeEnum, metaDataService, columnInfos);
+            }
+        } else {
+            // combined uniqueness across all configured columns
+            addTaskColumns(productTypeEnum, metaDataService, qulaityColumns);
+        }
+        log.info(String.format("UniquenessImpl check end,jdbcUrl:%s,tableName:%s", qualityCheck.getJdbcUrl(), qualityCheck.getTableName()));
+
+    }
+
+    private void addTaskColumns(ProductTypeEnum productTypeEnum, IMetaDataByJdbcService metaDataService, List<QulaityColumn> columnInfos) {
+        DataGovernanceQualityTaskTableDto taskTable = addTaskTable(columnInfos);
+        try {
+            List<String> columns = columnInfos.stream().map(QulaityColumn::getColumnName).collect(Collectors.toList());
+            String countMoreThanOneSql = metaDataService.getCountMoreThanOneSql(ProductTypeEnum.ORACLE.equals(productTypeEnum) ? qualityCheck.getUserName() : qualityCheck.getDatabaseName(), qualityCheck.getTableName(), columns);
+            String countOneSql = metaDataService.getCountOneSql(ProductTypeEnum.ORACLE.equals(productTypeEnum) ? qualityCheck.getUserName() : qualityCheck.getDatabaseName(), qualityCheck.getTableName(), columns);
+            // duplicated rows are recorded as failed, unique rows as passed
+            executeSql(productTypeEnum, countMoreThanOneSql, columns, taskTable, false);
+            executeSql(productTypeEnum, countOneSql, columns, taskTable, true);
+            taskTable.setEndTime(new Date());
+            taskTable.setStatus(CommonRunStatus.SUCCESS.getCode());
+            dataQualityApi.updateQualityTaskTable(taskTable);
+        } catch (Exception e) {
+            taskTable.setEndTime(new Date());
+            taskTable.setErrorLog(LogUtil.getError(e));
+            taskTable.setStatus(CommonRunStatus.FAILED.getCode());
+            dataQualityApi.updateQualityTaskTable(taskTable);
+            throw new RuntimeException(e);
+        }
+    }
+
+    private void executeSql(ProductTypeEnum productTypeEnum, String sql, List<String> columns, DataGovernanceQualityTaskTableDto taskTable, Boolean pass) {
+        IDatabaseOperator sourceOperator = DatabaseOperatorFactory
+                .createDatabaseOperator(createDataSource(qualityCheck), productTypeEnum);
+        sourceOperator.setFetchSize(BATCH_SIZE);
+        try (StatementResultSet srs = sourceOperator.queryTableData(sql); ResultSet rs = srs.getResultset()) {
+            int size = 0;
+            List<DataGovernanceQualityTaskColumnDto> columnDtos = new ArrayList<>();
+            while (rs.next()) {
+                size++;
+                Map<String, Object> map = buildRowMap(columns, rs);
+                DataGovernanceQualityTaskColumnDto columnDto = buildTaskColumn(taskTable, map, pass, String.join(",", columns) + " is not unique");
+                columnDtos.add(columnDto);
+                // flush every 5000 rows
+                if (size % BATCH_SIZE == 0) {
+                    updateTask(taskTable, columnDtos);
+                }
+            }
+            // flush the remaining rows
+            if (!columnDtos.isEmpty()) {
+                updateTask(taskTable, columnDtos);
+            }
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+}
diff --git a/srt-data-development/src/main/java/net/srt/disposition/controller/DataProductionScheduleController.java b/srt-data-development/src/main/java/net/srt/disposition/controller/DataProductionScheduleController.java
index d5fe5e0..77ec03e 100644
--- a/srt-data-development/src/main/java/net/srt/disposition/controller/DataProductionScheduleController.java
+++ b/srt-data-development/src/main/java/net/srt/disposition/controller/DataProductionScheduleController.java
@@ -1,11 +1,14 @@
 package net.srt.disposition.controller;
 
 import io.swagger.v3.oas.annotations.Operation;
 import io.swagger.v3.oas.annotations.tags.Tag;
 import lombok.AllArgsConstructor;
+import net.srt.disposition.dto.ConsoleLog;
 import net.srt.disposition.dto.Flow;
+import net.srt.disposition.query.DataProductionScheduleRecordQuery;
 import net.srt.disposition.query.DataProductionsScheduleQuery;
 import net.srt.disposition.service.DataProductionScheduleService;
+import net.srt.disposition.vo.DataProductionScheduleRecordVO;
 import net.srt.disposition.vo.DataProductionScheduleVo;
 import net.srt.framework.common.page.PageResult;
 import net.srt.framework.common.utils.Result;
@@ -74,6 +78,30 @@ public class DataProductionScheduleController {
 		return Result.ok();
 	}
 
+	@GetMapping("record/page")
+	@Operation(summary = "schedule run-record paging")
+	@PreAuthorize("hasAuthority('data-development:schedule:record:page')")
+	public Result<PageResult<DataProductionScheduleRecordVO>> pageRecord(@Valid DataProductionScheduleRecordQuery query) {
+		PageResult<DataProductionScheduleRecordVO> page = dataProductionScheduleService.pageRecord(query);
+		return Result.ok(page);
+	}
+
+	@DeleteMapping("/record")
+	@Operation(summary = "delete schedule run records")
+	@PreAuthorize("hasAuthority('data-development:schedule:record:delete')")
+	public Result<String> delRecord(@RequestBody List<Long> idList) {
+		dataProductionScheduleService.delRecord(idList);
+		return Result.ok();
+	}
+
+	@GetMapping("/log/{recordId}")
+	@Operation(summary = "get the execution log of a flow run")
+	public Result<ConsoleLog> getLog(@PathVariable Integer recordId) {
+		return Result.ok(dataProductionScheduleService.getLog(recordId));
+	}
+
 }
diff --git a/srt-data-development/src/main/java/net/srt/disposition/convert/DataProductionScheduleRecordConvert.java b/srt-data-development/src/main/java/net/srt/disposition/convert/DataProductionScheduleRecordConvert.java
new file mode 100644
index 0000000..a278920
--- /dev/null
+++ b/srt-data-development/src/main/java/net/srt/disposition/convert/DataProductionScheduleRecordConvert.java
@@ -0,0 +1,22 @@
+package net.srt.disposition.convert;
+
+import net.srt.disposition.entity.DataProductionScheduleRecordEntity;
+import net.srt.disposition.vo.DataProductionScheduleRecordVO;
+import org.mapstruct.Mapper;
+import org.mapstruct.factory.Mappers;
+
+import java.util.List;
+
+/**
+ * @BelongsProject: srt_cloud
+ * @BelongsPackage: net.srt.disposition.convert
+ * @Author: jpz
+ * @CreateTime: 2023/12/29 9:08
+ */
+@Mapper
+public interface DataProductionScheduleRecordConvert {
+
+    DataProductionScheduleRecordConvert INSTANCE = Mappers.getMapper(DataProductionScheduleRecordConvert.class);
+
+    List<DataProductionScheduleRecordVO> convertList(List<DataProductionScheduleRecordEntity> list);
+}
diff --git a/srt-data-development/src/main/java/net/srt/disposition/dto/ConsoleLog.java b/srt-data-development/src/main/java/net/srt/disposition/dto/ConsoleLog.java
new file mode 100644
index 0000000..7dfac8e
--- /dev/null
+++ b/srt-data-development/src/main/java/net/srt/disposition/dto/ConsoleLog.java
@@ -0,0 +1,16 @@
+package net.srt.disposition.dto;
+
+import lombok.Data;
+
+/**
+ * @BelongsProject: srt_cloud
+ * @BelongsPackage: net.srt.disposition.dto
+ * @Author: jpz
+ * @CreateTime: 2023/12/29 9:50
+ */
+@Data
+public class ConsoleLog {
+
+    private String log = "";
+    private boolean end = false;
+}
diff --git a/srt-data-development/src/main/java/net/srt/disposition/query/DataProductionScheduleRecordQuery.java b/srt-data-development/src/main/java/net/srt/disposition/query/DataProductionScheduleRecordQuery.java
new file mode 100644
index 0000000..9174968
--- /dev/null
+++ b/srt-data-development/src/main/java/net/srt/disposition/query/DataProductionScheduleRecordQuery.java
@@ -0,0 +1,29 @@
+package net.srt.disposition.query;
+
+import io.swagger.v3.oas.annotations.media.Schema;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import net.srt.framework.common.query.Query;
+import net.srt.framework.common.utils.DateUtils;
+import org.springframework.format.annotation.DateTimeFormat;
+
+import java.util.Date;
+
+/**
+ * @BelongsProject: srt_cloud
+ * @BelongsPackage: net.srt.disposition.query
+ * @Author: jpz
+ * @CreateTime: 2023/12/29 8:50
+ */
+@Data
+@EqualsAndHashCode(callSuper = false)
+@Schema(description = "data production - schedule run-record query")
+public class DataProductionScheduleRecordQuery extends Query {
+    private String name;
+    private Integer runStatus;
+    private Integer executeType;
+    @DateTimeFormat(pattern = DateUtils.DATE_TIME_PATTERN)
+    private Date startTime;
+    @DateTimeFormat(pattern = DateUtils.DATE_TIME_PATTERN)
+    private Date endTime;
+}
diff --git a/srt-data-development/src/main/java/net/srt/disposition/service/DataProductionScheduleRecordService.java b/srt-data-development/src/main/java/net/srt/disposition/service/DataProductionScheduleRecordService.java
new file mode 100644
index 0000000..0c678ea
--- /dev/null
+++ b/srt-data-development/src/main/java/net/srt/disposition/service/DataProductionScheduleRecordService.java
@@ -0,0 +1,22 @@
+package net.srt.disposition.service;
+
+import net.srt.disposition.entity.DataProductionScheduleRecordEntity;
+import net.srt.disposition.query.DataProductionScheduleRecordQuery;
+import net.srt.disposition.vo.DataProductionScheduleRecordVO;
+import net.srt.framework.common.page.PageResult;
+import net.srt.framework.mybatis.service.BaseService;
+
+import java.util.List;
+
+/**
+ * @BelongsProject: srt_cloud
+ * @BelongsPackage: net.srt.disposition.service
+ * @Author: jpz
+ * @CreateTime: 2023/12/29 8:55
+ */
+public interface DataProductionScheduleRecordService extends BaseService<DataProductionScheduleRecordEntity> {
+
+    PageResult<DataProductionScheduleRecordVO> page(DataProductionScheduleRecordQuery query);
+
+    void delete(List<Long> idList);
+}
diff --git a/srt-data-development/src/main/java/net/srt/disposition/service/DataProductionScheduleService.java b/srt-data-development/src/main/java/net/srt/disposition/service/DataProductionScheduleService.java
index 8f5771d..5114eea 100644
--- a/srt-data-development/src/main/java/net/srt/disposition/service/DataProductionScheduleService.java
+++ b/srt-data-development/src/main/java/net/srt/disposition/service/DataProductionScheduleService.java
@@ -1,8 +1,11 @@
 package net.srt.disposition.service;
 
+import net.srt.disposition.dto.ConsoleLog;
 import net.srt.disposition.dto.Flow;
 import net.srt.disposition.entity.DataProductionScheduleEntity;
+import net.srt.disposition.query.DataProductionScheduleRecordQuery;
 import net.srt.disposition.query.DataProductionsScheduleQuery;
+import net.srt.disposition.vo.DataProductionScheduleRecordVO;
 import net.srt.disposition.vo.DataProductionScheduleVo;
 import net.srt.framework.common.page.PageResult;
 import net.srt.framework.mybatis.service.BaseService;
@@ -29,4 +32,10 @@ public interface DataProductionScheduleService extends BaseService<DataProductionScheduleEntity> {
 	void delete(List<Long> idList);
 
 	void release(Integer id);
+
+	PageResult<DataProductionScheduleRecordVO> pageRecord(DataProductionScheduleRecordQuery query);
+
+	void delRecord(List<Long> idList);
+
+	ConsoleLog getLog(Integer recordId);
 }
diff --git a/srt-data-development/src/main/java/net/srt/disposition/service/impl/DataProductionScheduleRecordServiceimpl.java b/srt-data-development/src/main/java/net/srt/disposition/service/impl/DataProductionScheduleRecordServiceimpl.java
new file mode 100644
index 0000000..19b32bc
--- /dev/null
+++ b/srt-data-development/src/main/java/net/srt/disposition/service/impl/DataProductionScheduleRecordServiceimpl.java
@@ -0,0 +1,51 @@
+package net.srt.disposition.service.impl;
+
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
+import com.baomidou.mybatisplus.core.metadata.IPage;
+import com.baomidou.mybatisplus.core.toolkit.Wrappers;
+import lombok.AllArgsConstructor;
+import net.srt.disposition.convert.DataProductionScheduleRecordConvert;
+import net.srt.disposition.entity.DataProductionScheduleRecordEntity;
+import net.srt.disposition.mapper.DataProductionScheduleRecordDao;
+import net.srt.disposition.query.DataProductionScheduleRecordQuery;
+import net.srt.disposition.service.DataProductionScheduleRecordService;
+import net.srt.disposition.vo.DataProductionScheduleRecordVO;
+import net.srt.framework.common.page.PageResult;
+import net.srt.framework.mybatis.service.impl.BaseServiceImpl;
+import org.springframework.stereotype.Service;
+import srt.cloud.framework.dbswitch.common.util.StringUtil;
+
+import java.util.List;
+
+/**
+ * @BelongsProject: srt_cloud
+ * @BelongsPackage: net.srt.disposition.service.impl
+ * @Author: jpz
+ * @CreateTime: 2023/12/29 8:58
+ */
+@Service
+@AllArgsConstructor
+public class DataProductionScheduleRecordServiceimpl extends BaseServiceImpl<DataProductionScheduleRecordDao, DataProductionScheduleRecordEntity> implements DataProductionScheduleRecordService {
+    @Override
+    public PageResult<DataProductionScheduleRecordVO> page(DataProductionScheduleRecordQuery query) {
+        IPage<DataProductionScheduleRecordEntity> page = baseMapper.selectPage(getPage(query), getWrapper(query));
+        return new PageResult<>(DataProductionScheduleRecordConvert.INSTANCE.convertList(page.getRecords()), page.getTotal());
+    }
+
+    @Override
+    public void delete(List<Long> idList) {
+        removeByIds(idList);
+    }
+
+    private LambdaQueryWrapper<DataProductionScheduleRecordEntity> getWrapper(DataProductionScheduleRecordQuery query) {
+        LambdaQueryWrapper<DataProductionScheduleRecordEntity> wrapper = Wrappers.lambdaQuery();
+        wrapper.like(StringUtil.isNotBlank(query.getName()), DataProductionScheduleRecordEntity::getName, query.getName())
+                .eq(query.getRunStatus() != null, DataProductionScheduleRecordEntity::getRunStatus, query.getRunStatus())
+                .eq(query.getExecuteType() != null, DataProductionScheduleRecordEntity::getExecuteType, query.getExecuteType())
+                // range filter (assumed intent; the original used exact-match eq on both bounds)
+                .ge(query.getStartTime() != null, DataProductionScheduleRecordEntity::getStartTime, query.getStartTime())
+                .le(query.getEndTime() != null, DataProductionScheduleRecordEntity::getEndTime, query.getEndTime())
+                .orderByDesc(DataProductionScheduleRecordEntity::getCreateTime)
+                .orderByDesc(DataProductionScheduleRecordEntity::getId);
+        return wrapper;
+    }
+}
diff --git a/srt-data-development/src/main/java/net/srt/disposition/service/impl/DataProductionScheduleServiceimpl.java b/srt-data-development/src/main/java/net/srt/disposition/service/impl/DataProductionScheduleServiceimpl.java
index 8233a1a..3f77d42 100644
--- a/srt-data-development/src/main/java/net/srt/disposition/service/impl/DataProductionScheduleServiceimpl.java
+++ b/srt-data-development/src/main/java/net/srt/disposition/service/impl/DataProductionScheduleServiceimpl.java
@@ -1,5 +1,6 @@
 package net.srt.disposition.service.impl;
 
+import net.srt.disposition.dto.ConsoleLog;
 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
 import com.baomidou.mybatisplus.core.metadata.IPage;
 import com.baomidou.mybatisplus.core.toolkit.Wrappers;
@@ -21,8 +22,11 @@ import net.srt.disposition.entity.DataProductionScheduleNodeEntity;
 import net.srt.disposition.entity.DataProductionScheduleNodeRecordEntity;
 import net.srt.disposition.entity.DataProductionScheduleRecordEntity;
 import net.srt.disposition.mapper.DataProductionScheduleDao;
+import net.srt.disposition.query.DataProductionScheduleRecordQuery;
 import net.srt.disposition.query.DataProductionsScheduleQuery;
+import net.srt.disposition.service.DataProductionScheduleRecordService;
 import net.srt.disposition.service.DataProductionScheduleService;
+import net.srt.disposition.vo.DataProductionScheduleRecordVO;
 import net.srt.disposition.vo.DataProductionScheduleVo;
 import net.srt.flink.common.config.Dialect;
 import net.srt.flink.common.utils.JSONUtil;
@@ -57,6 +61,7 @@ public class DataProductionScheduleServiceimpl extends BaseServiceImpl<DataProductionScheduleDao, DataProductionScheduleEntity> implements DataProductionScheduleService {
 
+	private final DataProductionScheduleRecordService recordService;
 
 	@Override
 	public PageResult<DataProductionScheduleVo> page(DataProductionsScheduleQuery query) {
 		IPage<DataProductionScheduleEntity> page = baseMapper.selectPage(getPage(query), getWrapper(query));
@@ -125,6 +130,21 @@
+	@Override
+	public PageResult<DataProductionScheduleRecordVO> pageRecord(DataProductionScheduleRecordQuery query) {
+		return recordService.page(query);
+	}
+
+	@Override
+	public void delRecord(List<Long> idList) {
+		recordService.delete(idList);
+	}
+
+	@Override
+	public ConsoleLog getLog(Integer recordId) {
+		// log retrieval is not implemented yet; null is returned as a placeholder
+		return null;
+	}
+
 	private String scheduleRun(Integer id, ExecuteType executeType) {
 		DataProductionScheduleEntity scheduleEntity = baseMapper.selectById(id);
diff --git a/srt-data-development/src/main/java/net/srt/disposition/vo/DataProductionScheduleRecordVO.java b/srt-data-development/src/main/java/net/srt/disposition/vo/DataProductionScheduleRecordVO.java
new file mode 100644
index 0000000..4a6baf1
--- /dev/null
+++ b/srt-data-development/src/main/java/net/srt/disposition/vo/DataProductionScheduleRecordVO.java
@@ -0,0 +1,70 @@
+package net.srt.disposition.vo;
+
+import com.fasterxml.jackson.annotation.JsonFormat;
+import io.swagger.v3.oas.annotations.media.Schema;
+import lombok.Data;
+import net.srt.framework.common.utils.DateUtils;
+
+import java.io.Serializable;
+import java.util.Date;
+
+/**
+ * @BelongsProject: srt_cloud
+ * @BelongsPackage: net.srt.disposition.vo
+ * @Author: jpz
+ * @CreateTime: 2023/12/29 8:47
+ */
+@Data
+@Schema(description = "data production - schedule run record")
+public class DataProductionScheduleRecordVO implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    @Schema(description = "primary key id")
+    private Integer id;
+
+    private String name;
+
+    @Schema(description = "schedule id")
+    private Integer taskScheduleId;
+
+    @Schema(description = "project (tenant) id")
+    private Long projectId;
+
+    @Schema(description = "current status, dictionary run_status")
+    private Integer runStatus;
+
+    @Schema(description = "start time")
+    @JsonFormat(pattern = DateUtils.DATE_TIME_PATTERN)
+    private Date startTime;
+
+    @Schema(description = "end time")
+    @JsonFormat(pattern = DateUtils.DATE_TIME_PATTERN)
+    private Date endTime;
+
+    @Schema(description = "run log")
+    private String log;
+
+    private Integer executeType;
+
+    private String configJson;
+
+    @Schema(description = "version")
+    private Integer version;
+
+    @Schema(description = "delete flag, 0: normal, 1: deleted")
+    private Integer deleted;
+
+    @Schema(description = "creator")
+    private Long creator;
+
+    @Schema(description = "create time")
+    @JsonFormat(pattern = DateUtils.DATE_TIME_PATTERN)
+    private Date createTime;
+
"更新者") + private Long updater; + + @Schema(description = "更新时间") + @JsonFormat(pattern = DateUtils.DATE_TIME_PATTERN) + private Date updateTime; +} diff --git a/srt-data-development/src/main/resources/mapper/DataProductionScheduleDao.xml b/srt-data-development/src/main/resources/mapper/DataProductionScheduleDao.xml new file mode 100644 index 0000000..5cfd98f --- /dev/null +++ b/srt-data-development/src/main/resources/mapper/DataProductionScheduleDao.xml @@ -0,0 +1,30 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + UPDATE data_production_schedule SET status=#{status},release_time=#{releaseTime},release_user_id=#{releaseUserId} + WHERE id=#{id} + + +