feat(): 数据同步

dev
chao 2024-04-22 20:01:18 +08:00
parent 41182d0a5d
commit 9953c7c202
9 changed files with 333 additions and 38 deletions

View File

@ -5,6 +5,8 @@ import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import com.etl.common.core.annotation.Excel;
import com.etl.common.core.web.domain.BaseEntity;
import com.etl.data.source.domain.DataSource;
import com.etl.data.type.domain.DataType;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.EqualsAndHashCode;
@ -121,4 +123,25 @@ public class DataSourceResp extends BaseEntity {
private Long maximumWaitingTimes;
public static DataSourceResp dataSourceBuilder(DataSource source, DataType dataType) {
return DataSourceResp.builder()
.id(source.getId())
.dataSourceName(source.getDataSourceName())
.dataSourceSystemName(source.getDataSourceSystemName())
.typeId(source.getTypeId())
.dataType(dataType.getDataType())
.dataSourceIp(source.getDataSourceIp())
.dataSourcePort(source.getDataSourcePort())
.dataSourceDatabaseName(source.getDataSourceDatabaseName())
.dataSourceUsername(source.getDataSourceUsername())
.dataSourcePassword(source.getDataSourcePassword())
.additionalConfiguration(source.getAdditionalConfiguration())
.status(source.getStatus())
.initialNumberOfConnections(source.getInitialNumberOfConnections())
.maximumNumberOfConnections(source.getMaximumNumberOfConnections())
.maximumWaitingTime(source.getMaximumWaitingTime())
.maximumWaitingTimes(source.getMaximumWaitingTimes())
.remark(source.getRemark())
.build();
}
}

View File

@ -0,0 +1,63 @@
package com.etl.data.structure.domain;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import com.etl.common.core.web.domain.BaseEntity;
import com.etl.data.source.domain.DataSource;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
/**
* asset_structure
*
* @author Chao
* @date 2024-04-22
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
@SuperBuilder
@EqualsAndHashCode(callSuper = true)
@TableName("asset_structure")
public class AssetStructure extends BaseEntity {
private static final long serialVersionUID = 1L;
/**
* id
*/
@TableId(value = "id", type = IdType.AUTO)
private Long id;
/**
* id
*/
private Long dataSourceSystemId;
/**
*
*/
private String dataSourceName;
/**
*
*/
private String dataSourceSystemName;
/**
*
*/
private String dataSourceDatabaseName;
public static AssetStructure dataSourceBuilder(Long id, DataSource dataSource) {
return AssetStructure.builder()
.dataSourceSystemId(id)
.dataSourceName(dataSource.getDataSourceName())
.dataSourceSystemName(dataSource.getDataSourceSystemName())
.dataSourceDatabaseName(dataSource.getDataSourceDatabaseName())
.build();
}
}

View File

@ -0,0 +1,55 @@
package com.etl.data.structure.domain;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import com.etl.common.core.annotation.Excel;
import com.etl.common.core.web.domain.BaseEntity;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
/**
* asset_structure_table
*
* @author Chao
* @date 2024-04-22
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
@SuperBuilder
@EqualsAndHashCode(callSuper = true)
@TableName("asset_structure_table")
public class AssetStructureTable extends BaseEntity {
private static final long serialVersionUID = 1L;
/**
* id
*/
@TableId(value = "id", type = IdType.AUTO)
private Long id;
/**
* id
*/
@Excel(name = "数据资产id")
private Long assetStructureId;
/**
*
*/
@Excel(name = "数据资产表")
private String tableName;
/**
*
*/
@Excel(name = "表数据总数")
private Long tableDataCount;
@Excel(name = "表注释")
private String tableNameAnnotation;
}

View File

@ -0,0 +1,29 @@
package com.etl.data.structure.domain.resp;
import com.etl.data.structure.domain.AssetStructure;
import com.etl.data.structure.domain.AssetStructureTable;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import java.util.List;
/**
*
*
* @author Chao
* @ClassName: AssetResp
* @CreateTime: 2024/4/22 7:44
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
@SuperBuilder
public class AssetResp {
private AssetStructure assetStructure;
private List<AssetStructureTable> assetStructureTableList;
}

View File

@ -101,4 +101,15 @@ public class DataSourceController extends BaseController {
return Result.success(dataSourceService.testConnection(id));
}
/**
*
* @param id id
* @return
*/
@RequiresPermissions("data:source:assetSynchronization")
@PostMapping(value = "assetSynchronization/{id}")
public Result assetSynchronization(@PathVariable("id") Long id) {
return Result.success(dataSourceService.assetSynchronization(id));
}
}

View File

@ -66,4 +66,11 @@ public interface IDataSourceService extends IService<DataSource> {
* @param id id
*/
boolean testConnection(Long id);
/**
*
* @param id id
* @return
*/
boolean assetSynchronization(Long id);
}

View File

@ -8,13 +8,18 @@ import com.etl.data.source.domain.DataSource;
import com.etl.data.source.domain.resp.DataSourceResp;
import com.etl.data.source.mapper.DataSourceMapper;
import com.etl.data.source.service.IDataSourceService;
import com.etl.data.structure.domain.AssetStructure;
import com.etl.data.structure.domain.AssetStructureTable;
import com.etl.data.structure.service.IAssetStructureService;
import com.etl.data.structure.service.IAssetStructureTableService;
import com.etl.data.type.domain.DataType;
import com.etl.data.type.service.IDataTypeService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import redis.clients.jedis.Jedis;
import java.sql.DriverManager;
import java.sql.*;
import java.util.ArrayList;
import java.util.List;
@ -32,6 +37,12 @@ public class DataSourceServiceImpl extends ServiceImpl<DataSourceMapper, DataSou
@Autowired
private IDataTypeService dataTypeService;
@Autowired
private IAssetStructureService assetStructureService;
@Autowired
private IAssetStructureTableService assetStructureTableService;
/**
*
*
@ -54,36 +65,12 @@ public class DataSourceServiceImpl extends ServiceImpl<DataSourceMapper, DataSou
List<DataSource> dataSources = dataSourceMapper.selectDataSourceList(dataSource);
List<DataType> dataTypeList = dataTypeService.list();
List<DataSourceResp> dataSourceResps = new ArrayList<>();
dataSources.stream().forEach(
source -> {
dataTypeList.stream().forEach(
dataType -> {
if (dataType.getId().equals(source.getTypeId())) {
DataSourceResp build = DataSourceResp.builder()
.id(source.getId())
.dataSourceName(source.getDataSourceName())
.dataSourceSystemName(source.getDataSourceSystemName())
.typeId(source.getTypeId())
.dataType(dataType.getDataType())
.dataSourceIp(source.getDataSourceIp())
.dataSourcePort(source.getDataSourcePort())
.dataSourceDatabaseName(source.getDataSourceDatabaseName())
.dataSourceUsername(source.getDataSourceUsername())
.dataSourcePassword(source.getDataSourcePassword())
.additionalConfiguration(source.getAdditionalConfiguration())
.status(source.getStatus())
.initialNumberOfConnections(source.getInitialNumberOfConnections())
.maximumNumberOfConnections(source.getMaximumNumberOfConnections())
.maximumWaitingTime(source.getMaximumWaitingTime())
.maximumWaitingTimes(source.getMaximumWaitingTimes())
.remark(source.getRemark())
.build();
dataSourceResps.add(build);
}
}
);
}
);
dataSources.stream()
.flatMap(source ->
dataTypeList.stream()
.filter(dataType -> dataType.getId().equals(source.getTypeId()))
.map(dataType -> DataSourceResp.dataSourceBuilder(source, dataType))
).forEach(dataSourceResps::add);
return dataSourceResps;
}
@ -152,26 +139,26 @@ public class DataSourceServiceImpl extends ServiceImpl<DataSourceMapper, DataSou
.eq(DataType::getId, dataSource.getTypeId())
);
try {
if (dataType.getDriverManager() != null && dataType.getJdbcPre() != null){
if ("mysql".equals(dataType.getDataType())){
if (dataType.getDriverManager() != null && dataType.getJdbcPre() != null) {
if ("mysql".equals(dataType.getDataType())) {
driveClass = dataType.getDriverManager();
jdbcUrl = dataType.getJdbcPre() + dataSource.getDataSourceIp() + ":" + dataSource.getDataSourcePort() + "/" + dataSource.getDataSourceDatabaseName() + "?" + dataSource.getAdditionalConfiguration();
}
if ("oracle".equals(dataType.getDataType())){
if ("oracle".equals(dataType.getDataType())) {
driveClass = dataType.getDriverManager();
jdbcUrl = dataType.getJdbcPre() + dataSource.getDataSourceIp() + ":" + dataSource.getDataSourcePort() + ":" + dataSource.getDataSourceDatabaseName();
}
if ("sqlserver".equals(dataType.getDataType())){
if ("sqlserver".equals(dataType.getDataType())) {
driveClass = dataType.getDriverManager();
jdbcUrl = dataType.getJdbcPre()+ dataSource.getDataSourceIp() +":"+dataSource.getDataSourcePort()+";databaseName="+dataSource.getDataSourceDatabaseName();
jdbcUrl = dataType.getJdbcPre() + dataSource.getDataSourceIp() + ":" + dataSource.getDataSourcePort() + ";databaseName=" + dataSource.getDataSourceDatabaseName();
}
flag = testDatasource(driveClass, jdbcUrl, dataSource.getDataSourceUsername(), dataSource.getDataSourcePassword());
}else {
} else {
// redis
//连接指定的redis
Jedis jedis = new Jedis(dataSource.getDataSourceIp(), Integer.valueOf(dataSource.getDataSourcePort()));
//如果有密码则需要下面这一行
if (dataSource.getDataSourcePassword() != null && !dataSource.getDataSourcePassword().equals("")){
if (dataSource.getDataSourcePassword() != null && !dataSource.getDataSourcePassword().equals("")) {
jedis.auth(dataSource.getDataSourcePassword());
}
//查看服务是否运行,运行正常的话返回一个PONG否则返回一个连接错误
@ -201,6 +188,123 @@ public class DataSourceServiceImpl extends ServiceImpl<DataSourceMapper, DataSou
}
}
/**
*
*
* @param id id
* @return
*/
@Override
@Transactional
public boolean assetSynchronization(Long id) {
DataSource dataSource = this.getOne(
new LambdaQueryWrapper<DataSource>()
.eq(DataSource::getId, id)
);
// 查询是否存在当前
AssetStructure assetStructure = assetStructureService.getOne(
new LambdaQueryWrapper<AssetStructure>()
.eq(AssetStructure::getDataSourceSystemId, id)
);
boolean b = this.testConnection(id);
if (b) {
// 如果不存在就给他添加资产结构表
if (assetStructure == null) {
AssetStructure entity = AssetStructure.dataSourceBuilder(id, dataSource);
assetStructureService.save(entity);
String jdbcUrl = "";
String driveClass = "";
DataType dataType = dataTypeService.getOne(
new LambdaQueryWrapper<DataType>()
.eq(DataType::getId, dataSource.getTypeId())
);
try {
List<AssetStructureTable> assetStructureTableList = new ArrayList<>();
if (dataType.getDriverManager() != null && dataType.getJdbcPre() != null) {
if ("mysql".equals(dataType.getDataType())) {
driveClass = dataType.getDriverManager();
jdbcUrl = dataType.getJdbcPre() + dataSource.getDataSourceIp() + ":" + dataSource.getDataSourcePort() + "/" + dataSource.getDataSourceDatabaseName() + "?" + dataSource.getAdditionalConfiguration();
// 加载数据库驱动
Class.forName(driveClass);
// 连接数据库
Connection conn = DriverManager.getConnection(jdbcUrl, dataSource.getDataSourceUsername(), dataSource.getDataSourcePassword());
Statement st = conn.createStatement();
ResultSet rs = st.executeQuery("select * from information_schema.tables where TABLE_SCHEMA = " + "'" + dataSource.getDataSourceDatabaseName() + "'");
while (rs.next()) {
// 获取表名
String tableName = rs.getString("TABLE_NAME");
// 表注释
String tableNameAnnotation = rs.getString("TABLE_COMMENT");
// 添加数据
// 表数据数量
assetStructureTableList.add(
new AssetStructureTable() {{
setAssetStructureId(entity.getId());
setTableName(tableName);
setTableNameAnnotation(tableNameAnnotation);
}}
);
}
assetStructureTableList.stream().forEach(assetStructureTable -> {
try {
ResultSet rs2 = st.executeQuery("select count(*) as countNum from " + assetStructureTable.getTableName());
while (rs2.next()){
assetStructureTable.setTableDataCount(rs2.getLong("countNum"));
}
rs2.close();
} catch (SQLException e) {
}
});
conn.close();
st.close();
rs.close();
// 批量插入
assetStructureTableService.saveBatch(assetStructureTableList);
}
if ("oracle".equals(dataType.getDataType())) {
driveClass = dataType.getDriverManager();
jdbcUrl = dataType.getJdbcPre() + dataSource.getDataSourceIp() + ":" + dataSource.getDataSourcePort() + ":" + dataSource.getDataSourceDatabaseName();
}
if ("sqlserver".equals(dataType.getDataType())) {
driveClass = dataType.getDriverManager();
jdbcUrl = dataType.getJdbcPre() + dataSource.getDataSourceIp() + ":" + dataSource.getDataSourcePort() + ";databaseName=" + dataSource.getDataSourceDatabaseName();
}
} else {
// redis
//连接指定的redis
Jedis jedis = new Jedis(dataSource.getDataSourceIp(), Integer.valueOf(dataSource.getDataSourcePort()));
//如果有密码则需要下面这一行
if (dataSource.getDataSourcePassword() != null && !dataSource.getDataSourcePassword().equals("")) {
jedis.auth(dataSource.getDataSourcePassword());
}
//查看服务是否运行,运行正常的话返回一个PONG否则返回一个连接错误
try {
jedis.ping();
} catch (Exception e) {
}
}
} catch (Exception e) {
System.out.println(e);
return false;
}
}
} else {
// 如果连接失败并存在就给他删除
if (assetStructure != null) {
// 删除
assetStructureService.removeById(assetStructure.getId());
}
return false;
}
return true;
}
/**
*
*

View File

@ -27,6 +27,8 @@ spring:
# 共享配置
shared-configs:
- application-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension}
main:
allow-circular-references: true
logging:
level:
com.etl.system.mapper: DEBUG

View File

@ -34,6 +34,7 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
<select id="selectDataSourceList" parameterType="com.etl.data.source.domain.DataSource" resultMap="DataSourceResult">
<include refid="selectDataSourceVo"/>
<where>
<if test="typeId != null and typeId != ''"> and type_id = #{typeId}</if>
<if test="dataSourceName != null and dataSourceName != ''"> and data_source_name like concat('%', #{dataSourceName}, '%')</if>
<if test="dataSourceSystemName != null and dataSourceSystemName != ''"> and data_source_system_name like concat('%', #{dataSourceSystemName}, '%')</if>
</where>