From 48346981dc1b7157ad969c1bcf44db17f3ac8d3b Mon Sep 17 00:00:00 2001 From: chao <3072464591@QQ.com> Date: Fri, 26 Apr 2024 08:38:41 +0800 Subject: [PATCH] =?UTF-8?q?refactor():=20=E9=87=8D=E6=9E=84=E5=90=8C?= =?UTF-8?q?=E6=AD=A5=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../service/impl/DataSourceServiceImpl.java | 584 ++++++++---------- 1 file changed, 265 insertions(+), 319 deletions(-) diff --git a/etl-modules/etl-modules-data-source/etl-modules-data-source-system/src/main/java/com/etl/data/source/service/impl/DataSourceServiceImpl.java b/etl-modules/etl-modules-data-source/etl-modules-data-source-system/src/main/java/com/etl/data/source/service/impl/DataSourceServiceImpl.java index 41f2815..80f15b6 100644 --- a/etl-modules/etl-modules-data-source/etl-modules-data-source-system/src/main/java/com/etl/data/source/service/impl/DataSourceServiceImpl.java +++ b/etl-modules/etl-modules-data-source/etl-modules-data-source-system/src/main/java/com/etl/data/source/service/impl/DataSourceServiceImpl.java @@ -191,166 +191,13 @@ public class DataSourceServiceImpl extends ServiceImpl assetStructureTableList = new ArrayList<>(); - List assetTableDetails = new ArrayList<>(); - if (dataType.getDriverManager() != null && dataType.getJdbcPre() != null) { - if ("mysql".equals(dataType.getDataType())) { - driveClass = dataType.getDriverManager(); - jdbcUrl = dataType.getJdbcPre() + dataSource.getDataSourceIp() + ":" + dataSource.getDataSourcePort() + "/" + dataSource.getDataSourceDatabaseName() + "?" + dataSource.getAdditionalConfiguration(); - String tableSQL = "select * from information_schema.tables where TABLE_SCHEMA = " + "'" + dataSource.getDataSourceDatabaseName() + "'"; - // 加载数据库驱动 - Class.forName(driveClass); - // 连接数据库 - Connection conn = DriverManager.getConnection(jdbcUrl, dataSource.getDataSourceUsername(), dataSource.getDataSourcePassword()); - Statement st = conn.createStatement(); - ResultSet rs = st.executeQuery(tableSQL); - log.info("正在同步当前库所有的表"); - while (rs.next()) { - // 获取表名 - String tableName = rs.getString("TABLE_NAME"); - // 表注释 - String tableNameAnnotation = rs.getString("TABLE_COMMENT"); - // 添加数据 - // 表数据数量 - assetStructureTableList.add( - AssetStructureTable.builder() - .assetStructureId(entity.getId()) - .tableName(tableName) - .tableNameAnnotation(tableNameAnnotation) - .build() - ); - } - log.info("正在同步当前所有表总体数据数量"); - assetStructureTableList.stream().forEach(assetStructureTable -> { - String tableDataCountSQL = "select count(*) as countNum from " + assetStructureTable.getTableName(); - try { - ResultSet rs2 = st.executeQuery(tableDataCountSQL); - while (rs2.next()) { - assetStructureTable.setTableDataCount(rs2.getLong("countNum")); - } - rs2.close(); - } catch (SQLException e) { - - } - }); - // 批量插入 - assetStructureTableService.saveBatch(assetStructureTableList); - log.info("同步所有当前库所有表完成"); - - - assetStructureTableList.forEach(assetStructureTable -> { - try { - DatabaseMetaData metaData = conn.getMetaData(); - // 查询指定表的所有列信息 - ResultSet rs3 = metaData.getColumns(null, null, assetStructureTable.getTableName(), null); - while (rs3.next()) { - // 字段名 - String columnName = rs3.getString("COLUMN_NAME"); - // 注释 - String remarks = rs3.getString("REMARKS"); - - // 是否主键 - boolean isPrimaryKey = false; - - ResultSet rs4 = metaData.getPrimaryKeys(null, null, assetStructureTable.getTableName()); - while (rs4.next()) { - String primaryKeyColumnName = rs4.getString("COLUMN_NAME"); - if (columnName.equals(primaryKeyColumnName)) { - isPrimaryKey = true; - break; - } - } - rs4.close(); - - String dataDetailsSQL = "select * from " + assetStructureTable.getTableName() + " where 1=2"; - // 字段类型 - String typeName = rs3.getString("TYPE_NAME"); - - // 映射类型 - String mappingType = null; - - PreparedStatement st1 = conn.prepareStatement(dataDetailsSQL); - ResultSetMetaData rsmd = st1.executeQuery().getMetaData(); - for (int i = 0; i < rsmd.getColumnCount(); i++) { - if (columnName.equals(rsmd.getColumnName(i + 1))) { - int lastDotIndex = rsmd.getColumnClassName(i + 1).lastIndexOf('.'); - mappingType = rsmd.getColumnClassName(i + 1).substring(lastDotIndex + 1); - break; - } - } - - // 字段长度 - int columnSize = rs3.getInt("COLUMN_SIZE"); - // 小数点 - int decimalDigits = rs3.getInt("DECIMAL_DIGITS"); - // 是否为空 - boolean isNullable = (rs3.getInt("NULLABLE") == DatabaseMetaData.columnNullable); - // 默认值 - String defaultValue = rs3.getString("COLUMN_DEF"); - - // 主键 - boolean finalIsPrimaryKey = isPrimaryKey; - String finalMappingType = mappingType; - assetTableDetails.add( - AssetTableDetails.builder() - .assetStructureTableId(assetStructureTable.getId()) - .name(columnName) - .annotation(remarks) - .primaryOrNot(finalIsPrimaryKey ? "Y" : "N") - .type(typeName) - .mappingType(finalMappingType) - .length((long) columnSize) - .decimalPoint((long) decimalDigits) - .nullOrNot(isNullable ? "Y" : "N") - .defaultValue(defaultValue) - .yesNoDictionary(null) - .build() - ); - } - rs3.close(); - } catch (Exception e) { - log.info("获取表字段信息失败"); - System.out.println(e); - } - }); - conn.close(); - st.close(); - rs.close(); - assetTableDetailsService.saveBatch(assetTableDetails); - } - if ("oracle".equals(dataType.getDataType())) { - driveClass = dataType.getDriverManager(); - jdbcUrl = dataType.getJdbcPre() + dataSource.getDataSourceIp() + ":" + dataSource.getDataSourcePort() + ":" + dataSource.getDataSourceDatabaseName(); - } - if ("sqlserver".equals(dataType.getDataType())) { - driveClass = dataType.getDriverManager(); - jdbcUrl = dataType.getJdbcPre() + dataSource.getDataSourceIp() + ":" + dataSource.getDataSourcePort() + ";databaseName=" + dataSource.getDataSourceDatabaseName(); - } - - } else { - // redis - //连接指定的redis - Jedis jedis = new Jedis(dataSource.getDataSourceIp(), Integer.valueOf(dataSource.getDataSourcePort())); - //如果有密码则需要下面这一行 - if (dataSource.getDataSourcePassword() != null && !dataSource.getDataSourcePassword().equals("")) { - jedis.auth(dataSource.getDataSourcePassword()); - } - //查看服务是否运行,运行正常的话返回一个PONG,否则返回一个连接错误 - try { - jedis.ping(); - - } catch (Exception e) { - - } - } - } catch (Exception e) { - return false; + Map jdbcSyncMap = this.jdbcSync(entity, dataSource, dataType); + if (jdbcSyncMap != null) { + List assetStructureTableList = (List) jdbcSyncMap.get("assetStructureTableList"); + Connection connection = (Connection) jdbcSyncMap.get("Connection"); + assetTableDetailsSync(assetStructureTableList, connection); + return true; } } else { // 如果他存在就给他修改 log.info("当前数据源已被修改,正在重新同步..."); @@ -373,170 +220,23 @@ public class DataSourceServiceImpl extends ServiceImpl assetStructureTableList = new ArrayList<>(); - List assetTableDetails = new ArrayList<>(); - if (dataType.getDriverManager() != null && dataType.getJdbcPre() != null) { - if ("mysql".equals(dataType.getDataType())) { - driveClass = dataType.getDriverManager(); - jdbcUrl = dataType.getJdbcPre() + dataSource.getDataSourceIp() + ":" + dataSource.getDataSourcePort() + "/" + dataSource.getDataSourceDatabaseName() + "?" + dataSource.getAdditionalConfiguration(); - String tableSQL = "select * from information_schema.tables where TABLE_SCHEMA = " + "'" + dataSource.getDataSourceDatabaseName() + "'"; - // 加载数据库驱动 - Class.forName(driveClass); - // 连接数据库 - Connection conn = DriverManager.getConnection(jdbcUrl, dataSource.getDataSourceUsername(), dataSource.getDataSourcePassword()); - Statement st = conn.createStatement(); - ResultSet rs = st.executeQuery(tableSQL); - while (rs.next()) { - // 获取表名 - String tableName = rs.getString("TABLE_NAME"); - // 表注释 - String tableNameAnnotation = rs.getString("TABLE_COMMENT"); - // 添加数据 - // 表数据数量 - assetStructureTableList.add( - AssetStructureTable.builder() - .assetStructureId(entity.getId()) - .tableName(tableName) - .tableNameAnnotation(tableNameAnnotation) - .build() - ); - } - assetStructureTableList.stream().forEach(assetStructureTable -> { - String tableDataCountSQL = "select count(*) as countNum from " + assetStructureTable.getTableName(); - try { - ResultSet rs2 = st.executeQuery(tableDataCountSQL); - while (rs2.next()) { - assetStructureTable.setTableDataCount(rs2.getLong("countNum")); - } - rs2.close(); - } catch (SQLException e) { - - } - }); - // 批量插入 - assetStructureTableService.saveBatch(assetStructureTableList); - // 先删除旧数据 - List collect = assetStructureTables.stream().map( - assetStructureTable -> assetStructureTable.getId() - ).collect(Collectors.toList()); - for (Long assetStructureTableId : collect) { - assetTableDetailsService.remove(new LambdaQueryWrapper().eq(AssetTableDetails::getAssetStructureTableId, assetStructureTableId)); - } - assetStructureTableList.forEach(assetStructureTable -> { - try { - DatabaseMetaData metaData = conn.getMetaData(); - // 查询指定表的所有列信息 - ResultSet rs3 = metaData.getColumns(null, null, assetStructureTable.getTableName(), null); - while (rs3.next()) { - // 字段名 - String columnName = rs3.getString("COLUMN_NAME"); - // 注释 - String remarks = rs3.getString("REMARKS"); - - // 是否主键 - boolean isPrimaryKey = false; - - ResultSet rs4 = metaData.getPrimaryKeys(null, null, assetStructureTable.getTableName()); - while (rs4.next()) { - String primaryKeyColumnName = rs4.getString("COLUMN_NAME"); - if (columnName.equals(primaryKeyColumnName)) { - isPrimaryKey = true; - break; - } - } - rs4.close(); - - String dataDetailsSQL = "select * from " + assetStructureTable.getTableName() + " where 1=2"; - - // 字段类型 - String typeName = rs3.getString("TYPE_NAME"); - - // 映射类型 - String mappingType = null; - - PreparedStatement st1 = conn.prepareStatement(dataDetailsSQL); - ResultSetMetaData rsmd = st1.executeQuery().getMetaData(); - for (int i = 0; i < rsmd.getColumnCount(); i++) { - if (columnName.equals(rsmd.getColumnName(i + 1))) { - int lastDotIndex = rsmd.getColumnClassName(i + 1).lastIndexOf('.'); - mappingType = rsmd.getColumnClassName(i + 1).substring(lastDotIndex + 1); - break; - } - } - - // 字段长度 - int columnSize = rs3.getInt("COLUMN_SIZE"); - // 小数点 - int decimalDigits = rs3.getInt("DECIMAL_DIGITS"); - // 是否为空 - boolean isNullable = (rs3.getInt("NULLABLE") == DatabaseMetaData.columnNullable); - // 默认值 - String defaultValue = rs3.getString("COLUMN_DEF"); - - // 主键 - boolean finalIsPrimaryKey = isPrimaryKey; - String finalMappingType = mappingType; - assetTableDetails.add( - AssetTableDetails.builder() - .assetStructureTableId(assetStructureTable.getId()) - .name(columnName) - .annotation(remarks) - .primaryOrNot(finalIsPrimaryKey ? "Y" : "N") - .type(typeName) - .mappingType(finalMappingType) - .length((long) columnSize) - .decimalPoint((long) decimalDigits) - .nullOrNot(isNullable ? "Y" : "N") - .defaultValue(defaultValue) - .yesNoDictionary(null) - .build() - ); - } - rs3.close(); - } catch (Exception e) { - } - }); - conn.close(); - st.close(); - rs.close(); - assetTableDetailsService.saveBatch(assetTableDetails); - } - if ("oracle".equals(dataType.getDataType())) { - driveClass = dataType.getDriverManager(); - jdbcUrl = dataType.getJdbcPre() + dataSource.getDataSourceIp() + ":" + dataSource.getDataSourcePort() + ":" + dataSource.getDataSourceDatabaseName(); - } - if ("sqlserver".equals(dataType.getDataType())) { - driveClass = dataType.getDriverManager(); - jdbcUrl = dataType.getJdbcPre() + dataSource.getDataSourceIp() + ":" + dataSource.getDataSourcePort() + ";databaseName=" + dataSource.getDataSourceDatabaseName(); - } - - } else { - // redis - //连接指定的redis - Jedis jedis = new Jedis(dataSource.getDataSourceIp(), Integer.valueOf(dataSource.getDataSourcePort())); - //如果有密码则需要下面这一行 - if (dataSource.getDataSourcePassword() != null && !dataSource.getDataSourcePassword().equals("")) { - jedis.auth(dataSource.getDataSourcePassword()); - } - //查看服务是否运行,运行正常的话返回一个PONG,否则返回一个连接错误 - try { - jedis.ping(); - - } catch (Exception e) { - - } + // 获取数据 + Map jdbcSyncMap = this.jdbcSync(entity, dataSource, dataType); + if (jdbcSyncMap != null) { + List assetStructureTableList = (List) jdbcSyncMap.get("assetStructureTableList"); + Connection connection = (Connection) jdbcSyncMap.get("Connection"); + List collect = assetStructureTables.stream().map( + assetStructureTable -> assetStructureTable.getId() + ).collect(Collectors.toList()); + for (Long assetStructureTableId : collect) { + assetTableDetailsService.remove(new LambdaQueryWrapper().eq(AssetTableDetails::getAssetStructureTableId, assetStructureTableId)); } - } catch (Exception e) { - return false; + assetTableDetailsSync(assetStructureTableList, connection); } } } else { log.info("连接失败,同步数据失败"); - // 如果连接失败并存在就给他删除 if (assetStructure != null) { // 删除 @@ -581,12 +281,12 @@ public class DataSourceServiceImpl extends ServiceImpl mysqlEdConnection = this.mysqlConnection(dataSource, dataType); + Map mysqlEdConnection = this.oracleConnection(dataSource, dataType); return testDatasource(mysqlEdConnection.get("driveClass"), mysqlEdConnection.get("jdbcUrl"), dataSource.getDataSourceUsername(), dataSource.getDataSourcePassword()); } if ("sqlserver".equals(dataType.getDataType())) { log.info("SQLServer测试连接"); - Map mysqlEdConnection = this.mysqlConnection(dataSource, dataType); + Map mysqlEdConnection = this.sqlserverConnection(dataSource, dataType); return testDatasource(mysqlEdConnection.get("driveClass"), mysqlEdConnection.get("jdbcUrl"), dataSource.getDataSourceUsername(), dataSource.getDataSourcePassword()); } return false; @@ -663,5 +363,251 @@ public class DataSourceServiceImpl extends ServiceImpl jdbcSync(AssetStructure assetStructure, DataSource dataSource, DataType dataType) { + if (dataType.getDriverManager() != null && dataType.getJdbcPre() != null) { + if ("mysql".equals(dataType.getDataType())) { + log.info("当前数据正在同步"); + Map stringStringMap = mysqlConnection(dataSource, dataType); + String driveClass = stringStringMap.get("driveClass"); + String jdbcUrl = stringStringMap.get("jdbcUrl"); + + // 统计当前库所有表 初始数组 + List assetStructureTableList = new ArrayList<>(); + Connection conn = null; + Statement st = null; + ResultSet rs = null; + // 查询表 + String tableSql = "select * from information_schema.tables where TABLE_SCHEMA = " + "'" + dataSource.getDataSourceDatabaseName() + "'"; + try { + // 加载数据库驱动 + Class.forName(driveClass); + // 连接数据库 + conn = DriverManager.getConnection(jdbcUrl, dataSource.getDataSourceUsername(), dataSource.getDataSourcePassword()); + st = conn.createStatement(); + rs = st.executeQuery(tableSql); + while (rs.next()) { + // 获取表名 + String tableName = rs.getString("TABLE_NAME"); + // 表注释 + String tableNameAnnotation = rs.getString("TABLE_COMMENT"); + // 添加数据 + // 表数据数量 + assetStructureTableList.add( + AssetStructureTable.builder() + .assetStructureId(assetStructure.getId()) + .tableName(tableName) + .tableNameAnnotation(tableNameAnnotation) + .build() + ); + } + log.info("正在同步当前所有表总体数据数量"); + Statement finalSt = st; + assetStructureTableList.stream().forEach(assetStructureTable -> { + String tableDataCountSQL = "select count(*) as countNum from " + assetStructureTable.getTableName(); + try { + ResultSet rs2 = finalSt.executeQuery(tableDataCountSQL); + while (rs2.next()) { + assetStructureTable.setTableDataCount(rs2.getLong("countNum")); + } + rs2.close(); + } catch (SQLException e) { + log.info("mysql查询表数据总数失败"); + } + }); + // 批量插入 + assetStructureTableService.saveBatch(assetStructureTableList); + log.info("同步所有当前库所有表完成"); + Connection finalConn = conn; + return new HashMap() {{ + put("assetStructureTableList", assetStructureTableList); + put("Connection", finalConn); + }}; + } catch (ClassNotFoundException e) { + log.info("mysql驱动加载失败"); + throw new RuntimeException(e); + } catch (SQLException e) { + log.info("mysql连接失败"); + throw new RuntimeException(e); + } finally { + try { + if (st != null) { + st.close(); + } + if (rs != null) { + rs.close(); + } + } catch (SQLException e) { + log.info("无法关闭 JDBC 资源"); + } + + } + } + if ("oracle".equals(dataType.getDataType())) { + log.info("oracle正在同步数据"); + } + if ("sqlserver".equals(dataType.getDataType())) { + log.info("sqlserver正在同步数据"); + } + } else { + // redis + //连接指定的redis + Jedis jedis = new Jedis(dataSource.getDataSourceIp(), Integer.valueOf(dataSource.getDataSourcePort())); + //如果有密码则需要下面这一行 + if (dataSource.getDataSourcePassword() != null && !dataSource.getDataSourcePassword().equals("")) { + jedis.auth(dataSource.getDataSourcePassword()); + } + //查看服务是否运行,运行正常的话返回一个PONG,否则返回一个连接错误 + try { + jedis.ping(); + log.info("redis连接成功"); + } catch (Exception e) { + log.info("redis连接失败"); + } + } + return null; + } + + + /** + * 同步数据表结构详情 + * + * @param assetStructureTableList 数据表列表 + * @param conn 连接 + */ + public void assetTableDetailsSync(List assetStructureTableList, Connection conn) { + List assetTableDetails = new ArrayList<>(); + try { + log.info("正在同步表详细信息"); + assetStructureTableList.forEach(assetStructureTable -> { + try { + DatabaseMetaData metaData = conn.getMetaData(); + ResultSet rs = getColumns(metaData, conn, assetStructureTable); + while (rs.next()) { + AssetTableDetails details = processRow(rs, metaData, conn, assetStructureTable); + assetTableDetails.add(details); + } + rs.close(); + log.info("同步表详细信息完成"); + } catch (Exception e) { + log.info("同步表的表详细信息时出错"); + } + }); + assetTableDetailsService.saveBatch(assetTableDetails); + } catch (Exception e) { + log.info("处理数据库连接时出错"); + } + } + + /** + * 获取表详细信息 + * + * @param metaData + * @param conn + * @param assetStructureTable + * @return + * @throws SQLException + */ + private ResultSet getColumns(DatabaseMetaData metaData, Connection conn, AssetStructureTable assetStructureTable) throws SQLException { + return metaData.getColumns(null, null, assetStructureTable.getTableName(), null); + } + + /** + * 获取表详细信息 + * + * @param metaData + * @param conn + * @param assetStructureTable + * @return + * @throws SQLException + * @throws ClassNotFoundException + * @throws IllegalAccessException + * @throws InstantiationException + */ + private AssetTableDetails processRow(ResultSet rs, DatabaseMetaData metaData, Connection conn, AssetStructureTable assetStructureTable) throws SQLException, ClassNotFoundException, IllegalAccessException, InstantiationException { + String columnName = rs.getString("COLUMN_NAME"); + String remarks = rs.getString("REMARKS"); + boolean isPrimaryKey = isColumnPrimaryKey(metaData, assetStructureTable.getTableName(), columnName); + + String dataDetailsSQL = "select * from " + assetStructureTable.getTableName() + " where 1=2"; + String typeName = rs.getString("TYPE_NAME"); + String mappingType = getMappingType(conn, dataDetailsSQL, columnName); + + int columnSize = rs.getInt("COLUMN_SIZE"); + int decimalDigits = rs.getInt("DECIMAL_DIGITS"); + boolean isNullable = (rs.getInt("NULLABLE") == DatabaseMetaData.columnNullable); + String defaultValue = rs.getString("COLUMN_DEF"); + + boolean finalIsPrimaryKey = isPrimaryKey; + String finalMappingType = mappingType; + return AssetTableDetails.builder() + .assetStructureTableId(assetStructureTable.getId()) + .name(columnName) + .annotation(remarks) + .primaryOrNot(finalIsPrimaryKey ? "Y" : "N") + .type(typeName) + .mappingType(finalMappingType) + .length((long) columnSize) + .decimalPoint((long) decimalDigits) + .nullOrNot(isNullable ? "Y" : "N") + .defaultValue(defaultValue) + .yesNoDictionary(null) + .build(); + } + + /** + * 是否为主键 + * + * @param metaData + * @param tableName + * @param columnName + * @return + * @throws SQLException + */ + private boolean isColumnPrimaryKey(DatabaseMetaData metaData, String tableName, String columnName) throws SQLException { + try (ResultSet rs = metaData.getPrimaryKeys(null, null, tableName)) { + while (rs.next()) { + String primaryKeyColumnName = rs.getString("COLUMN_NAME"); + if (columnName.equals(primaryKeyColumnName)) { + return true; + } + } + return false; + } + } + + /** + * 获取映射类型 + * + * @param conn + * @param dataDetailsSQL + * @param columnName + * @return + * @throws SQLException + * @throws ClassNotFoundException + * @throws IllegalAccessException + * @throws InstantiationException + */ + private String getMappingType(Connection conn, String dataDetailsSQL, String columnName) throws SQLException, ClassNotFoundException, IllegalAccessException, InstantiationException { + try ( + PreparedStatement st = conn.prepareStatement(dataDetailsSQL); + ResultSet rs = st.executeQuery()) { + ResultSetMetaData rsmd = rs.getMetaData(); + for (int i = 0; i < rsmd.getColumnCount(); i++) { + if (columnName.equals(rsmd.getColumnName(i + 1))) { + int lastDotIndex = rsmd.getColumnClassName(i + 1).lastIndexOf('.'); + return rsmd.getColumnClassName(i + 1).substring(lastDotIndex + 1); + } + } + } + return null; + } + }