refactor(): 重构同步代码

dev
chao 2024-04-26 08:38:41 +08:00
parent 0e13e58058
commit 48346981dc
1 changed files with 265 additions and 319 deletions

View File

@ -191,166 +191,13 @@ public class DataSourceServiceImpl extends ServiceImpl<DataSourceMapper, DataSou
// 保存数据源信息
assetStructureService.save(entity);
log.info("数据源信息已同步");
String jdbcUrl = "";
String driveClass = "";
DataType dataType = dataTypeService.selectDataTypeById(dataSource.getTypeId());
// 同步数据结构
this.jdbcSync(entity, dataSource, dataType);
try {
List<AssetStructureTable> assetStructureTableList = new ArrayList<>();
List<AssetTableDetails> assetTableDetails = new ArrayList<>();
if (dataType.getDriverManager() != null && dataType.getJdbcPre() != null) {
if ("mysql".equals(dataType.getDataType())) {
driveClass = dataType.getDriverManager();
jdbcUrl = dataType.getJdbcPre() + dataSource.getDataSourceIp() + ":" + dataSource.getDataSourcePort() + "/" + dataSource.getDataSourceDatabaseName() + "?" + dataSource.getAdditionalConfiguration();
String tableSQL = "select * from information_schema.tables where TABLE_SCHEMA = " + "'" + dataSource.getDataSourceDatabaseName() + "'";
// 加载数据库驱动
Class.forName(driveClass);
// 连接数据库
Connection conn = DriverManager.getConnection(jdbcUrl, dataSource.getDataSourceUsername(), dataSource.getDataSourcePassword());
Statement st = conn.createStatement();
ResultSet rs = st.executeQuery(tableSQL);
log.info("正在同步当前库所有的表");
while (rs.next()) {
// 获取表名
String tableName = rs.getString("TABLE_NAME");
// 表注释
String tableNameAnnotation = rs.getString("TABLE_COMMENT");
// 添加数据
// 表数据数量
assetStructureTableList.add(
AssetStructureTable.builder()
.assetStructureId(entity.getId())
.tableName(tableName)
.tableNameAnnotation(tableNameAnnotation)
.build()
);
}
log.info("正在同步当前所有表总体数据数量");
assetStructureTableList.stream().forEach(assetStructureTable -> {
String tableDataCountSQL = "select count(*) as countNum from " + assetStructureTable.getTableName();
try {
ResultSet rs2 = st.executeQuery(tableDataCountSQL);
while (rs2.next()) {
assetStructureTable.setTableDataCount(rs2.getLong("countNum"));
}
rs2.close();
} catch (SQLException e) {
}
});
// 批量插入
assetStructureTableService.saveBatch(assetStructureTableList);
log.info("同步所有当前库所有表完成");
assetStructureTableList.forEach(assetStructureTable -> {
try {
DatabaseMetaData metaData = conn.getMetaData();
// 查询指定表的所有列信息
ResultSet rs3 = metaData.getColumns(null, null, assetStructureTable.getTableName(), null);
while (rs3.next()) {
// 字段名
String columnName = rs3.getString("COLUMN_NAME");
// 注释
String remarks = rs3.getString("REMARKS");
// 是否主键
boolean isPrimaryKey = false;
ResultSet rs4 = metaData.getPrimaryKeys(null, null, assetStructureTable.getTableName());
while (rs4.next()) {
String primaryKeyColumnName = rs4.getString("COLUMN_NAME");
if (columnName.equals(primaryKeyColumnName)) {
isPrimaryKey = true;
break;
}
}
rs4.close();
String dataDetailsSQL = "select * from " + assetStructureTable.getTableName() + " where 1=2";
// 字段类型
String typeName = rs3.getString("TYPE_NAME");
// 映射类型
String mappingType = null;
PreparedStatement st1 = conn.prepareStatement(dataDetailsSQL);
ResultSetMetaData rsmd = st1.executeQuery().getMetaData();
for (int i = 0; i < rsmd.getColumnCount(); i++) {
if (columnName.equals(rsmd.getColumnName(i + 1))) {
int lastDotIndex = rsmd.getColumnClassName(i + 1).lastIndexOf('.');
mappingType = rsmd.getColumnClassName(i + 1).substring(lastDotIndex + 1);
break;
}
}
// 字段长度
int columnSize = rs3.getInt("COLUMN_SIZE");
// 小数点
int decimalDigits = rs3.getInt("DECIMAL_DIGITS");
// 是否为空
boolean isNullable = (rs3.getInt("NULLABLE") == DatabaseMetaData.columnNullable);
// 默认值
String defaultValue = rs3.getString("COLUMN_DEF");
// 主键
boolean finalIsPrimaryKey = isPrimaryKey;
String finalMappingType = mappingType;
assetTableDetails.add(
AssetTableDetails.builder()
.assetStructureTableId(assetStructureTable.getId())
.name(columnName)
.annotation(remarks)
.primaryOrNot(finalIsPrimaryKey ? "Y" : "N")
.type(typeName)
.mappingType(finalMappingType)
.length((long) columnSize)
.decimalPoint((long) decimalDigits)
.nullOrNot(isNullable ? "Y" : "N")
.defaultValue(defaultValue)
.yesNoDictionary(null)
.build()
);
}
rs3.close();
} catch (Exception e) {
log.info("获取表字段信息失败");
System.out.println(e);
}
});
conn.close();
st.close();
rs.close();
assetTableDetailsService.saveBatch(assetTableDetails);
}
if ("oracle".equals(dataType.getDataType())) {
driveClass = dataType.getDriverManager();
jdbcUrl = dataType.getJdbcPre() + dataSource.getDataSourceIp() + ":" + dataSource.getDataSourcePort() + ":" + dataSource.getDataSourceDatabaseName();
}
if ("sqlserver".equals(dataType.getDataType())) {
driveClass = dataType.getDriverManager();
jdbcUrl = dataType.getJdbcPre() + dataSource.getDataSourceIp() + ":" + dataSource.getDataSourcePort() + ";databaseName=" + dataSource.getDataSourceDatabaseName();
}
} else {
// redis
//连接指定的redis
Jedis jedis = new Jedis(dataSource.getDataSourceIp(), Integer.valueOf(dataSource.getDataSourcePort()));
//如果有密码则需要下面这一行
if (dataSource.getDataSourcePassword() != null && !dataSource.getDataSourcePassword().equals("")) {
jedis.auth(dataSource.getDataSourcePassword());
}
//查看服务是否运行,运行正常的话返回一个PONG否则返回一个连接错误
try {
jedis.ping();
} catch (Exception e) {
}
}
} catch (Exception e) {
return false;
Map<String, Object> jdbcSyncMap = this.jdbcSync(entity, dataSource, dataType);
if (jdbcSyncMap != null) {
List<AssetStructureTable> assetStructureTableList = (List<AssetStructureTable>) jdbcSyncMap.get("assetStructureTableList");
Connection connection = (Connection) jdbcSyncMap.get("Connection");
assetTableDetailsSync(assetStructureTableList, connection);
return true;
}
} else { // 如果他存在就给他修改
log.info("当前数据源已被修改,正在重新同步...");
@ -373,170 +220,23 @@ public class DataSourceServiceImpl extends ServiceImpl<DataSourceMapper, DataSou
.eq(AssetStructureTable::getAssetStructureId, entity.getId())
);
log.info("已删除原数据,正在准备同步新数据");
String jdbcUrl = "";
String driveClass = "";
DataType dataType = dataTypeService.selectDataTypeById(dataSource.getTypeId());
try {
List<AssetStructureTable> assetStructureTableList = new ArrayList<>();
List<AssetTableDetails> assetTableDetails = new ArrayList<>();
if (dataType.getDriverManager() != null && dataType.getJdbcPre() != null) {
if ("mysql".equals(dataType.getDataType())) {
driveClass = dataType.getDriverManager();
jdbcUrl = dataType.getJdbcPre() + dataSource.getDataSourceIp() + ":" + dataSource.getDataSourcePort() + "/" + dataSource.getDataSourceDatabaseName() + "?" + dataSource.getAdditionalConfiguration();
String tableSQL = "select * from information_schema.tables where TABLE_SCHEMA = " + "'" + dataSource.getDataSourceDatabaseName() + "'";
// 加载数据库驱动
Class.forName(driveClass);
// 连接数据库
Connection conn = DriverManager.getConnection(jdbcUrl, dataSource.getDataSourceUsername(), dataSource.getDataSourcePassword());
Statement st = conn.createStatement();
ResultSet rs = st.executeQuery(tableSQL);
while (rs.next()) {
// 获取表名
String tableName = rs.getString("TABLE_NAME");
// 表注释
String tableNameAnnotation = rs.getString("TABLE_COMMENT");
// 添加数据
// 表数据数量
assetStructureTableList.add(
AssetStructureTable.builder()
.assetStructureId(entity.getId())
.tableName(tableName)
.tableNameAnnotation(tableNameAnnotation)
.build()
);
}
assetStructureTableList.stream().forEach(assetStructureTable -> {
String tableDataCountSQL = "select count(*) as countNum from " + assetStructureTable.getTableName();
try {
ResultSet rs2 = st.executeQuery(tableDataCountSQL);
while (rs2.next()) {
assetStructureTable.setTableDataCount(rs2.getLong("countNum"));
}
rs2.close();
} catch (SQLException e) {
}
});
// 批量插入
assetStructureTableService.saveBatch(assetStructureTableList);
// 先删除旧数据
List<Long> collect = assetStructureTables.stream().map(
assetStructureTable -> assetStructureTable.getId()
).collect(Collectors.toList());
for (Long assetStructureTableId : collect) {
assetTableDetailsService.remove(new LambdaQueryWrapper<AssetTableDetails>().eq(AssetTableDetails::getAssetStructureTableId, assetStructureTableId));
}
assetStructureTableList.forEach(assetStructureTable -> {
try {
DatabaseMetaData metaData = conn.getMetaData();
// 查询指定表的所有列信息
ResultSet rs3 = metaData.getColumns(null, null, assetStructureTable.getTableName(), null);
while (rs3.next()) {
// 字段名
String columnName = rs3.getString("COLUMN_NAME");
// 注释
String remarks = rs3.getString("REMARKS");
// 是否主键
boolean isPrimaryKey = false;
ResultSet rs4 = metaData.getPrimaryKeys(null, null, assetStructureTable.getTableName());
while (rs4.next()) {
String primaryKeyColumnName = rs4.getString("COLUMN_NAME");
if (columnName.equals(primaryKeyColumnName)) {
isPrimaryKey = true;
break;
}
}
rs4.close();
String dataDetailsSQL = "select * from " + assetStructureTable.getTableName() + " where 1=2";
// 字段类型
String typeName = rs3.getString("TYPE_NAME");
// 映射类型
String mappingType = null;
PreparedStatement st1 = conn.prepareStatement(dataDetailsSQL);
ResultSetMetaData rsmd = st1.executeQuery().getMetaData();
for (int i = 0; i < rsmd.getColumnCount(); i++) {
if (columnName.equals(rsmd.getColumnName(i + 1))) {
int lastDotIndex = rsmd.getColumnClassName(i + 1).lastIndexOf('.');
mappingType = rsmd.getColumnClassName(i + 1).substring(lastDotIndex + 1);
break;
}
}
// 字段长度
int columnSize = rs3.getInt("COLUMN_SIZE");
// 小数点
int decimalDigits = rs3.getInt("DECIMAL_DIGITS");
// 是否为空
boolean isNullable = (rs3.getInt("NULLABLE") == DatabaseMetaData.columnNullable);
// 默认值
String defaultValue = rs3.getString("COLUMN_DEF");
// 主键
boolean finalIsPrimaryKey = isPrimaryKey;
String finalMappingType = mappingType;
assetTableDetails.add(
AssetTableDetails.builder()
.assetStructureTableId(assetStructureTable.getId())
.name(columnName)
.annotation(remarks)
.primaryOrNot(finalIsPrimaryKey ? "Y" : "N")
.type(typeName)
.mappingType(finalMappingType)
.length((long) columnSize)
.decimalPoint((long) decimalDigits)
.nullOrNot(isNullable ? "Y" : "N")
.defaultValue(defaultValue)
.yesNoDictionary(null)
.build()
);
}
rs3.close();
} catch (Exception e) {
}
});
conn.close();
st.close();
rs.close();
assetTableDetailsService.saveBatch(assetTableDetails);
}
if ("oracle".equals(dataType.getDataType())) {
driveClass = dataType.getDriverManager();
jdbcUrl = dataType.getJdbcPre() + dataSource.getDataSourceIp() + ":" + dataSource.getDataSourcePort() + ":" + dataSource.getDataSourceDatabaseName();
}
if ("sqlserver".equals(dataType.getDataType())) {
driveClass = dataType.getDriverManager();
jdbcUrl = dataType.getJdbcPre() + dataSource.getDataSourceIp() + ":" + dataSource.getDataSourcePort() + ";databaseName=" + dataSource.getDataSourceDatabaseName();
}
} else {
// redis
//连接指定的redis
Jedis jedis = new Jedis(dataSource.getDataSourceIp(), Integer.valueOf(dataSource.getDataSourcePort()));
//如果有密码则需要下面这一行
if (dataSource.getDataSourcePassword() != null && !dataSource.getDataSourcePassword().equals("")) {
jedis.auth(dataSource.getDataSourcePassword());
}
//查看服务是否运行,运行正常的话返回一个PONG否则返回一个连接错误
try {
jedis.ping();
} catch (Exception e) {
}
// 获取数据
Map<String, Object> jdbcSyncMap = this.jdbcSync(entity, dataSource, dataType);
if (jdbcSyncMap != null) {
List<AssetStructureTable> assetStructureTableList = (List<AssetStructureTable>) jdbcSyncMap.get("assetStructureTableList");
Connection connection = (Connection) jdbcSyncMap.get("Connection");
List<Long> collect = assetStructureTables.stream().map(
assetStructureTable -> assetStructureTable.getId()
).collect(Collectors.toList());
for (Long assetStructureTableId : collect) {
assetTableDetailsService.remove(new LambdaQueryWrapper<AssetTableDetails>().eq(AssetTableDetails::getAssetStructureTableId, assetStructureTableId));
}
} catch (Exception e) {
return false;
assetTableDetailsSync(assetStructureTableList, connection);
}
}
} else {
log.info("连接失败,同步数据失败");
// 如果连接失败并存在就给他删除
if (assetStructure != null) {
// 删除
@ -581,12 +281,12 @@ public class DataSourceServiceImpl extends ServiceImpl<DataSourceMapper, DataSou
}
if ("oracle".equals(dataType.getDataType())) {
log.info("Oracle测试连接");
Map<String, String> mysqlEdConnection = this.mysqlConnection(dataSource, dataType);
Map<String, String> mysqlEdConnection = this.oracleConnection(dataSource, dataType);
return testDatasource(mysqlEdConnection.get("driveClass"), mysqlEdConnection.get("jdbcUrl"), dataSource.getDataSourceUsername(), dataSource.getDataSourcePassword());
}
if ("sqlserver".equals(dataType.getDataType())) {
log.info("SQLServer测试连接");
Map<String, String> mysqlEdConnection = this.mysqlConnection(dataSource, dataType);
Map<String, String> mysqlEdConnection = this.sqlserverConnection(dataSource, dataType);
return testDatasource(mysqlEdConnection.get("driveClass"), mysqlEdConnection.get("jdbcUrl"), dataSource.getDataSourceUsername(), dataSource.getDataSourcePassword());
}
return false;
@ -663,5 +363,251 @@ public class DataSourceServiceImpl extends ServiceImpl<DataSourceMapper, DataSou
}
}
/**
*
*
* @param assetStructure
* @param dataSource
* @param dataType
*/
public Map<String, Object> jdbcSync(AssetStructure assetStructure, DataSource dataSource, DataType dataType) {
if (dataType.getDriverManager() != null && dataType.getJdbcPre() != null) {
if ("mysql".equals(dataType.getDataType())) {
log.info("当前数据正在同步");
Map<String, String> stringStringMap = mysqlConnection(dataSource, dataType);
String driveClass = stringStringMap.get("driveClass");
String jdbcUrl = stringStringMap.get("jdbcUrl");
// 统计当前库所有表 初始数组
List<AssetStructureTable> assetStructureTableList = new ArrayList<>();
Connection conn = null;
Statement st = null;
ResultSet rs = null;
// 查询表
String tableSql = "select * from information_schema.tables where TABLE_SCHEMA = " + "'" + dataSource.getDataSourceDatabaseName() + "'";
try {
// 加载数据库驱动
Class.forName(driveClass);
// 连接数据库
conn = DriverManager.getConnection(jdbcUrl, dataSource.getDataSourceUsername(), dataSource.getDataSourcePassword());
st = conn.createStatement();
rs = st.executeQuery(tableSql);
while (rs.next()) {
// 获取表名
String tableName = rs.getString("TABLE_NAME");
// 表注释
String tableNameAnnotation = rs.getString("TABLE_COMMENT");
// 添加数据
// 表数据数量
assetStructureTableList.add(
AssetStructureTable.builder()
.assetStructureId(assetStructure.getId())
.tableName(tableName)
.tableNameAnnotation(tableNameAnnotation)
.build()
);
}
log.info("正在同步当前所有表总体数据数量");
Statement finalSt = st;
assetStructureTableList.stream().forEach(assetStructureTable -> {
String tableDataCountSQL = "select count(*) as countNum from " + assetStructureTable.getTableName();
try {
ResultSet rs2 = finalSt.executeQuery(tableDataCountSQL);
while (rs2.next()) {
assetStructureTable.setTableDataCount(rs2.getLong("countNum"));
}
rs2.close();
} catch (SQLException e) {
log.info("mysql查询表数据总数失败");
}
});
// 批量插入
assetStructureTableService.saveBatch(assetStructureTableList);
log.info("同步所有当前库所有表完成");
Connection finalConn = conn;
return new HashMap<String, Object>() {{
put("assetStructureTableList", assetStructureTableList);
put("Connection", finalConn);
}};
} catch (ClassNotFoundException e) {
log.info("mysql驱动加载失败");
throw new RuntimeException(e);
} catch (SQLException e) {
log.info("mysql连接失败");
throw new RuntimeException(e);
} finally {
try {
if (st != null) {
st.close();
}
if (rs != null) {
rs.close();
}
} catch (SQLException e) {
log.info("无法关闭 JDBC 资源");
}
}
}
if ("oracle".equals(dataType.getDataType())) {
log.info("oracle正在同步数据");
}
if ("sqlserver".equals(dataType.getDataType())) {
log.info("sqlserver正在同步数据");
}
} else {
// redis
//连接指定的redis
Jedis jedis = new Jedis(dataSource.getDataSourceIp(), Integer.valueOf(dataSource.getDataSourcePort()));
//如果有密码则需要下面这一行
if (dataSource.getDataSourcePassword() != null && !dataSource.getDataSourcePassword().equals("")) {
jedis.auth(dataSource.getDataSourcePassword());
}
//查看服务是否运行,运行正常的话返回一个PONG否则返回一个连接错误
try {
jedis.ping();
log.info("redis连接成功");
} catch (Exception e) {
log.info("redis连接失败");
}
}
return null;
}
/**
*
*
* @param assetStructureTableList
* @param conn
*/
public void assetTableDetailsSync(List<AssetStructureTable> assetStructureTableList, Connection conn) {
List<AssetTableDetails> assetTableDetails = new ArrayList<>();
try {
log.info("正在同步表详细信息");
assetStructureTableList.forEach(assetStructureTable -> {
try {
DatabaseMetaData metaData = conn.getMetaData();
ResultSet rs = getColumns(metaData, conn, assetStructureTable);
while (rs.next()) {
AssetTableDetails details = processRow(rs, metaData, conn, assetStructureTable);
assetTableDetails.add(details);
}
rs.close();
log.info("同步表详细信息完成");
} catch (Exception e) {
log.info("同步表的表详细信息时出错");
}
});
assetTableDetailsService.saveBatch(assetTableDetails);
} catch (Exception e) {
log.info("处理数据库连接时出错");
}
}
/**
*
*
* @param metaData
* @param conn
* @param assetStructureTable
* @return
* @throws SQLException
*/
private ResultSet getColumns(DatabaseMetaData metaData, Connection conn, AssetStructureTable assetStructureTable) throws SQLException {
return metaData.getColumns(null, null, assetStructureTable.getTableName(), null);
}
/**
*
*
* @param metaData
* @param conn
* @param assetStructureTable
* @return
* @throws SQLException
* @throws ClassNotFoundException
* @throws IllegalAccessException
* @throws InstantiationException
*/
private AssetTableDetails processRow(ResultSet rs, DatabaseMetaData metaData, Connection conn, AssetStructureTable assetStructureTable) throws SQLException, ClassNotFoundException, IllegalAccessException, InstantiationException {
String columnName = rs.getString("COLUMN_NAME");
String remarks = rs.getString("REMARKS");
boolean isPrimaryKey = isColumnPrimaryKey(metaData, assetStructureTable.getTableName(), columnName);
String dataDetailsSQL = "select * from " + assetStructureTable.getTableName() + " where 1=2";
String typeName = rs.getString("TYPE_NAME");
String mappingType = getMappingType(conn, dataDetailsSQL, columnName);
int columnSize = rs.getInt("COLUMN_SIZE");
int decimalDigits = rs.getInt("DECIMAL_DIGITS");
boolean isNullable = (rs.getInt("NULLABLE") == DatabaseMetaData.columnNullable);
String defaultValue = rs.getString("COLUMN_DEF");
boolean finalIsPrimaryKey = isPrimaryKey;
String finalMappingType = mappingType;
return AssetTableDetails.builder()
.assetStructureTableId(assetStructureTable.getId())
.name(columnName)
.annotation(remarks)
.primaryOrNot(finalIsPrimaryKey ? "Y" : "N")
.type(typeName)
.mappingType(finalMappingType)
.length((long) columnSize)
.decimalPoint((long) decimalDigits)
.nullOrNot(isNullable ? "Y" : "N")
.defaultValue(defaultValue)
.yesNoDictionary(null)
.build();
}
/**
*
*
* @param metaData
* @param tableName
* @param columnName
* @return
* @throws SQLException
*/
private boolean isColumnPrimaryKey(DatabaseMetaData metaData, String tableName, String columnName) throws SQLException {
try (ResultSet rs = metaData.getPrimaryKeys(null, null, tableName)) {
while (rs.next()) {
String primaryKeyColumnName = rs.getString("COLUMN_NAME");
if (columnName.equals(primaryKeyColumnName)) {
return true;
}
}
return false;
}
}
/**
*
*
* @param conn
* @param dataDetailsSQL
* @param columnName
* @return
* @throws SQLException
* @throws ClassNotFoundException
* @throws IllegalAccessException
* @throws InstantiationException
*/
private String getMappingType(Connection conn, String dataDetailsSQL, String columnName) throws SQLException, ClassNotFoundException, IllegalAccessException, InstantiationException {
try (
PreparedStatement st = conn.prepareStatement(dataDetailsSQL);
ResultSet rs = st.executeQuery()) {
ResultSetMetaData rsmd = rs.getMetaData();
for (int i = 0; i < rsmd.getColumnCount(); i++) {
if (columnName.equals(rsmd.getColumnName(i + 1))) {
int lastDotIndex = rsmd.getColumnClassName(i + 1).lastIndexOf('.');
return rsmd.getColumnClassName(i + 1).substring(lastDotIndex + 1);
}
}
}
return null;
}
}