feat: 增加了不同数据库的数据接入新增,修改,测试连接,同步数据

将后台代码功能完善了一下,使其能添加MySql和PostGre两种数据库的数据接入,并且能够测试连接和同步数据
master
yaoxin 2024-04-23 20:35:55 +08:00
parent a8bd6bd1bb
commit 0689fdb194
4 changed files with 245 additions and 45 deletions

View File

@ -18,6 +18,11 @@
</properties>
<dependencies>
<!-- PostgreSQL JDBC驱动 -->
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
</dependency>
<!-- SpringCloud Alibaba Nacos -->
<dependency>

View File

@ -94,6 +94,18 @@ public class DataSource extends BaseEntity
@Excel(name = "数据来源名称")
private String systemName;
/** 模式名称 */
@Excel(name = "数据来源名称")
private String modeName;
public String getModeName() {
return modeName;
}
public void setModeName(String modeName) {
this.modeName = modeName;
}
public List<TableDetail> getTableList() {
return tableList;
}

View File

@ -21,6 +21,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import com.muyu.etl.mapper.DataSourceMapper;
import com.muyu.etl.service.IDataSourceService;
import org.springframework.transaction.annotation.Transactional;
/**
* Service
@ -158,25 +159,48 @@ public class DataSourceServiceImpl implements IDataSourceService
@Override
public Result testConnection(DataSource dataSource) {
String user = dataSource.getUsername();
String password = dataSource.getPassword();
String jdbcDriver = "com.mysql.cj.jdbc.Driver";
String jdbcUrl = "jdbc:mysql://"+dataSource.getLinkAddress()+":"+dataSource.getPort()+"/"+dataSource.getDatabaseName();
if (dataSource.getConnectionParam()!=null && dataSource.getConnectionParam()!=""){
jdbcUrl = jdbcUrl+"?"+dataSource.getConnectionParam();
}
Connection conn = null;
try {
Class.forName(jdbcDriver);
} catch (ClassNotFoundException e) {
return Result.error("连接失败");
}
try {
conn = DriverManager.getConnection(jdbcUrl, user, password);
conn.close();
}catch (Exception e){
return Result.error("连接失败");
if (dataSource.getType().equals("MySql")){
String user = dataSource.getUsername();
String password = dataSource.getPassword();
String jdbcDriver = "com.mysql.cj.jdbc.Driver";
String jdbcUrl = "jdbc:mysql://"+dataSource.getLinkAddress()+":"+dataSource.getPort()+"/"+dataSource.getDatabaseName();
if (dataSource.getConnectionParam()!=null && dataSource.getConnectionParam()!=""){
jdbcUrl = jdbcUrl+"?"+dataSource.getConnectionParam();
}
Connection conn = null;
try {
Class.forName(jdbcDriver);
} catch (ClassNotFoundException e) {
return Result.error("连接失败");
}
try {
conn = DriverManager.getConnection(jdbcUrl, user, password);
conn.close();
}catch (Exception e){
return Result.error("连接失败");
}
}else{
String user = dataSource.getUsername();
String password = dataSource.getPassword();
String jdbcDriver = "org.postgresql.Driver";
String jdbcUrl = "jdbc:postgresql://"+dataSource.getLinkAddress()+":"+dataSource.getPort()+"/"+dataSource.getDatabaseName();
if (dataSource.getConnectionParam()!=null && dataSource.getConnectionParam()!=""){
jdbcUrl = jdbcUrl+"?"+dataSource.getConnectionParam();
}
Connection conn = null;
try {
Class.forName(jdbcDriver);
} catch (ClassNotFoundException e) {
return Result.error("连接失败");
}
try {
conn = DriverManager.getConnection(jdbcUrl, user, password);
conn.close();
}catch (Exception e){
return Result.error("连接失败");
}
}
return Result.success("连接成功");
}
@ -233,13 +257,112 @@ public class DataSourceServiceImpl implements IDataSourceService
return assetsModule;
}
public Map<String,String> getTypeMap(DataSource dataSource, String tableName){
String user = dataSource.getUsername();
String password = dataSource.getPassword();
String jdbcDriver="";
String jdbcUrl="";
String sql="";
if (dataSource.getType().equals("MySql")){
jdbcDriver = "com.mysql.cj.jdbc.Driver";
jdbcUrl = "jdbc:mysql://"+dataSource.getLinkAddress()+":"+dataSource.getPort()+"/"+dataSource.getDatabaseName();
sql = "select * from "+tableName;
}else{
jdbcDriver = "org.postgresql.Driver";
jdbcUrl = "jdbc:postgresql://"+dataSource.getLinkAddress()+":"+dataSource.getPort()+"/"+dataSource.getDatabaseName();
sql = "select * from "+dataSource.getModeName()+"."+tableName;
}
Connection conn = null;
Map<String, String> map = new HashMap<>();
try {
Class.forName(jdbcDriver);
conn = DriverManager.getConnection(jdbcUrl, user, password);
} catch (SQLException | ClassNotFoundException e) {
throw new RuntimeException(e);
}
try {
PreparedStatement pst = null;
pst = conn.prepareStatement(sql);
ResultSet resultSet = pst.executeQuery();
ResultSetMetaData rsd = resultSet.getMetaData();
for(int i = 1; i <= rsd.getColumnCount(); i++) {
String substring="";
if (rsd.getColumnClassName(i).contains(".")){
String[] split = rsd.getColumnClassName(i).split("\\.");
substring = split[split.length-1];
}else{
substring = rsd.getColumnClassName(i);
}
map.put(rsd.getColumnName(i).toUpperCase(),substring);
}
pst.close();
pst=null;
} catch (SQLException e) {
throw new RuntimeException(e);
}
return map;
}
public List<AssetModel> getTableAssets(DataSource dataSource, String tableName){
String user = dataSource.getUsername();
String password = dataSource.getPassword();
String jdbcDriver = "com.mysql.cj.jdbc.Driver";
String jdbcUrl = "jdbc:mysql://"+dataSource.getLinkAddress()+":"+dataSource.getPort()+"/"+dataSource.getDatabaseName();
String jdbcDriver="";
String jdbcUrl="";
String sql="";
if (dataSource.getType().equals("MySql")){
jdbcDriver = "com.mysql.cj.jdbc.Driver";
jdbcUrl = "jdbc:mysql://"+dataSource.getLinkAddress()+":"+dataSource.getPort()+"/"+dataSource.getDatabaseName();
sql="SELECT COLUMN_NAME,DATA_TYPE,IS_NULLABLE,COLUMN_KEY,COLUMN_DEFAULT,COLUMN_COMMENT,CHARACTER_MAXIMUM_LENGTH,NUMERIC_PRECISION,NUMERIC_SCALE FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA = '"+dataSource.getDatabaseName()+"' AND TABLE_NAME = '"+tableName+"'";
}else{
jdbcDriver = "org.postgresql.Driver";
jdbcUrl = "jdbc:postgresql://"+dataSource.getLinkAddress()+":"+dataSource.getPort()+"/"+dataSource.getDatabaseName();
sql="SELECT \n" +
" a.attname AS \"COLUMN_NAME\",\n" +
" coalesce(cc.description, '') AS \"COLUMN_COMMENT\",\n" +
" EXISTS (\n" +
" SELECT 1\n" +
" FROM pg_index i\n" +
" JOIN pg_attribute ia ON ia.attrelid = i.indrelid AND ia.attnum = ANY(i.indkey)\n" +
" WHERE i.indrelid = a.attrelid AND ia.attname = a.attname AND i.indisprimary\n" +
" ) AS \"COLUMN_KEY\",\n" +
" format_type(a.atttypid, a.atttypmod) AS \"DATA_TYPE\",\n" +
" CASE \n" +
" WHEN strpos(format_type(a.atttypid, a.atttypmod), '(') > 0 THEN \n" +
" split_part(substring(format_type(a.atttypid, a.atttypmod) FROM '\\((.*)\\)')::text, ',', 1)::int\n" +
" ELSE \n" +
" NULL\n" +
" END AS \"NUMERIC_PRECISION\",\n" +
" CASE \n" +
" WHEN strpos(format_type(a.atttypid, a.atttypmod), ',') > 0 THEN \n" +
" split_part(substring(format_type(a.atttypid, a.atttypmod) FROM '\\((.*)\\)')::text, ',', 2)::int\n" +
" ELSE \n" +
" NULL\n" +
" END AS \"NUMERIC_SCALE\",\n" +
" COALESCE(pg_get_expr(ad.adbin, ad.adrelid), '') AS \"COLUMN_DEFAULT\",\n" +
" NOT a.attnotnull AS \"IS_NULLABLE\"\n" +
"FROM \n" +
" pg_catalog.pg_attribute a\n" +
"JOIN \n" +
" pg_catalog.pg_class c ON a.attrelid = c.oid\n" +
"JOIN \n" +
" pg_catalog.pg_namespace n ON c.relnamespace = n.oid\n" +
"LEFT JOIN \n" +
" pg_catalog.pg_description cc ON cc.objoid = a.attrelid AND cc.objsubid = a.attnum\n" +
"LEFT JOIN \n" +
" pg_catalog.pg_attrdef ad ON ad.adrelid = a.attrelid AND ad.adnum = a.attnum\n" +
"WHERE \n" +
" n.nspname = '"+dataSource.getModeName()+"' \n" +
" AND c.relname = '"+tableName+"' \n" +
" AND a.attnum > 0 \n" +
" AND NOT a.attisdropped \n" +
"ORDER BY \n" +
" a.attnum ";
}
Connection conn = null;
List<AssetModel> assetModels = new ArrayList<>();
Map<String, String> typeMap = getTypeMap(dataSource, tableName);
try {
Class.forName(jdbcDriver);
conn = DriverManager.getConnection(jdbcUrl, user, password);
@ -253,7 +376,7 @@ public class DataSourceServiceImpl implements IDataSourceService
throw new RuntimeException(e);
}
try {
PreparedStatement pst = conn.prepareStatement("SELECT COLUMN_NAME,DATA_TYPE,IS_NULLABLE,COLUMN_KEY,COLUMN_DEFAULT,COLUMN_COMMENT,CHARACTER_MAXIMUM_LENGTH,NUMERIC_PRECISION,NUMERIC_SCALE FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA = '"+dataSource.getDatabaseName()+"' AND TABLE_NAME = '"+tableName+"'");
PreparedStatement pst = conn.prepareStatement(sql);
ResultSet resultSet = pst.executeQuery();
ResultSetMetaData rsd = resultSet.getMetaData();
while (resultSet.next()) {
@ -261,25 +384,59 @@ public class DataSourceServiceImpl implements IDataSourceService
assetModel.setComment(resultSet.getString("COLUMN_COMMENT"));
assetModel.setName(resultSet.getString("COLUMN_NAME"));
if (resultSet.getString("CHARACTER_MAXIMUM_LENGTH")!=null){
assetModel.setLength(resultSet.getString("CHARACTER_MAXIMUM_LENGTH"));
}else if (resultSet.getString("NUMERIC_PRECISION")!=null){
assetModel.setLength(resultSet.getString("NUMERIC_PRECISION"));
assetModel.setType(resultSet.getString("DATA_TYPE"));
if (dataSource.getType().equals("MySql")){
if (resultSet.getString("CHARACTER_MAXIMUM_LENGTH")!=null){
assetModel.setLength(resultSet.getString("CHARACTER_MAXIMUM_LENGTH"));
}else if (resultSet.getString("NUMERIC_PRECISION")!=null){
assetModel.setLength(resultSet.getString("NUMERIC_PRECISION"));
}else{
assetModel.setLength("-");
}
//是否主键
if ("PRI".equals(resultSet.getString("COLUMN_KEY"))){
assetModel.setIsPrimaryKey("Y");
}else{
assetModel.setIsPrimaryKey("N");
}
//是否不可为空
if (resultSet.getString("IS_NULLABLE").equals("NO")){
assetModel.setIsNull("N");
}else{
assetModel.setIsNull("Y");
}
}else{
assetModel.setLength("-");
if (resultSet.getString("NUMERIC_PRECISION")!=null){
assetModel.setLength(resultSet.getString("NUMERIC_PRECISION"));
}else{
if (!assetModel.getType().contains("time")&&!assetModel.getType().contains("date")){
assetModel.setLength("10");
}else{
assetModel.setLength("-");
}
}
//是否主键
if ("t".equals(resultSet.getString("COLUMN_KEY"))){
assetModel.setIsPrimaryKey("Y");
}else{
assetModel.setIsPrimaryKey("N");
}
//是否不可为空
if (resultSet.getString("IS_NULLABLE").equals("t")){
assetModel.setIsNull("N");
}else{
assetModel.setIsNull("Y");
}
}
if (resultSet.getString("NUMERIC_SCALE")!=null){
assetModel.setDecimalPlaces(resultSet.getString("NUMERIC_SCALE"));
}else{
assetModel.setDecimalPlaces("-");
}
//是否不可为空
if (resultSet.getString("IS_NULLABLE").equals("NO")){
assetModel.setIsNull("N");
}else{
assetModel.setIsNull("Y");
}
//是否有默认值
if (resultSet.getString("COLUMN_DEFAULT")!=null){
assetModel.setDefaultValue(resultSet.getString("COLUMN_DEFAULT"));
@ -288,19 +445,12 @@ public class DataSourceServiceImpl implements IDataSourceService
}
assetModel.setIsDict("");
assetModel.setDictKey("");
assetModel.setType(resultSet.getString("DATA_TYPE"));
//是否主键
if ("PRI".equals(resultSet.getString("COLUMN_KEY"))){
assetModel.setIsPrimaryKey("Y");
}else{
assetModel.setIsPrimaryKey("N");
}
assetModel.setCreateBy(SecurityUtils.getUsername());
assetModel.setCreateTime(new Date());
assetModel.setMappingType("String");
assetModel.setMappingType(typeMap.get(assetModel.getName().toUpperCase()));
assetModels.add(assetModel);
}
pst.close();
} catch(SQLException e) {
e.printStackTrace();
@ -349,6 +499,7 @@ public class DataSourceServiceImpl implements IDataSourceService
}
@Override
@Transactional
public Result structureList(DataSource dataSource) {
AssetsModule kvt = getStructure(dataSource);
return Result.success(kvt);
@ -358,15 +509,42 @@ public class DataSourceServiceImpl implements IDataSourceService
public Result synchronousData(DataSource dataSource) {
String user = dataSource.getUsername();
String password = dataSource.getPassword();
String jdbcDriver = "com.mysql.cj.jdbc.Driver";
String jdbcUrl = "jdbc:mysql://"+dataSource.getLinkAddress()+":"+dataSource.getPort()+"/"+dataSource.getDatabaseName();
String jdbcDriver ="";
String jdbcUrl = "";
String sql="";
if (dataSource.getType().equals("MySql")){
jdbcDriver = "com.mysql.cj.jdbc.Driver";
jdbcUrl = "jdbc:mysql://"+dataSource.getLinkAddress()+":"+dataSource.getPort()+"/"+dataSource.getDatabaseName();
sql="SELECT TABLE_NAME t_name,TABLE_COMMENT table_comment,TABLE_ROWS table_rows,(SELECT count(*) FROM INFORMATION_SCHEMA.columns WHERE TABLE_SCHEMA = '"+dataSource.getDatabaseName()+"' and TABLE_NAME=t_name) fields FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA='"+dataSource.getDatabaseName()+"'";
}else{
jdbcDriver = "org.postgresql.Driver";
jdbcUrl = "jdbc:postgresql://"+dataSource.getLinkAddress()+":"+dataSource.getPort()+"/"+dataSource.getDatabaseName();
sql="SELECT \n" +
" c.relname AS t_name,\n" +
" pgd.description AS table_comment,\n" +
" COUNT(col.column_name) AS fields,\n" +
" c.reltuples AS table_rows\n" +
"FROM \n" +
" pg_catalog.pg_class c\n" +
"LEFT JOIN \n" +
" pg_catalog.pg_namespace n ON n.oid = c.relnamespace\n" +
"LEFT JOIN \n" +
" pg_catalog.pg_description pgd ON pgd.objoid = c.oid\n" +
"LEFT JOIN \n" +
" information_schema.columns col ON c.relname = col.table_name AND n.nspname = col.table_schema\n" +
"WHERE \n" +
" n.nspname = '"+dataSource.getModeName()+"' \n" +
"AND \n" +
" c.relkind = 'r' \n" +
"GROUP BY \n" +
" c.relname, pgd.description, c.reltuples";
}
Connection conn = null;
HashMap<String, String> map = new HashMap<>();
ArrayList<DataAsset> dataAssets = new ArrayList<>();
try {
Class.forName(jdbcDriver);
conn = DriverManager.getConnection(jdbcUrl, user, password);
String sql="SELECT TABLE_NAME t_name,TABLE_COMMENT table_comment,TABLE_ROWS table_rows,(SELECT count(*) FROM INFORMATION_SCHEMA.columns WHERE TABLE_SCHEMA = '"+dataSource.getDatabaseName()+"' and TABLE_NAME=t_name) fields FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA='"+dataSource.getDatabaseName()+"'";
PreparedStatement ps = conn.prepareStatement(sql);
ResultSet resultSet = ps.executeQuery();
while (resultSet.next()){

View File

@ -24,10 +24,11 @@
<result property="remark" column="remark" />
<result property="type" column="type" />
<result property="systemName" column="system_name" />
<result property="modeName" column="mode_name" />
</resultMap>
<sql id="selectDataSourceVo">
select id, data_source_name, link_address, port, database_name, username, password, connection_param, init_num, max_num, max_wait_time, max_wait_size, create_by, create_time, update_by, update_time, remark, type, system_name from data_source
select id, data_source_name, link_address, port, database_name, username, password, connection_param, init_num, max_num, max_wait_time, max_wait_size, create_by, create_time, update_by, update_time, remark, type, system_name,mode_name from data_source
</sql>
<select id="selectDataSourceList" parameterType="com.muyu.etl.domain.DataSource" resultMap="DataSourceResult">
@ -46,6 +47,7 @@
<if test="maxWaitSize != null "> and max_wait_size = #{maxWaitSize}</if>
<if test="type != null and type != ''"> and type = #{type}</if>
<if test="systemName != null and systemName != ''"> and system_name like concat('%', #{systemName}, '%')</if>
<if test="modeName != null and modeName != ''"> and mode_name like concat('%', #{modeName}, '%')</if>
</where>
</select>
@ -75,6 +77,7 @@
<if test="remark != null">remark,</if>
<if test="type != null">type,</if>
<if test="systemName != null">system_name,</if>
<if test="modeName != null">mode_name,</if>
</trim>
<trim prefix="values (" suffix=")" suffixOverrides=",">
<if test="dataSourceName != null">#{dataSourceName},</if>
@ -95,6 +98,7 @@
<if test="remark != null">#{remark},</if>
<if test="type != null">#{type},</if>
<if test="systemName != null">#{systemName},</if>
<if test="modeName != null">#{modeName},</if>
</trim>
</insert>
@ -119,6 +123,7 @@
<if test="remark != null">remark = #{remark},</if>
<if test="type != null">type = #{type},</if>
<if test="systemName != null">system_name = #{systemName},</if>
<if test="modeName != null">mode_name = #{modeName},</if>
</trim>
where id = #{id}
</update>