From 0689fdb194a4d5c9d1919fc83400d9f380a1adfc Mon Sep 17 00:00:00 2001 From: yaoxin <1752800946@qq.com> Date: Tue, 23 Apr 2024 20:35:55 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E5=A2=9E=E5=8A=A0=E4=BA=86=E4=B8=8D?= =?UTF-8?q?=E5=90=8C=E6=95=B0=E6=8D=AE=E5=BA=93=E7=9A=84=E6=95=B0=E6=8D=AE?= =?UTF-8?q?=E6=8E=A5=E5=85=A5=E6=96=B0=E5=A2=9E,=E4=BF=AE=E6=94=B9,?= =?UTF-8?q?=E6=B5=8B=E8=AF=95=E8=BF=9E=E6=8E=A5,=E5=90=8C=E6=AD=A5?= =?UTF-8?q?=E6=95=B0=E6=8D=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 将后台代码功能完善了一下,使其能添加MySql和PostGre两种数据库的数据接入,并且能够测试连接和同步数据 --- muyu-modules/muyu-etl/pom.xml | 5 + .../java/com/muyu/etl/domain/DataSource.java | 12 + .../service/impl/DataSourceServiceImpl.java | 266 +++++++++++++++--- .../mapper/system/DataSourceMapper.xml | 7 +- 4 files changed, 245 insertions(+), 45 deletions(-) diff --git a/muyu-modules/muyu-etl/pom.xml b/muyu-modules/muyu-etl/pom.xml index 4f978b4..0a144a2 100644 --- a/muyu-modules/muyu-etl/pom.xml +++ b/muyu-modules/muyu-etl/pom.xml @@ -18,6 +18,11 @@ + + + org.postgresql + postgresql + diff --git a/muyu-modules/muyu-etl/src/main/java/com/muyu/etl/domain/DataSource.java b/muyu-modules/muyu-etl/src/main/java/com/muyu/etl/domain/DataSource.java index a5ff8ae..cb92b97 100644 --- a/muyu-modules/muyu-etl/src/main/java/com/muyu/etl/domain/DataSource.java +++ b/muyu-modules/muyu-etl/src/main/java/com/muyu/etl/domain/DataSource.java @@ -94,6 +94,18 @@ public class DataSource extends BaseEntity @Excel(name = "数据来源名称") private String systemName; + /** 模式名称 */ + @Excel(name = "数据来源名称") + private String modeName; + + public String getModeName() { + return modeName; + } + + public void setModeName(String modeName) { + this.modeName = modeName; + } + public List getTableList() { return tableList; } diff --git a/muyu-modules/muyu-etl/src/main/java/com/muyu/etl/service/impl/DataSourceServiceImpl.java b/muyu-modules/muyu-etl/src/main/java/com/muyu/etl/service/impl/DataSourceServiceImpl.java index 5d30950..631c41b 100644 --- a/muyu-modules/muyu-etl/src/main/java/com/muyu/etl/service/impl/DataSourceServiceImpl.java +++ b/muyu-modules/muyu-etl/src/main/java/com/muyu/etl/service/impl/DataSourceServiceImpl.java @@ -21,6 +21,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import com.muyu.etl.mapper.DataSourceMapper; import com.muyu.etl.service.IDataSourceService; +import org.springframework.transaction.annotation.Transactional; /** * 【请填写功能名称】Service业务层处理 @@ -158,25 +159,48 @@ public class DataSourceServiceImpl implements IDataSourceService @Override public Result testConnection(DataSource dataSource) { - String user = dataSource.getUsername(); - String password = dataSource.getPassword(); - String jdbcDriver = "com.mysql.cj.jdbc.Driver"; - String jdbcUrl = "jdbc:mysql://"+dataSource.getLinkAddress()+":"+dataSource.getPort()+"/"+dataSource.getDatabaseName(); - if (dataSource.getConnectionParam()!=null && dataSource.getConnectionParam()!=""){ - jdbcUrl = jdbcUrl+"?"+dataSource.getConnectionParam(); - } - Connection conn = null; - try { - Class.forName(jdbcDriver); - } catch (ClassNotFoundException e) { - return Result.error("连接失败"); - } - try { - conn = DriverManager.getConnection(jdbcUrl, user, password); - conn.close(); - }catch (Exception e){ - return Result.error("连接失败"); + if (dataSource.getType().equals("MySql")){ + String user = dataSource.getUsername(); + String password = dataSource.getPassword(); + String jdbcDriver = "com.mysql.cj.jdbc.Driver"; + String jdbcUrl = "jdbc:mysql://"+dataSource.getLinkAddress()+":"+dataSource.getPort()+"/"+dataSource.getDatabaseName(); + if (dataSource.getConnectionParam()!=null && dataSource.getConnectionParam()!=""){ + jdbcUrl = jdbcUrl+"?"+dataSource.getConnectionParam(); + } + Connection conn = null; + try { + Class.forName(jdbcDriver); + } catch (ClassNotFoundException e) { + return Result.error("连接失败"); + } + try { + conn = DriverManager.getConnection(jdbcUrl, user, password); + conn.close(); + }catch (Exception e){ + return Result.error("连接失败"); + } + }else{ + String user = dataSource.getUsername(); + String password = dataSource.getPassword(); + String jdbcDriver = "org.postgresql.Driver"; + String jdbcUrl = "jdbc:postgresql://"+dataSource.getLinkAddress()+":"+dataSource.getPort()+"/"+dataSource.getDatabaseName(); + if (dataSource.getConnectionParam()!=null && dataSource.getConnectionParam()!=""){ + jdbcUrl = jdbcUrl+"?"+dataSource.getConnectionParam(); + } + Connection conn = null; + try { + Class.forName(jdbcDriver); + } catch (ClassNotFoundException e) { + return Result.error("连接失败"); + } + try { + conn = DriverManager.getConnection(jdbcUrl, user, password); + conn.close(); + }catch (Exception e){ + return Result.error("连接失败"); + } } + return Result.success("连接成功"); } @@ -233,13 +257,112 @@ public class DataSourceServiceImpl implements IDataSourceService return assetsModule; } + public Map getTypeMap(DataSource dataSource, String tableName){ + String user = dataSource.getUsername(); + String password = dataSource.getPassword(); + String jdbcDriver=""; + String jdbcUrl=""; + String sql=""; + if (dataSource.getType().equals("MySql")){ + jdbcDriver = "com.mysql.cj.jdbc.Driver"; + jdbcUrl = "jdbc:mysql://"+dataSource.getLinkAddress()+":"+dataSource.getPort()+"/"+dataSource.getDatabaseName(); + sql = "select * from "+tableName; + }else{ + jdbcDriver = "org.postgresql.Driver"; + jdbcUrl = "jdbc:postgresql://"+dataSource.getLinkAddress()+":"+dataSource.getPort()+"/"+dataSource.getDatabaseName(); + sql = "select * from "+dataSource.getModeName()+"."+tableName; + } + Connection conn = null; + Map map = new HashMap<>(); + try { + Class.forName(jdbcDriver); + conn = DriverManager.getConnection(jdbcUrl, user, password); + } catch (SQLException | ClassNotFoundException e) { + throw new RuntimeException(e); + } + try { + PreparedStatement pst = null; + pst = conn.prepareStatement(sql); + ResultSet resultSet = pst.executeQuery(); + ResultSetMetaData rsd = resultSet.getMetaData(); + for(int i = 1; i <= rsd.getColumnCount(); i++) { + String substring=""; + if (rsd.getColumnClassName(i).contains(".")){ + String[] split = rsd.getColumnClassName(i).split("\\."); + substring = split[split.length-1]; + }else{ + substring = rsd.getColumnClassName(i); + } + + map.put(rsd.getColumnName(i).toUpperCase(),substring); + } + pst.close(); + pst=null; + } catch (SQLException e) { + throw new RuntimeException(e); + } + return map; + } + public List getTableAssets(DataSource dataSource, String tableName){ String user = dataSource.getUsername(); String password = dataSource.getPassword(); - String jdbcDriver = "com.mysql.cj.jdbc.Driver"; - String jdbcUrl = "jdbc:mysql://"+dataSource.getLinkAddress()+":"+dataSource.getPort()+"/"+dataSource.getDatabaseName(); + String jdbcDriver=""; + String jdbcUrl=""; + String sql=""; + if (dataSource.getType().equals("MySql")){ + jdbcDriver = "com.mysql.cj.jdbc.Driver"; + jdbcUrl = "jdbc:mysql://"+dataSource.getLinkAddress()+":"+dataSource.getPort()+"/"+dataSource.getDatabaseName(); + sql="SELECT COLUMN_NAME,DATA_TYPE,IS_NULLABLE,COLUMN_KEY,COLUMN_DEFAULT,COLUMN_COMMENT,CHARACTER_MAXIMUM_LENGTH,NUMERIC_PRECISION,NUMERIC_SCALE FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA = '"+dataSource.getDatabaseName()+"' AND TABLE_NAME = '"+tableName+"'"; + }else{ + jdbcDriver = "org.postgresql.Driver"; + jdbcUrl = "jdbc:postgresql://"+dataSource.getLinkAddress()+":"+dataSource.getPort()+"/"+dataSource.getDatabaseName(); + sql="SELECT \n" + + " a.attname AS \"COLUMN_NAME\",\n" + + " coalesce(cc.description, '') AS \"COLUMN_COMMENT\",\n" + + " EXISTS (\n" + + " SELECT 1\n" + + " FROM pg_index i\n" + + " JOIN pg_attribute ia ON ia.attrelid = i.indrelid AND ia.attnum = ANY(i.indkey)\n" + + " WHERE i.indrelid = a.attrelid AND ia.attname = a.attname AND i.indisprimary\n" + + " ) AS \"COLUMN_KEY\",\n" + + " format_type(a.atttypid, a.atttypmod) AS \"DATA_TYPE\",\n" + + " CASE \n" + + " WHEN strpos(format_type(a.atttypid, a.atttypmod), '(') > 0 THEN \n" + + " split_part(substring(format_type(a.atttypid, a.atttypmod) FROM '\\((.*)\\)')::text, ',', 1)::int\n" + + " ELSE \n" + + " NULL\n" + + " END AS \"NUMERIC_PRECISION\",\n" + + " CASE \n" + + " WHEN strpos(format_type(a.atttypid, a.atttypmod), ',') > 0 THEN \n" + + " split_part(substring(format_type(a.atttypid, a.atttypmod) FROM '\\((.*)\\)')::text, ',', 2)::int\n" + + " ELSE \n" + + " NULL\n" + + " END AS \"NUMERIC_SCALE\",\n" + + " COALESCE(pg_get_expr(ad.adbin, ad.adrelid), '') AS \"COLUMN_DEFAULT\",\n" + + " NOT a.attnotnull AS \"IS_NULLABLE\"\n" + + "FROM \n" + + " pg_catalog.pg_attribute a\n" + + "JOIN \n" + + " pg_catalog.pg_class c ON a.attrelid = c.oid\n" + + "JOIN \n" + + " pg_catalog.pg_namespace n ON c.relnamespace = n.oid\n" + + "LEFT JOIN \n" + + " pg_catalog.pg_description cc ON cc.objoid = a.attrelid AND cc.objsubid = a.attnum\n" + + "LEFT JOIN \n" + + " pg_catalog.pg_attrdef ad ON ad.adrelid = a.attrelid AND ad.adnum = a.attnum\n" + + "WHERE \n" + + " n.nspname = '"+dataSource.getModeName()+"' \n" + + " AND c.relname = '"+tableName+"' \n" + + " AND a.attnum > 0 \n" + + " AND NOT a.attisdropped \n" + + "ORDER BY \n" + + " a.attnum "; + } + Connection conn = null; List assetModels = new ArrayList<>(); + Map typeMap = getTypeMap(dataSource, tableName); try { Class.forName(jdbcDriver); conn = DriverManager.getConnection(jdbcUrl, user, password); @@ -253,7 +376,7 @@ public class DataSourceServiceImpl implements IDataSourceService throw new RuntimeException(e); } try { - PreparedStatement pst = conn.prepareStatement("SELECT COLUMN_NAME,DATA_TYPE,IS_NULLABLE,COLUMN_KEY,COLUMN_DEFAULT,COLUMN_COMMENT,CHARACTER_MAXIMUM_LENGTH,NUMERIC_PRECISION,NUMERIC_SCALE FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA = '"+dataSource.getDatabaseName()+"' AND TABLE_NAME = '"+tableName+"'"); + PreparedStatement pst = conn.prepareStatement(sql); ResultSet resultSet = pst.executeQuery(); ResultSetMetaData rsd = resultSet.getMetaData(); while (resultSet.next()) { @@ -261,25 +384,59 @@ public class DataSourceServiceImpl implements IDataSourceService assetModel.setComment(resultSet.getString("COLUMN_COMMENT")); assetModel.setName(resultSet.getString("COLUMN_NAME")); - if (resultSet.getString("CHARACTER_MAXIMUM_LENGTH")!=null){ - assetModel.setLength(resultSet.getString("CHARACTER_MAXIMUM_LENGTH")); - }else if (resultSet.getString("NUMERIC_PRECISION")!=null){ - assetModel.setLength(resultSet.getString("NUMERIC_PRECISION")); + assetModel.setType(resultSet.getString("DATA_TYPE")); + if (dataSource.getType().equals("MySql")){ + if (resultSet.getString("CHARACTER_MAXIMUM_LENGTH")!=null){ + assetModel.setLength(resultSet.getString("CHARACTER_MAXIMUM_LENGTH")); + }else if (resultSet.getString("NUMERIC_PRECISION")!=null){ + assetModel.setLength(resultSet.getString("NUMERIC_PRECISION")); + }else{ + assetModel.setLength("-"); + } + //是否主键 + if ("PRI".equals(resultSet.getString("COLUMN_KEY"))){ + assetModel.setIsPrimaryKey("Y"); + }else{ + assetModel.setIsPrimaryKey("N"); + } + //是否不可为空 + if (resultSet.getString("IS_NULLABLE").equals("NO")){ + assetModel.setIsNull("N"); + }else{ + assetModel.setIsNull("Y"); + } }else{ - assetModel.setLength("-"); + if (resultSet.getString("NUMERIC_PRECISION")!=null){ + assetModel.setLength(resultSet.getString("NUMERIC_PRECISION")); + }else{ + if (!assetModel.getType().contains("time")&&!assetModel.getType().contains("date")){ + assetModel.setLength("10"); + }else{ + assetModel.setLength("-"); + } + + } + //是否主键 + if ("t".equals(resultSet.getString("COLUMN_KEY"))){ + assetModel.setIsPrimaryKey("Y"); + }else{ + assetModel.setIsPrimaryKey("N"); + } + //是否不可为空 + if (resultSet.getString("IS_NULLABLE").equals("t")){ + assetModel.setIsNull("N"); + }else{ + assetModel.setIsNull("Y"); + } } + if (resultSet.getString("NUMERIC_SCALE")!=null){ assetModel.setDecimalPlaces(resultSet.getString("NUMERIC_SCALE")); }else{ assetModel.setDecimalPlaces("-"); } - //是否不可为空 - if (resultSet.getString("IS_NULLABLE").equals("NO")){ - assetModel.setIsNull("N"); - }else{ - assetModel.setIsNull("Y"); - } + //是否有默认值 if (resultSet.getString("COLUMN_DEFAULT")!=null){ assetModel.setDefaultValue(resultSet.getString("COLUMN_DEFAULT")); @@ -288,19 +445,12 @@ public class DataSourceServiceImpl implements IDataSourceService } assetModel.setIsDict(""); assetModel.setDictKey(""); - assetModel.setType(resultSet.getString("DATA_TYPE")); - //是否主键 - if ("PRI".equals(resultSet.getString("COLUMN_KEY"))){ - assetModel.setIsPrimaryKey("Y"); - }else{ - assetModel.setIsPrimaryKey("N"); - } + assetModel.setCreateBy(SecurityUtils.getUsername()); assetModel.setCreateTime(new Date()); - assetModel.setMappingType("String"); + assetModel.setMappingType(typeMap.get(assetModel.getName().toUpperCase())); assetModels.add(assetModel); } - pst.close(); } catch(SQLException e) { e.printStackTrace(); @@ -349,6 +499,7 @@ public class DataSourceServiceImpl implements IDataSourceService } @Override + @Transactional public Result structureList(DataSource dataSource) { AssetsModule kvt = getStructure(dataSource); return Result.success(kvt); @@ -358,15 +509,42 @@ public class DataSourceServiceImpl implements IDataSourceService public Result synchronousData(DataSource dataSource) { String user = dataSource.getUsername(); String password = dataSource.getPassword(); - String jdbcDriver = "com.mysql.cj.jdbc.Driver"; - String jdbcUrl = "jdbc:mysql://"+dataSource.getLinkAddress()+":"+dataSource.getPort()+"/"+dataSource.getDatabaseName(); + String jdbcDriver =""; + String jdbcUrl = ""; + String sql=""; + if (dataSource.getType().equals("MySql")){ + jdbcDriver = "com.mysql.cj.jdbc.Driver"; + jdbcUrl = "jdbc:mysql://"+dataSource.getLinkAddress()+":"+dataSource.getPort()+"/"+dataSource.getDatabaseName(); + sql="SELECT TABLE_NAME t_name,TABLE_COMMENT table_comment,TABLE_ROWS table_rows,(SELECT count(*) FROM INFORMATION_SCHEMA.columns WHERE TABLE_SCHEMA = '"+dataSource.getDatabaseName()+"' and TABLE_NAME=t_name) fields FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA='"+dataSource.getDatabaseName()+"'"; + }else{ + jdbcDriver = "org.postgresql.Driver"; + jdbcUrl = "jdbc:postgresql://"+dataSource.getLinkAddress()+":"+dataSource.getPort()+"/"+dataSource.getDatabaseName(); + sql="SELECT \n" + + " c.relname AS t_name,\n" + + " pgd.description AS table_comment,\n" + + " COUNT(col.column_name) AS fields,\n" + + " c.reltuples AS table_rows\n" + + "FROM \n" + + " pg_catalog.pg_class c\n" + + "LEFT JOIN \n" + + " pg_catalog.pg_namespace n ON n.oid = c.relnamespace\n" + + "LEFT JOIN \n" + + " pg_catalog.pg_description pgd ON pgd.objoid = c.oid\n" + + "LEFT JOIN \n" + + " information_schema.columns col ON c.relname = col.table_name AND n.nspname = col.table_schema\n" + + "WHERE \n" + + " n.nspname = '"+dataSource.getModeName()+"' \n" + + "AND \n" + + " c.relkind = 'r' \n" + + "GROUP BY \n" + + " c.relname, pgd.description, c.reltuples"; + } Connection conn = null; HashMap map = new HashMap<>(); ArrayList dataAssets = new ArrayList<>(); try { Class.forName(jdbcDriver); conn = DriverManager.getConnection(jdbcUrl, user, password); - String sql="SELECT TABLE_NAME t_name,TABLE_COMMENT table_comment,TABLE_ROWS table_rows,(SELECT count(*) FROM INFORMATION_SCHEMA.columns WHERE TABLE_SCHEMA = '"+dataSource.getDatabaseName()+"' and TABLE_NAME=t_name) fields FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA='"+dataSource.getDatabaseName()+"'"; PreparedStatement ps = conn.prepareStatement(sql); ResultSet resultSet = ps.executeQuery(); while (resultSet.next()){ diff --git a/muyu-modules/muyu-etl/src/main/resources/mapper/system/DataSourceMapper.xml b/muyu-modules/muyu-etl/src/main/resources/mapper/system/DataSourceMapper.xml index 2a88ad7..e336703 100644 --- a/muyu-modules/muyu-etl/src/main/resources/mapper/system/DataSourceMapper.xml +++ b/muyu-modules/muyu-etl/src/main/resources/mapper/system/DataSourceMapper.xml @@ -24,10 +24,11 @@ + - select id, data_source_name, link_address, port, database_name, username, password, connection_param, init_num, max_num, max_wait_time, max_wait_size, create_by, create_time, update_by, update_time, remark, type, system_name from data_source + select id, data_source_name, link_address, port, database_name, username, password, connection_param, init_num, max_num, max_wait_time, max_wait_size, create_by, create_time, update_by, update_time, remark, type, system_name,mode_name from data_source @@ -75,6 +77,7 @@ remark, type, system_name, + mode_name, #{dataSourceName}, @@ -95,6 +98,7 @@ #{remark}, #{type}, #{systemName}, + #{modeName}, @@ -119,6 +123,7 @@ remark = #{remark}, type = #{type}, system_name = #{systemName}, + mode_name = #{modeName}, where id = #{id}