数据采集5.0
parent
308e870dff
commit
d513cbac31
|
@ -25,10 +25,10 @@ public class BusinessInitializer implements ApplicationRunner {
|
||||||
* init task monitor
|
* init task monitor
|
||||||
*/
|
*/
|
||||||
private void initScheduleMonitor() {
|
private void initScheduleMonitor() {
|
||||||
// //处理没执行完的采集任务
|
//处理没执行完的采集任务
|
||||||
// metadataCollectRecordService.dealNotFinished();
|
metadataCollectRecordService.dealNotFinished();
|
||||||
// //处理没执行完的质量检测任务
|
//处理没执行完的质量检测任务
|
||||||
// qualityTaskService.dealNotFinished();
|
qualityTaskService.dealNotFinished();
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -13,4 +13,6 @@ public interface MetadataCollectRecordService extends BaseService<MetadataCollec
|
||||||
PageResult<MetadataCollectRecordVO> page(MetadataCollectRecordQuery query);
|
PageResult<MetadataCollectRecordVO> page(MetadataCollectRecordQuery query);
|
||||||
|
|
||||||
void delete(List<Long> idList);
|
void delete(List<Long> idList);
|
||||||
|
|
||||||
|
void dealNotFinished();
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,4 +18,6 @@ public interface QualityTaskService extends BaseService<QualityTaskEntity> {
|
||||||
PageResult<QualityTaskVo> pagea(QualityTaskQuery query);
|
PageResult<QualityTaskVo> pagea(QualityTaskQuery query);
|
||||||
|
|
||||||
void delete(List<Long> idList);
|
void delete(List<Long> idList);
|
||||||
|
|
||||||
|
void dealNotFinished();
|
||||||
}
|
}
|
||||||
|
|
|
@ -4,10 +4,12 @@ import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
||||||
import com.baomidou.mybatisplus.core.metadata.IPage;
|
import com.baomidou.mybatisplus.core.metadata.IPage;
|
||||||
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
|
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
|
||||||
import lombok.AllArgsConstructor;
|
import lombok.AllArgsConstructor;
|
||||||
|
import net.srt.api.module.data.governance.constant.MetadataCollectRunStatus;
|
||||||
import net.srt.convert.MetadataCollectRecordConvert;
|
import net.srt.convert.MetadataCollectRecordConvert;
|
||||||
import net.srt.dao.MetadataCollectRecordDao;
|
import net.srt.dao.MetadataCollectRecordDao;
|
||||||
import net.srt.entity.MetadataCollectRecordEntity;
|
import net.srt.entity.MetadataCollectRecordEntity;
|
||||||
import net.srt.framework.common.page.PageResult;
|
import net.srt.framework.common.page.PageResult;
|
||||||
|
import net.srt.framework.common.utils.DateUtils;
|
||||||
import net.srt.framework.mybatis.service.impl.BaseServiceImpl;
|
import net.srt.framework.mybatis.service.impl.BaseServiceImpl;
|
||||||
import net.srt.query.MetadataCollectRecordQuery;
|
import net.srt.query.MetadataCollectRecordQuery;
|
||||||
import net.srt.service.MetadataCollectRecordService;
|
import net.srt.service.MetadataCollectRecordService;
|
||||||
|
@ -15,6 +17,7 @@ import net.srt.vo.MetadataCollectRecordVO;
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
import org.springframework.transaction.annotation.Transactional;
|
import org.springframework.transaction.annotation.Transactional;
|
||||||
|
|
||||||
|
import java.util.Date;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
@Service
|
@Service
|
||||||
|
@ -32,6 +35,21 @@ public class MetadataCollectRecordServiceImpl extends BaseServiceImpl<MetadataCo
|
||||||
removeByIds(idList);
|
removeByIds(idList);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void dealNotFinished() {
|
||||||
|
LambdaQueryWrapper<MetadataCollectRecordEntity> wrapper = Wrappers.lambdaQuery();
|
||||||
|
wrapper.in(MetadataCollectRecordEntity::getStatus, MetadataCollectRunStatus.RUNNING.getCode());
|
||||||
|
List<MetadataCollectRecordEntity> collectRecordEntities = baseMapper.selectList(wrapper);
|
||||||
|
for (MetadataCollectRecordEntity collectRecordEntity : collectRecordEntities) {
|
||||||
|
collectRecordEntity.setEndTime(new Date());
|
||||||
|
collectRecordEntity.setStatus(MetadataCollectRunStatus.FAILED.getCode());
|
||||||
|
String errorLog = DateUtils.formatDateTime(new Date()) + " The collect task has unexpected stop,you can try run again";
|
||||||
|
collectRecordEntity.setErrorLog(errorLog);
|
||||||
|
collectRecordEntity.setRealTimeLog(collectRecordEntity.getRealTimeLog() == null ? errorLog : collectRecordEntity.getRealTimeLog() + errorLog);
|
||||||
|
baseMapper.updateById(collectRecordEntity);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private LambdaQueryWrapper<MetadataCollectRecordEntity> getWrapper(MetadataCollectRecordQuery query) {
|
private LambdaQueryWrapper<MetadataCollectRecordEntity> getWrapper(MetadataCollectRecordQuery query) {
|
||||||
LambdaQueryWrapper<MetadataCollectRecordEntity> wrapper = Wrappers.lambdaQuery();
|
LambdaQueryWrapper<MetadataCollectRecordEntity> wrapper = Wrappers.lambdaQuery();
|
||||||
wrapper.eq(query.getMetadataCollectId() != null,MetadataCollectRecordEntity::getMetadataCollectId,query.getMetadataCollectId())
|
wrapper.eq(query.getMetadataCollectId() != null,MetadataCollectRecordEntity::getMetadataCollectId,query.getMetadataCollectId())
|
||||||
|
|
|
@ -6,12 +6,15 @@ import com.baomidou.mybatisplus.core.metadata.IPage;
|
||||||
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
|
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
|
||||||
import lombok.AllArgsConstructor;
|
import lombok.AllArgsConstructor;
|
||||||
|
|
||||||
|
import net.srt.api.module.data.governance.constant.MetadataCollectRunStatus;
|
||||||
|
import net.srt.api.module.data.integrate.constant.CommonRunStatus;
|
||||||
import net.srt.convert.QualityRuleConvert;
|
import net.srt.convert.QualityRuleConvert;
|
||||||
import net.srt.convert.QualityTaskConvert;
|
import net.srt.convert.QualityTaskConvert;
|
||||||
import net.srt.dao.QualityTaskDao;
|
import net.srt.dao.QualityTaskDao;
|
||||||
import net.srt.entity.QualityQueryEntity;
|
import net.srt.entity.QualityQueryEntity;
|
||||||
import net.srt.entity.QualityTaskEntity;
|
import net.srt.entity.QualityTaskEntity;
|
||||||
import net.srt.framework.common.page.PageResult;
|
import net.srt.framework.common.page.PageResult;
|
||||||
|
import net.srt.framework.common.utils.DateUtils;
|
||||||
import net.srt.framework.mybatis.service.impl.BaseServiceImpl;
|
import net.srt.framework.mybatis.service.impl.BaseServiceImpl;
|
||||||
import net.srt.query.QualityTaskQuery;
|
import net.srt.query.QualityTaskQuery;
|
||||||
import net.srt.service.QualityTaskService;
|
import net.srt.service.QualityTaskService;
|
||||||
|
@ -20,6 +23,7 @@ import org.springframework.stereotype.Service;
|
||||||
import org.springframework.transaction.annotation.Transactional;
|
import org.springframework.transaction.annotation.Transactional;
|
||||||
import srt.cloud.framework.dbswitch.common.util.StringUtil;
|
import srt.cloud.framework.dbswitch.common.util.StringUtil;
|
||||||
|
|
||||||
|
import java.util.Date;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -44,6 +48,20 @@ public class QualityTaskServiceimpl extends BaseServiceImpl<QualityTaskDao, Qual
|
||||||
removeByIds(idList);
|
removeByIds(idList);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void dealNotFinished() {
|
||||||
|
LambdaQueryWrapper<QualityTaskEntity> wrapper = Wrappers.lambdaQuery();
|
||||||
|
wrapper.in(QualityTaskEntity::getStatus, CommonRunStatus.WAITING.getCode(), MetadataCollectRunStatus.RUNNING.getCode());
|
||||||
|
List<QualityTaskEntity> qualityTaskEntities = baseMapper.selectList(wrapper);
|
||||||
|
for (QualityTaskEntity qualityTaskEntity : qualityTaskEntities) {
|
||||||
|
qualityTaskEntity.setEndTime(new Date());
|
||||||
|
qualityTaskEntity.setStatus(CommonRunStatus.FAILED.getCode());
|
||||||
|
String errorLog = DateUtils.formatDateTime(new Date()) + " The collect task has unexpected stop,you can try run again";
|
||||||
|
qualityTaskEntity.setErrorLog(errorLog);
|
||||||
|
baseMapper.updateById(qualityTaskEntity);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private LambdaQueryWrapper<QualityTaskEntity> getWrapper(QualityTaskQuery query) {
|
private LambdaQueryWrapper<QualityTaskEntity> getWrapper(QualityTaskQuery query) {
|
||||||
LambdaQueryWrapper<QualityTaskEntity> wrapper= Wrappers.lambdaQuery();
|
LambdaQueryWrapper<QualityTaskEntity> wrapper= Wrappers.lambdaQuery();
|
||||||
wrapper.like(StringUtil.isNotBlank(query.getName()),QualityTaskEntity::getName,query.getName())
|
wrapper.like(StringUtil.isNotBlank(query.getName()),QualityTaskEntity::getName,query.getName())
|
||||||
|
|
Loading…
Reference in New Issue