diff --git a/muyu-source-common/src/main/java/com/muyu/source/domain/DataSource.java b/muyu-source-common/src/main/java/com/muyu/source/domain/DataSource.java index 382c9d9..3fe0eb5 100644 --- a/muyu-source-common/src/main/java/com/muyu/source/domain/DataSource.java +++ b/muyu-source-common/src/main/java/com/muyu/source/domain/DataSource.java @@ -125,7 +125,7 @@ public class DataSource extends BaseEntity { @Excel(name = "初始连接数量") @TableField("init_total") - private Long initTotal; + private Integer initTotal; /** * 最大连接数量 @@ -133,7 +133,7 @@ public class DataSource extends BaseEntity { @Excel(name = "最大连接数量") @TableField("max_num") - private Long maxNum; + private Integer maxNum; /** * 最大等待时间 @@ -150,6 +150,11 @@ public class DataSource extends BaseEntity { @TableField("max_wait_size") private Long maxWaitSize; + /** + * 驱动 com.mysql.cj.jdbc.Driver + * */ + @Excel(name = "驱动",defaultValue = "com.mysql.cj.jdbc.Driver") + private String driverName; /** diff --git a/muyu-source-common/src/main/java/com/muyu/source/pool/BasePool.java b/muyu-source-common/src/main/java/com/muyu/source/pool/BasePool.java new file mode 100644 index 0000000..d79bd97 --- /dev/null +++ b/muyu-source-common/src/main/java/com/muyu/source/pool/BasePool.java @@ -0,0 +1,41 @@ +package com.muyu.source.pool; + +/** + * + * + * 提供了一个连接池的准则 + * @author Lenovo + */ + +public interface BasePool { + + /** + * 初始化 + */ + public void init(); + + /** + * 获取连接 + * @return + */ + public T getConn(); + + /** + * 归还连接 + * @param conn + */ + public void replease(T conn); + + /** + * 创建连接 + * @return + */ + public T createConn(); + + /** + * 关闭连接 + */ + public void closeConn(); + + +} diff --git a/muyu-source-common/src/main/java/com/muyu/source/pool/MysqlPool.java b/muyu-source-common/src/main/java/com/muyu/source/pool/MysqlPool.java new file mode 100644 index 0000000..0e61cda --- /dev/null +++ b/muyu-source-common/src/main/java/com/muyu/source/pool/MysqlPool.java @@ -0,0 +1,218 @@ +package com.muyu.source.pool; + + + + + +import com.muyu.source.domain.DataSource; +import com.muyu.source.pool.config.BaseConfig; +import com.muyu.source.pool.exeption.MysqlConnException; +import lombok.extern.log4j.Log4j2; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.util.Queue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicInteger; + + +/** + * @author Lenovo + * + * + * Mysql连接池信息 + */ +@Log4j2 +public class MysqlPool implements BasePool{ + + + /** + * 等待队列 + */ + private Queue mysqlConnQueue = null; + + /** + * 活动队列 + */ + private Queue activeMysqlConnQueue =null; + + + /** + * 记录队列连接总数 + */ + private AtomicInteger count = new AtomicInteger(); + + /** + * mysql基础配置 + */ + public DataSource etlDataScore; + + + /** + * 实例化连接池,加载驱动 + * @param etlDataScore + */ + public MysqlPool(DataSource etlDataScore){ + log.info("MySQL连接池实例化完成"); + this.etlDataScore=etlDataScore; + BaseConfig.driver(etlDataScore.getDriverName()); + } + + /** + * 初始化连接池 + */ + @Override + public void init() { + Integer maxCount = this.etlDataScore.getMaxNum(); + Integer initCount = this.etlDataScore.getInitTotal(); + this.mysqlConnQueue = new LinkedBlockingQueue(maxCount); + this.activeMysqlConnQueue=new LinkedBlockingQueue(maxCount); + + + for (Integer i = 0; i < initCount; i++) { + + this.mysqlConnQueue.offer(createConn()); + count.incrementAndGet(); + } + log.info("MySQL连接池初始化完成"); + } + + /** + *1.获取当前的毫秒 用来计算获取连接是否超时 + * 2.判断队列当中是否还有空闲连接 + * 3如何有连接我们获取连接进行返回了 + * 4如何没有 判断是否达到最大的连接数 + * 5如何达到最大连接数量则进行等待(自旋) + * 6如何没有达到最大连接数量额进行创建一个连接 + * 7、放到使用队列当中,然后在进行返回 + * 8如果连接超时的话则进行抛出异常 + * @return + */ + @Override + public Connection getConn() { + long startTime = System.currentTimeMillis(); + //从空闲队列当中取出放入活动队列 + Connection conn = this.mysqlConnQueue.poll(); + if (conn!=null){ + this.activeMysqlConnQueue.offer(conn); + return conn; + } + + //如果当前的连接数量小于最大的连接数量的时候就进行创建新的连接 + if (count.get() < etlDataScore.getMaxNum()){ + Connection mysqlConn = createConn(); + this.activeMysqlConnQueue.offer(mysqlConn); + count.incrementAndGet(); + return mysqlConn; + } + + if ((System.currentTimeMillis() - startTime)>this.etlDataScore.getMaxWaitTime()){ + throw new MysqlConnException("连接超时!"); + } + + return null; + } + + + /** + * 归还连接 + * @param conn + */ + @Override + public void replease(Connection conn) { + //删除活动队列当中的连接 + if (this.activeMysqlConnQueue.remove(conn)){ + //把这个连接放入到空闲队列当中 + this.mysqlConnQueue.offer(conn); + } + } + + + /** + * 获取mysql连接信息 + * @return + */ + @Override + public Connection createConn(){ + String url = this.etlDataScore.getIp(); + String userName = this.etlDataScore.getUserName(); + String userPwd = this.etlDataScore.getPassword(); + Connection mysqlConn=null; + try { + mysqlConn = DriverManager.getConnection(url, userName, userPwd); + } catch (SQLException e) { + throw new RuntimeException(e); + } + log.info("初始化了一个数据库连接:{ip:"+this.etlDataScore.getIp()+" port:"+this.etlDataScore.getPort()+"databaseName"+this.etlDataScore.getDatabaseName()+" }"); + return mysqlConn; + } + + @Override + public void closeConn() { + closeBaseConn(); + closeActiveConn(); + } + + + /** + * 关闭空闲连接 + */ + public void closeBaseConn() { + //从空闲连接当中拿出一个连接 准备进行关闭 + //如何拿出的这个链接为null 表示以列当中没有连接信息 + Connection poll = this.mysqlConnQueue.poll(); + if (poll!=null){ + try { + poll.close(); + } catch (SQLException e) { + try { + //判断这个接是否被关闭了,如果连接被关闭则不需要放入队列当中 + //如何这个链接没有被关闭 则放入队列当中 尝试下次关闭 + if (!poll.isClosed()){ + this.mysqlConnQueue.offer(poll); + } + } catch (SQLException ex) { + throw new RuntimeException(ex); + } + throw new RuntimeException(e); + }finally { + closeBaseConn(); + } + + } + + } + /** + * 关闭活动连接 + */ + public void closeActiveConn() { + //从空闲连接当中拿出一个连接 准备进行关闭 + //如何拿出的这个链接为null 表示以列当中没有连接信息 + Connection poll = this.activeMysqlConnQueue.poll(); + if (poll!=null){ + try { + poll.close(); + } catch (SQLException e) { + try { + //判断这个接是否被关闭了,如果连接被关闭则不需要放入队列当中 + //如何这个链接没有被关闭 则放入队列当中 尝试下次关闭 + if (!poll.isClosed()){ + this.activeMysqlConnQueue.offer(poll); + } + } catch (SQLException ex) { + throw new RuntimeException(ex); + } + throw new RuntimeException(e); + }finally { + closeActiveConn(); + } + } + + } + + + + + +} diff --git a/muyu-source-common/src/main/java/com/muyu/source/pool/RedisPool.java b/muyu-source-common/src/main/java/com/muyu/source/pool/RedisPool.java new file mode 100644 index 0000000..f558d82 --- /dev/null +++ b/muyu-source-common/src/main/java/com/muyu/source/pool/RedisPool.java @@ -0,0 +1,156 @@ +package com.muyu.source.pool;//package com.muyu.etl.property.pool; +// +//import com.muyu.etl.property.domain.EtlDataScore; +//import com.muyu.etl.property.pool.exeption.RedisConnException; +//import lombok.extern.log4j.Log4j2; +// +//import java.util.Queue; +//import java.util.concurrent.LinkedBlockingQueue; +//import java.util.concurrent.atomic.AtomicInteger; +// +///** +// * @Author:作者姓名 +// * @Package:com.muyu.etl.property.pool +// * @Project:cloud-etl-property +// * @name:RedisPool +// * @Date:2024/8/26 9:12 +// */ +//@Log4j2 +//public class RedisPool implements BasePool{ +// +// /** +// * 空闲队列 +// */ +// private Queue jedisBaseConnQueue = null; +// +// /** +// * 活动队列 +// */ +// private Queue jedisActiveConnQueue = null; +// +// /** +// * 总连接数 +// */ +// private AtomicInteger count=null; +// +// /** +// * redisPoolConfig +// */ +// +// private EtlDataScore etlDataScore=null; +// +// /** +// * 实例化 +// * @param etlDataScore +// */ +// public RedisPool(EtlDataScore etlDataScore) { +// log.info("redis连接池实例化完成"); +// this.etlDataScore = etlDataScore; +// } +// +// @Override +// public void init() { +// +// Integer maxCount = this.etlDataScore.getMaxCount(); +// +// this.jedisBaseConnQueue = new LinkedBlockingQueue(maxCount); +// this.jedisActiveConnQueue = new LinkedBlockingQueue(maxCount); +// +// this.count = new AtomicInteger(); +// +// Integer initCount = this.etlDataScore.getInitCount(); +// +// +// for (Integer i = 0; i < initCount; i++) { +// this.jedisBaseConnQueue.offer(createConn()); +// count.incrementAndGet(); +// } +// log.info("redis连接池初始化完成!"); +// } +// +// @Override +// public Jedis getConn() { +// long startTime = System.currentTimeMillis(); +// +// +// while (true){ +// Jedis jedis = this.jedisBaseConnQueue.poll(); +// if (jedis!=null){ +// this.jedisActiveConnQueue.offer(jedis); +// return jedis; +// } +// +// if (count.get() this.dataSources.getMaxTime()){ +// throw new RedisConnException("redis获取连接超时!"); +// } +// +// } +// +// } +// +// @Override +// public void replease(Jedis conn) { +// if (this.jedisActiveConnQueue.remove(conn)){ +// this.jedisBaseConnQueue.offer(conn); +// }else { +// count.decrementAndGet(); +// } +// +// } +// +// @Override +// public Jedis createConn() { +// String ip = this.etlDataScore.getHost(); +// String port = this.etlDataScore.getPort(); +// Jedis jedis = new Jedis(ip, Integer.parseInt(port)); +// log.info("初始化了一个redis的连接,{ip:"+ip+" port:"+port+"}"); +// return jedis; +// } +// +// @Override +// public void closeConn() { +// closeJedisBaseConn(); +// closeJedisActiveConn(); +// } +// +// public void closeJedisBaseConn(){ +// Jedis jedis = this.jedisBaseConnQueue.poll(); +// +// if (jedis!=null){ +// try { +// jedis.close(); +// } catch (Exception e) { +// if (!jedis.isConnected()){ +// this.jedisBaseConnQueue.offer(jedis); +// } +// throw new RuntimeException(e); +// }finally { +// closeJedisBaseConn(); +// } +// } +// } +// public void closeJedisActiveConn(){ +// Jedis jedis = this.jedisActiveConnQueue.poll(); +// +// if (jedis!=null){ +// try { +// jedis.close(); +// } catch (Exception e) { +// if (!jedis.isConnected()){ +// this.jedisActiveConnQueue.offer(jedis); +// } +// throw new RuntimeException(e); +// }finally { +// closeJedisActiveConn(); +// } +// } +// } +// +//} diff --git a/muyu-source-common/src/main/java/com/muyu/source/pool/config/BaseConfig.java b/muyu-source-common/src/main/java/com/muyu/source/pool/config/BaseConfig.java new file mode 100644 index 0000000..e701760 --- /dev/null +++ b/muyu-source-common/src/main/java/com/muyu/source/pool/config/BaseConfig.java @@ -0,0 +1,53 @@ +package com.muyu.source.pool.config; + +/** + * @author Lenovo + * @Package:com.muyu.etl.property.pool.config + * @Project:cloud-etl-property + * @name:BaseConfig + * @Date:2024/8/22 22:21 + */ +public class BaseConfig { + + /** + * mysql连接前缀 + */ + public static final String MYSQLJDBCPRO="jdbc:mysql://"; + public static final String SHOWTABLES="show TABLES"; + public static final String SELECTCOUNT="SELECT count(1) FROM "; + public static final String SHOW_FULL_FIELDS_FROM="SHOW FULL FIELDS FROM "; + public static final String SELECT="select "; + public static final String SELECTALL="select * from "; + public static final String FROM=" from "; + public static final String SELECTFIELD=" SELECT \" +\n" + + " \" COLUMN_NAME , \" +\n" + + " \" COLUMN_COMMENT ,\" +\n" + + " \" CASE WHEN COLUMN_KEY = 'PRI' THEN '是' ELSE '否' END ,\" +\n" + + " \" CASE \\n\" +\n" + + " \" WHEN DATA_TYPE = 'int' THEN 'Integer' \" +\n" + + " \" WHEN DATA_TYPE = 'bigint' THEN 'Long' \" +\n" + + " \" WHEN DATA_TYPE = 'varchar' THEN 'String' \" +\n" + + " \" WHEN DATA_TYPE = 'decimal' THEN 'BigDecimal' \" +\n" + + " \" WHEN DATA_TYPE = 'tinyint' AND COLUMN_TYPE = 'tinyint(1)' THEN 'Boolean'\" +\n" + + " \" ELSE DATA_TYPE \\n\" +\n" + + " \" END , \" +\n" + + " \" DATA_TYPE , \\n\" +\n" + + " \" COLUMN_TYPE , \\n\" +\n" + + " \" CHARACTER_MAXIMUM_LENGTH , \\n\" +\n" + + " \" NUMERIC_SCALE , \\n\" +\n" + + " \" IS_NULLABLE , \\n\" +\n" + + " \" COLUMN_DEFAULT \\n\" +\n" + + " \"FROM INFORMATION_SCHEMA.COLUMNS "; + + + + + public static void driver(String driverName){ + try { + Class.forName(driverName); + } catch (ClassNotFoundException e) { + throw new RuntimeException(e); + } + } + +} diff --git a/muyu-source-common/src/main/java/com/muyu/source/pool/exeption/MysqlConnException.java b/muyu-source-common/src/main/java/com/muyu/source/pool/exeption/MysqlConnException.java new file mode 100644 index 0000000..f980464 --- /dev/null +++ b/muyu-source-common/src/main/java/com/muyu/source/pool/exeption/MysqlConnException.java @@ -0,0 +1,10 @@ +package com.muyu.source.pool.exeption; + +/** + * @author Lenovo + */ +public class MysqlConnException extends RuntimeException{ + public MysqlConnException(String message) { + super(message); + } +} diff --git a/muyu-source-common/src/main/java/com/muyu/source/pool/exeption/RedisConnException.java b/muyu-source-common/src/main/java/com/muyu/source/pool/exeption/RedisConnException.java new file mode 100644 index 0000000..618c4dc --- /dev/null +++ b/muyu-source-common/src/main/java/com/muyu/source/pool/exeption/RedisConnException.java @@ -0,0 +1,13 @@ +package com.muyu.source.pool.exeption; + +/** + * + * + * @author Lenovo + */ +public class RedisConnException extends RuntimeException +{ + public RedisConnException(String message) { + super(message); + } +} diff --git a/muyu-source-server/src/main/java/com/muyu/source/controller/DataSourceController.java b/muyu-source-server/src/main/java/com/muyu/source/controller/DataSourceController.java index 028cdf0..d1c9d16 100644 --- a/muyu-source-server/src/main/java/com/muyu/source/controller/DataSourceController.java +++ b/muyu-source-server/src/main/java/com/muyu/source/controller/DataSourceController.java @@ -13,6 +13,7 @@ import com.muyu.source.service.DataSourceService; import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.tags.Tag; import jakarta.servlet.http.HttpServletResponse; +import lombok.Data; import lombok.extern.log4j.Log4j2; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.validation.annotation.Validated; @@ -179,7 +180,11 @@ public class DataSourceController extends BaseController { // dataSourceService.synchronous(dataSource); // return Result.success(); // } - + @PostMapping("/syncAssetStructure") + public Result syncAssetStructure(@RequestBody DataSource dataSource){ + Integer i = dataSourceService.syncAssetStructure(dataSource); + return Result.success(i); + } diff --git a/muyu-source-server/src/main/java/com/muyu/source/controller/TableInfoController.java b/muyu-source-server/src/main/java/com/muyu/source/controller/TableInfoController.java index 8a0bbdb..798e79d 100644 --- a/muyu-source-server/src/main/java/com/muyu/source/controller/TableInfoController.java +++ b/muyu-source-server/src/main/java/com/muyu/source/controller/TableInfoController.java @@ -71,7 +71,7 @@ public class TableInfoController { @GetMapping("/findStruceure/{id}") public Result> findStruceure(@PathVariable("id") Integer id) { - List structureList= structureService.findStructurelistS(id); + List structureList = structureService.findStructurelistS(id); return Result.success(structureList); } diff --git a/muyu-source-server/src/main/java/com/muyu/source/service/DataSourceService.java b/muyu-source-server/src/main/java/com/muyu/source/service/DataSourceService.java index e2fd0c2..dffc2ce 100644 --- a/muyu-source-server/src/main/java/com/muyu/source/service/DataSourceService.java +++ b/muyu-source-server/src/main/java/com/muyu/source/service/DataSourceService.java @@ -41,5 +41,7 @@ public interface DataSourceService extends IService { Boolean testConnection(DataSource dataSource); + Integer syncAssetStructure(DataSource dataSource); + // void synchronous(DataSource dataSource); } diff --git a/muyu-source-server/src/main/java/com/muyu/source/service/Impl/DataSourceServiceImpl.java b/muyu-source-server/src/main/java/com/muyu/source/service/Impl/DataSourceServiceImpl.java index b60aa04..c50d9d6 100644 --- a/muyu-source-server/src/main/java/com/muyu/source/service/Impl/DataSourceServiceImpl.java +++ b/muyu-source-server/src/main/java/com/muyu/source/service/Impl/DataSourceServiceImpl.java @@ -1,19 +1,25 @@ package com.muyu.source.service.Impl; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; import com.baomidou.mybatisplus.core.toolkit.Assert; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.muyu.common.core.utils.StringUtils; +import com.muyu.common.security.utils.SecurityUtils; import com.muyu.source.domain.*; import com.muyu.source.mapper.DataSourceMapper; +import com.muyu.source.pool.MysqlPool; import com.muyu.source.service.*; import jakarta.annotation.Resource; import org.springframework.stereotype.Service; import java.sql.*; +import java.util.Date; import java.util.List; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import static com.muyu.source.pool.config.BaseConfig.SELECTALL; import static java.util.concurrent.Executors.newFixedThreadPool; /** @@ -35,8 +41,15 @@ public class DataSourceServiceImpl extends ServiceImpl() {{ -// eq(AssetDataSource::getId, dataSource.getId()); -// }}); -// //如果存在则进行删除 -// if (StringUtils.isNotNull(dataSourceServiceOne)) { -// // 根据资产数据源ID查询所有的表 -// List childrenList = childrenService.list(new LambdaQueryWrapper<>() {{ -// eq(Children::getAssetId, dataSourceServiceOne.getId()); -// }}); -// -// childrenList.forEach(children -> { -// //删除表结构中表的所有数据 -// tableDataService.remove(new LambdaQueryWrapper<>() {{ -// eq(TableData::getChildrenId, children.getId()); -// }}); -// }); -// // 删除数据库结构中所有的数据 -// childrenService.remove(new LambdaQueryWrapper<>() {{ -// eq(Children::getAssetId, dataSourceServiceOne.getId()); -// }}); -// assetDataSourceService.remove(new LambdaQueryWrapper<>() {{ -// eq(AssetDataSource::getId, dataSourceServiceOne.getId()); -// }}); -// } -// AssetDataSource build = AssetDataSource.builder() -// .id(dataSource.getId()) -// .name(dataSource.getName()) -// .systemName(dataSource.getSystemName()) -// .databaseName(dataSource.getDatabaseName()) -// .type("dataSource") -// .build(); -// //添加资产数据源结构 库 -// assetDataSourceService.save(build); -// //添加资产数据源下的所有表结构 表 -// List childrenList = addChildren(build); -// //循环遍历添加表结构的数据 字段 -// childrenList.forEach(children -> { -// addTable(build, children.getName()); -// }); -// } -// -// //同步数据库结构 -// private List addChildren(AssetDataSource build) { -// //获取数据源 -// DataSource dataSource = this.getOne(new LambdaQueryWrapper<>() {{ -// eq(DataSource::getName, build.getName()); -// eq(DataSource::getDatabaseName, build.getDatabaseName()); -// }}); -// //获取数据源类型 -// DataType dataType = dataTypeService.getOne(new LambdaQueryWrapper<>() {{ -// eq(DataType::getType, dataSource.getDataType()); -// }}); -// //用于拼接jdbc连接数据库的路径 -// String jdbcUrl = ""; -// //用于拼接sql语句 -// String sql = ""; -// //判断数据库类型 -// if ("MySql".equals(dataType.getType())) { -// //获取表名称以及表注释 -// sql = "select TABLE_NAME,TABLE_COMMENT from INFORMATION_SCHEMA.Tables where table_schema =" + "'" + dataSource.getDatabaseName() + "'"; -// } -// //拼接jdbc连接数据库的路径 -// jdbcUrl = dataType.getPrefix() + dataSource.getIp() + ":" + dataSource.getPort() + "/" + -// dataSource.getDatabaseName() + "?" + dataSource.getConnectionParam(); -// try { -// //加载驱动 -// Class.forName(dataType.getDriverManager()); -// //获取连接 -// Connection connection = DriverManager.getConnection(jdbcUrl, dataSource.getUserName(), dataSource.getPassword()); -// //执行sql语句 -// PreparedStatement preparedStatement = connection.prepareStatement(sql); -// //返回一个结果集 包含查询生成的数据 -// ResultSet resultSet = preparedStatement.executeQuery(); -// //遍历结果集 -// while (resultSet.next()) { -// Children children = Children.builder() -// .name(resultSet.getString("TABLE_NAME")) -// .remark(resultSet.getString("TABLE_COMMENT")) -// .type("dataType") -// .isCenter("Y") -// .assetId(build.getId()) -// .build(); -// //添加到数据库中 -// childrenService.save(children); -// } -// //关闭连接 -// connection.close(); -// resultSet.close(); -// } catch (Exception e) { -// throw new RuntimeException(e); -// } -// //获取数据库结构 -// List childrenList = childrenService.list(new LambdaQueryWrapper<>() {{ -// eq(Children::getAssetId, build.getId()); -// }}); -// try { -// //加载驱动 -// Class.forName(dataType.getDriverManager()); -// //获取连接 -// Connection connection = DriverManager.getConnection(jdbcUrl, dataSource.getUserName(), dataSource.getPassword()); -// //创建一个Statement对象 -// Statement statement = connection.createStatement(); -// //遍历获取到的表结构集合 -// childrenList.forEach(children -> { -// //查询指定数据库中指定表的记录数 -// String sql1 = "SELECT COUNT(*) AS tableNum FROM " + "`" + dataSource.getDatabaseName() + "`" + "." + "`" + children.getName() + "`"; -// try { -// //执行给定的sql语句 -// ResultSet resultSet = statement.executeQuery(sql1); -// while (resultSet.next()) { -// //检索resultSet对象当前行中的指定列的值 -// int anInt = resultSet.getInt("tableNum"); -// children.setDataTotal(anInt); -// childrenService.updateById(children); -// } -// resultSet.close(); -// } catch (Exception e) { -// throw new RuntimeException(e); -// } -// }); -// } catch (Exception e) { -// throw new RuntimeException(e); -// } -// return childrenList; -// } -// -// //同步表结构 -// private void addTable(AssetDataSource build, String name) { -// // 查询数据源对象 -// DataSource dataSource = this.getOne(new LambdaQueryWrapper<>() {{ -// eq(DataSource::getName, build.getName()); -// eq(DataSource::getDatabaseName, build.getDatabaseName()); -// }}); -// //获取表结构对象 -// Children serviceOne = childrenService.getOne(new LambdaQueryWrapper<>() {{ -// eq(Children::getName, name); -// }}); -// // 获取数据类型对象 -// DataType dataType = dataTypeService.getOne(new LambdaQueryWrapper<>() {{ -// eq(DataType::getType, dataSource.getDataType()); -// }}); -// // 用于拼接jdbc连接数据库的路径 -// String jdbcUrl = ""; -// jdbcUrl = dataType.getPrefix() + dataSource.getIp() + ":" + dataSource.getPort() + "/" + -// dataSource.getDatabaseName() + "?" + dataSource.getConnectionParam(); -// try { -// //加载驱动 -// Class.forName(dataType.getDriverManager()); -// //获取连接 -// Connection connection = DriverManager.getConnection(jdbcUrl, dataSource.getUserName(), dataSource.getPassword()); -// //获取数据库元数据 -// DatabaseMetaData metaData = connection.getMetaData(); -// //获取指定数据库中指定表的字段信息 -// ResultSet columns = metaData.getColumns(dataSource.getDatabaseName(), dataSource.getDatabaseName(), name, "%"); -// //获取指定数据库中指定表的主键信息 -// ResultSet primaryKeys = metaData.getPrimaryKeys(dataSource.getDatabaseName(), dataSource.getDatabaseName(), name); -// //获取指定数据库中指定表的主键名称 -// String primaryKey = ""; -// //遍历主键信息 -// while (primaryKeys.next()) { -// primaryKey = primaryKeys.getString("COLUMN_NAME"); -// } -// //遍历字段信息 -// while (columns.next()) { -// //获取字段名称 -// String columnName = columns.getString("COLUMN_NAME"); -// //获取字段备注 -// String columnComment = columns.getString("REMARKS"); -// //获取字段类型 -// String columnType = columns.getString("TYPE_NAME"); -// //获取字段长度 -// int columnSize = columns.getInt("COLUMN_SIZE"); -// //获取字段小数位数 -// int decimalDigits = columns.getInt("DECIMAL_DIGITS"); -// //判断字段是否为空 -// String isNullable = columns.getString("IS_NULLABLE"); -// //判断字段是否为默认值 -// String columnDefault = columns.getString("COLUMN_DEF"); -// // 添加字段 -// TableData tableData = TableData.builder() -// .name(columnName) -// .comment(columnComment) -// .isPrimaryKey(columnName.equals(primaryKey) ? "Y" : "N") -// .type(columnType) -// .length(columnSize) -// .decimalPlaces(decimalDigits) -// .isNull(isNullable.equals("YES") ? "Y" : "N") -// .defaultValue(columnDefault) -// .isDict("N") -// .dictKey("") -// .childrenId(serviceOne.getId()) -// .build(); -// //添加字段信息到数据库中 -// tableDataService.save(tableData); -// } -// //关闭资源 -// columns.close(); -// primaryKeys.close(); -// connection.close(); -// } catch (Exception e) { -// throw new RuntimeException(e); -// } -// -// -// } + @Override + public Integer syncAssetStructure(DataSource etlDataScore) { + try { + if (etlDataScore.getDataType().equals(MYSQL)) { + MysqlPool mysqlPool = new MysqlPool(etlDataScore); + mysqlPool.init(); + Connection conn = mysqlPool.getConn(); + TableInfo tableInfoInsert = TableInfo.builder() + .basicId(etlDataScore.getId()) + .parentId(PARENTID) + .tableRemark("") + .center("Y") + .type("dataSource") + .tableName(etlDataScore.getDatabaseName()) + .createBy(SecurityUtils.getUsername()) + .createTime(new java.util.Date()) + .build(); + + //添加数据库table_info表 + tableInfoService.saveOrUpdate(tableInfoInsert, new LambdaUpdateWrapper(TableInfo.class) {{ + eq(TableInfo::getTableName, tableInfoInsert.getTableName()); + eq(TableInfo::getBasicId, etlDataScore.getId()); + }}); + + //根据数据库id和数据库名称或表名称查询 + TableInfo tableInfo = tableInfoService.selectTableInfoByName(tableInfoInsert); + + DatabaseMetaData metaData = conn.getMetaData(); + + ResultSet rs = metaData.getTables(etlDataScore.getDatabaseName(), null, "%", new String[]{"TABLE"}); + + while (rs.next()) { + String tableName = rs.getString("TABLE_NAME"); + String tableRemark = rs.getString("REMARKS"); + + PreparedStatement preparedStatement = conn.prepareStatement(SELECTALL + tableName); + + ResultSet resultSet = preparedStatement.executeQuery(); + + Long rowCount = 0L; + while (resultSet.next()) { + rowCount++; + } + TableInfo build = TableInfo.builder() + .basicId(etlDataScore.getId()) + .tableName(tableName) + //bug点,tableRemark为空,造成空指针异常 + .tableRemark(tableRemark == null ? "" : tableRemark) + .parentId(tableInfo.getId()) + .type("dataTable") + .center("Y") + .updateBy(SecurityUtils.getUsername()) + .dataNum(rowCount) + .updateTime(new Date()) + .build(); + tableInfoService.saveOrUpdate(build, new LambdaUpdateWrapper<>(TableInfo.class) {{ + eq(TableInfo::getTableName, build.getTableName()); + eq(TableInfo::getBasicId, etlDataScore.getId()); + }}); + //根据数据库id和表名查询数据 + TableInfo table = tableInfoService.selectTableInfoByName(build); + + + ExecutorService threadPool = Executors.newCachedThreadPool(); + + threadPool.submit(() -> { + syncData(conn, etlDataScore.getDatabaseName(), table); + }); + preparedStatement.close(); + } + conn.close(); + mysqlPool.closeConn(); + } +// else if (etlDataScore.getType().equals(REDIS)) { +// RedisPool redisPool = new RedisPool(etlDataScore); +// redisPool.init(); +// Jedis jedis = redisPool.getConn(); +// +// String cursor = ScanParams.SCAN_POINTER_START; +// ScanParams scanParams = new ScanParams().count(100); +// +// HashMap map = new HashMap<>(); +// +// +// while (true) { +// ScanResult scanResult = jedis.scan(cursor, scanParams); +// List keys = scanResult.getResult(); +// for (String key : keys) { +// String value = jedis.get(key); +// if (value != null) { +// map.put(key, value); +// } +// } +// cursor = scanResult.getCursor(); +// if (cursor.equals(ScanParams.SCAN_POINTER_START)) { +// break; +// } +// } +// System.out.println(map); +// +// redisPool.replease(jedis); +// redisPool.closeConn(); +// +// +// } + } catch (SQLException e) { + throw new RuntimeException(e); + } + + return 1; + } + + private void syncData(Connection conn, String databaseName, TableInfo table) { + ExecutorService threadPool = Executors.newCachedThreadPool(); + PreparedStatement ps = null; + try { + ps = conn.prepareStatement(" SELECT " + + " COLUMN_NAME , " + + " COLUMN_COMMENT ," + + " CASE WHEN COLUMN_KEY = 'PRI' THEN '是' ELSE '否' END ," + + " CASE \n" + + " WHEN DATA_TYPE = 'int' THEN 'Integer' " + + " WHEN DATA_TYPE = 'bigint' THEN 'Long' " + + " WHEN DATA_TYPE = 'varchar' THEN 'String' " + + " WHEN DATA_TYPE = 'decimal' THEN 'BigDecimal' " + + " WHEN DATA_TYPE = 'tinyint' AND COLUMN_TYPE = 'tinyint(1)' THEN 'Boolean'" + + " ELSE DATA_TYPE \n" + + " END , " + + " DATA_TYPE , \n" + + " COLUMN_TYPE , \n" + + " CHARACTER_MAXIMUM_LENGTH , \n" + + " NUMERIC_SCALE , \n" + + " IS_NULLABLE , \n" + + " COLUMN_DEFAULT \n" + + "FROM INFORMATION_SCHEMA.COLUMNS WHERE \n" + + "TABLE_SCHEMA = '" + databaseName + "' \n" + + "AND TABLE_NAME = '" + table.getTableName() + "'"); + + + ResultSet resultSet = ps.executeQuery(); + while (resultSet.next()) { + String columnName = String.valueOf(resultSet.getString(1)); + String columnComment = String.valueOf(resultSet.getObject(2)); + String columnKey = String.valueOf(resultSet.getObject(3)); + String end = String.valueOf(resultSet.getObject(4)); + String dataType = String.valueOf(resultSet.getObject(5)); + String columnType = String.valueOf(resultSet.getObject(6)); + String characterMaximumLength = String.valueOf(resultSet.getInt(7)); + String NumericScale = String.valueOf(resultSet.getInt(8)); + String isNullable = String.valueOf(resultSet.getObject(9)); + String columnDefault = String.valueOf(resultSet.getObject(10)); + + Structure build = Structure.builder() + .tableId(table.getId()) + .columnName(String.valueOf(columnName)) + .columnRemark(columnComment) + .isPrimary("是".equals(columnKey) ? "Y" : "N") + .javaType(end) + .columnType(columnType) + .columnLength(characterMaximumLength) + .columnDecimals(NumericScale) + .isNull("YES".equals(isNullable) ? "Y" : "N") + .defaultValue(columnDefault) + .build(); + + threadPool.submit(() -> { + structureService.saveOrUpdate(build, new LambdaUpdateWrapper() {{ + eq(Structure::getTableId, build.getTableId()); + eq(Structure::getColumnName, build.getColumnName()); + eq(Structure::getColumnRemark, build.getColumnRemark()); + }}); + }); + + } + threadPool.shutdown(); + ps.close(); + + } catch (SQLException e) { + throw new RuntimeException(e); + } + } /** * 建立连接