后台资产展示代码

master
冷调 2024-08-29 14:25:21 +08:00
parent 9e3a867117
commit 05f95775d6
11 changed files with 704 additions and 215 deletions

View File

@ -125,7 +125,7 @@ public class DataSource extends BaseEntity {
@Excel(name = "初始连接数量") @Excel(name = "初始连接数量")
@TableField("init_total") @TableField("init_total")
private Long initTotal; private Integer initTotal;
/** /**
* *
@ -133,7 +133,7 @@ public class DataSource extends BaseEntity {
@Excel(name = "最大连接数量") @Excel(name = "最大连接数量")
@TableField("max_num") @TableField("max_num")
private Long maxNum; private Integer maxNum;
/** /**
* *
@ -150,6 +150,11 @@ public class DataSource extends BaseEntity {
@TableField("max_wait_size") @TableField("max_wait_size")
private Long maxWaitSize; private Long maxWaitSize;
/**
* com.mysql.cj.jdbc.Driver
* */
@Excel(name = "驱动",defaultValue = "com.mysql.cj.jdbc.Driver")
private String driverName;
/** /**

View File

@ -0,0 +1,41 @@
package com.muyu.source.pool;
/**
*
*
*
* @author Lenovo
*/
public interface BasePool<T> {
/**
*
*/
public void init();
/**
*
* @return
*/
public T getConn();
/**
*
* @param conn
*/
public void replease(T conn);
/**
*
* @return
*/
public T createConn();
/**
*
*/
public void closeConn();
}

View File

@ -0,0 +1,218 @@
package com.muyu.source.pool;
import com.muyu.source.domain.DataSource;
import com.muyu.source.pool.config.BaseConfig;
import com.muyu.source.pool.exeption.MysqlConnException;
import lombok.extern.log4j.Log4j2;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @author Lenovo
*
*
* Mysql
*/
@Log4j2
public class MysqlPool implements BasePool<Connection>{
/**
*
*/
private Queue<Connection> mysqlConnQueue = null;
/**
*
*/
private Queue<Connection> activeMysqlConnQueue =null;
/**
*
*/
private AtomicInteger count = new AtomicInteger();
/**
* mysql
*/
public DataSource etlDataScore;
/**
* ,
* @param etlDataScore
*/
public MysqlPool(DataSource etlDataScore){
log.info("MySQL连接池实例化完成");
this.etlDataScore=etlDataScore;
BaseConfig.driver(etlDataScore.getDriverName());
}
/**
*
*/
@Override
public void init() {
Integer maxCount = this.etlDataScore.getMaxNum();
Integer initCount = this.etlDataScore.getInitTotal();
this.mysqlConnQueue = new LinkedBlockingQueue<Connection>(maxCount);
this.activeMysqlConnQueue=new LinkedBlockingQueue<Connection>(maxCount);
for (Integer i = 0; i < initCount; i++) {
this.mysqlConnQueue.offer(createConn());
count.incrementAndGet();
}
log.info("MySQL连接池初始化完成");
}
/**
*1.
* 2.
* 3
* 4
* 5()
* 6
* 7使
* 8
* @return
*/
@Override
public Connection getConn() {
long startTime = System.currentTimeMillis();
//从空闲队列当中取出放入活动队列
Connection conn = this.mysqlConnQueue.poll();
if (conn!=null){
this.activeMysqlConnQueue.offer(conn);
return conn;
}
//如果当前的连接数量小于最大的连接数量的时候就进行创建新的连接
if (count.get() < etlDataScore.getMaxNum()){
Connection mysqlConn = createConn();
this.activeMysqlConnQueue.offer(mysqlConn);
count.incrementAndGet();
return mysqlConn;
}
if ((System.currentTimeMillis() - startTime)>this.etlDataScore.getMaxWaitTime()){
throw new MysqlConnException("连接超时!");
}
return null;
}
/**
*
* @param conn
*/
@Override
public void replease(Connection conn) {
//删除活动队列当中的连接
if (this.activeMysqlConnQueue.remove(conn)){
//把这个连接放入到空闲队列当中
this.mysqlConnQueue.offer(conn);
}
}
/**
* mysql
* @return
*/
@Override
public Connection createConn(){
String url = this.etlDataScore.getIp();
String userName = this.etlDataScore.getUserName();
String userPwd = this.etlDataScore.getPassword();
Connection mysqlConn=null;
try {
mysqlConn = DriverManager.getConnection(url, userName, userPwd);
} catch (SQLException e) {
throw new RuntimeException(e);
}
log.info("初始化了一个数据库连接:{ip:"+this.etlDataScore.getIp()+" port:"+this.etlDataScore.getPort()+"databaseName"+this.etlDataScore.getDatabaseName()+" }");
return mysqlConn;
}
@Override
public void closeConn() {
closeBaseConn();
closeActiveConn();
}
/**
*
*/
public void closeBaseConn() {
//从空闲连接当中拿出一个连接 准备进行关闭
//如何拿出的这个链接为null 表示以列当中没有连接信息
Connection poll = this.mysqlConnQueue.poll();
if (poll!=null){
try {
poll.close();
} catch (SQLException e) {
try {
//判断这个接是否被关闭了,如果连接被关闭则不需要放入队列当中
//如何这个链接没有被关闭 则放入队列当中 尝试下次关闭
if (!poll.isClosed()){
this.mysqlConnQueue.offer(poll);
}
} catch (SQLException ex) {
throw new RuntimeException(ex);
}
throw new RuntimeException(e);
}finally {
closeBaseConn();
}
}
}
/**
*
*/
public void closeActiveConn() {
//从空闲连接当中拿出一个连接 准备进行关闭
//如何拿出的这个链接为null 表示以列当中没有连接信息
Connection poll = this.activeMysqlConnQueue.poll();
if (poll!=null){
try {
poll.close();
} catch (SQLException e) {
try {
//判断这个接是否被关闭了,如果连接被关闭则不需要放入队列当中
//如何这个链接没有被关闭 则放入队列当中 尝试下次关闭
if (!poll.isClosed()){
this.activeMysqlConnQueue.offer(poll);
}
} catch (SQLException ex) {
throw new RuntimeException(ex);
}
throw new RuntimeException(e);
}finally {
closeActiveConn();
}
}
}
}

View File

@ -0,0 +1,156 @@
package com.muyu.source.pool;//package com.muyu.etl.property.pool;
//
//import com.muyu.etl.property.domain.EtlDataScore;
//import com.muyu.etl.property.pool.exeption.RedisConnException;
//import lombok.extern.log4j.Log4j2;
//
//import java.util.Queue;
//import java.util.concurrent.LinkedBlockingQueue;
//import java.util.concurrent.atomic.AtomicInteger;
//
///**
// * @Author作者姓名
// * @Packagecom.muyu.etl.property.pool
// * @Projectcloud-etl-property
// * @nameRedisPool
// * @Date2024/8/26 9:12
// */
//@Log4j2
//public class RedisPool implements BasePool<Jedis>{
//
// /**
// * 空闲队列
// */
// private Queue<Jedis> jedisBaseConnQueue = null;
//
// /**
// * 活动队列
// */
// private Queue<Jedis> jedisActiveConnQueue = null;
//
// /**
// * 总连接数
// */
// private AtomicInteger count=null;
//
// /**
// * redisPoolConfig
// */
//
// private EtlDataScore etlDataScore=null;
//
// /**
// * 实例化
// * @param etlDataScore
// */
// public RedisPool(EtlDataScore etlDataScore) {
// log.info("redis连接池实例化完成");
// this.etlDataScore = etlDataScore;
// }
//
// @Override
// public void init() {
//
// Integer maxCount = this.etlDataScore.getMaxCount();
//
// this.jedisBaseConnQueue = new LinkedBlockingQueue<Jedis>(maxCount);
// this.jedisActiveConnQueue = new LinkedBlockingQueue<Jedis>(maxCount);
//
// this.count = new AtomicInteger();
//
// Integer initCount = this.etlDataScore.getInitCount();
//
//
// for (Integer i = 0; i < initCount; i++) {
// this.jedisBaseConnQueue.offer(createConn());
// count.incrementAndGet();
// }
// log.info("redis连接池初始化完成!");
// }
//
// @Override
// public Jedis getConn() {
// long startTime = System.currentTimeMillis();
//
//
// while (true){
// Jedis jedis = this.jedisBaseConnQueue.poll();
// if (jedis!=null){
// this.jedisActiveConnQueue.offer(jedis);
// return jedis;
// }
//
// if (count.get()<this.etlDataScore.getMaxCount()){
// jedis = createConn();
// this.jedisActiveConnQueue.offer(jedis);
// count.incrementAndGet();
// return jedis;
// }
//
// if (System.currentTimeMillis() -startTime > this.dataSources.getMaxTime()){
// throw new RedisConnException("redis获取连接超时!");
// }
//
// }
//
// }
//
// @Override
// public void replease(Jedis conn) {
// if (this.jedisActiveConnQueue.remove(conn)){
// this.jedisBaseConnQueue.offer(conn);
// }else {
// count.decrementAndGet();
// }
//
// }
//
// @Override
// public Jedis createConn() {
// String ip = this.etlDataScore.getHost();
// String port = this.etlDataScore.getPort();
// Jedis jedis = new Jedis(ip, Integer.parseInt(port));
// log.info("初始化了一个redis的连接,{ip:"+ip+" port:"+port+"}");
// return jedis;
// }
//
// @Override
// public void closeConn() {
// closeJedisBaseConn();
// closeJedisActiveConn();
// }
//
// public void closeJedisBaseConn(){
// Jedis jedis = this.jedisBaseConnQueue.poll();
//
// if (jedis!=null){
// try {
// jedis.close();
// } catch (Exception e) {
// if (!jedis.isConnected()){
// this.jedisBaseConnQueue.offer(jedis);
// }
// throw new RuntimeException(e);
// }finally {
// closeJedisBaseConn();
// }
// }
// }
// public void closeJedisActiveConn(){
// Jedis jedis = this.jedisActiveConnQueue.poll();
//
// if (jedis!=null){
// try {
// jedis.close();
// } catch (Exception e) {
// if (!jedis.isConnected()){
// this.jedisActiveConnQueue.offer(jedis);
// }
// throw new RuntimeException(e);
// }finally {
// closeJedisActiveConn();
// }
// }
// }
//
//}

View File

@ -0,0 +1,53 @@
package com.muyu.source.pool.config;
/**
* @author Lenovo
* @Packagecom.muyu.etl.property.pool.config
* @Projectcloud-etl-property
* @nameBaseConfig
* @Date2024/8/22 22:21
*/
public class BaseConfig {
/**
* mysql
*/
public static final String MYSQLJDBCPRO="jdbc:mysql://";
public static final String SHOWTABLES="show TABLES";
public static final String SELECTCOUNT="SELECT count(1) FROM ";
public static final String SHOW_FULL_FIELDS_FROM="SHOW FULL FIELDS FROM ";
public static final String SELECT="select ";
public static final String SELECTALL="select * from ";
public static final String FROM=" from ";
public static final String SELECTFIELD=" SELECT \" +\n" +
" \" COLUMN_NAME , \" +\n" +
" \" COLUMN_COMMENT ,\" +\n" +
" \" CASE WHEN COLUMN_KEY = 'PRI' THEN '是' ELSE '否' END ,\" +\n" +
" \" CASE \\n\" +\n" +
" \" WHEN DATA_TYPE = 'int' THEN 'Integer' \" +\n" +
" \" WHEN DATA_TYPE = 'bigint' THEN 'Long' \" +\n" +
" \" WHEN DATA_TYPE = 'varchar' THEN 'String' \" +\n" +
" \" WHEN DATA_TYPE = 'decimal' THEN 'BigDecimal' \" +\n" +
" \" WHEN DATA_TYPE = 'tinyint' AND COLUMN_TYPE = 'tinyint(1)' THEN 'Boolean'\" +\n" +
" \" ELSE DATA_TYPE \\n\" +\n" +
" \" END , \" +\n" +
" \" DATA_TYPE , \\n\" +\n" +
" \" COLUMN_TYPE , \\n\" +\n" +
" \" CHARACTER_MAXIMUM_LENGTH , \\n\" +\n" +
" \" NUMERIC_SCALE , \\n\" +\n" +
" \" IS_NULLABLE , \\n\" +\n" +
" \" COLUMN_DEFAULT \\n\" +\n" +
" \"FROM INFORMATION_SCHEMA.COLUMNS ";
public static void driver(String driverName){
try {
Class.forName(driverName);
} catch (ClassNotFoundException e) {
throw new RuntimeException(e);
}
}
}

View File

@ -0,0 +1,10 @@
package com.muyu.source.pool.exeption;
/**
* @author Lenovo
*/
public class MysqlConnException extends RuntimeException{
public MysqlConnException(String message) {
super(message);
}
}

View File

@ -0,0 +1,13 @@
package com.muyu.source.pool.exeption;
/**
*
*
* @author Lenovo
*/
public class RedisConnException extends RuntimeException
{
public RedisConnException(String message) {
super(message);
}
}

View File

@ -13,6 +13,7 @@ import com.muyu.source.service.DataSourceService;
import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag; import io.swagger.v3.oas.annotations.tags.Tag;
import jakarta.servlet.http.HttpServletResponse; import jakarta.servlet.http.HttpServletResponse;
import lombok.Data;
import lombok.extern.log4j.Log4j2; import lombok.extern.log4j.Log4j2;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.validation.annotation.Validated; import org.springframework.validation.annotation.Validated;
@ -179,7 +180,11 @@ public class DataSourceController extends BaseController {
// dataSourceService.synchronous(dataSource); // dataSourceService.synchronous(dataSource);
// return Result.success(); // return Result.success();
// } // }
@PostMapping("/syncAssetStructure")
public Result syncAssetStructure(@RequestBody DataSource dataSource){
Integer i = dataSourceService.syncAssetStructure(dataSource);
return Result.success(i);
}

View File

@ -71,7 +71,7 @@ public class TableInfoController {
@GetMapping("/findStruceure/{id}") @GetMapping("/findStruceure/{id}")
public Result<List<Structure>> findStruceure(@PathVariable("id") Integer id) { public Result<List<Structure>> findStruceure(@PathVariable("id") Integer id) {
List<Structure> structureList= structureService.findStructurelistS(id); List<Structure> structureList = structureService.findStructurelistS(id);
return Result.success(structureList); return Result.success(structureList);
} }

View File

@ -41,5 +41,7 @@ public interface DataSourceService extends IService<DataSource> {
Boolean testConnection(DataSource dataSource); Boolean testConnection(DataSource dataSource);
Integer syncAssetStructure(DataSource dataSource);
// void synchronous(DataSource dataSource); // void synchronous(DataSource dataSource);
} }

View File

@ -1,19 +1,25 @@
package com.muyu.source.service.Impl; package com.muyu.source.service.Impl;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.core.toolkit.Assert; import com.baomidou.mybatisplus.core.toolkit.Assert;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.muyu.common.core.utils.StringUtils; import com.muyu.common.core.utils.StringUtils;
import com.muyu.common.security.utils.SecurityUtils;
import com.muyu.source.domain.*; import com.muyu.source.domain.*;
import com.muyu.source.mapper.DataSourceMapper; import com.muyu.source.mapper.DataSourceMapper;
import com.muyu.source.pool.MysqlPool;
import com.muyu.source.service.*; import com.muyu.source.service.*;
import jakarta.annotation.Resource; import jakarta.annotation.Resource;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import java.sql.*; import java.sql.*;
import java.util.Date;
import java.util.List; import java.util.List;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import static com.muyu.source.pool.config.BaseConfig.SELECTALL;
import static java.util.concurrent.Executors.newFixedThreadPool; import static java.util.concurrent.Executors.newFixedThreadPool;
/** /**
@ -35,8 +41,15 @@ public class DataSourceServiceImpl extends ServiceImpl<DataSourceMapper, DataSou
private ChildrenService childrenService; private ChildrenService childrenService;
@Resource @Resource
private TableDataService tableDataService; private TableDataService tableDataService;
@Resource
private TableInfoService tableInfoService;
@Resource
private StructureService structureService;
private final ExecutorService executor = newFixedThreadPool(10); private static final Long PARENTID =0L;
private static final String DRIVERNAME ="com.mysql.cj.jdbc.Driver";
private static final String MYSQL ="MySql";
private static final String REDIS ="redis";
/** /**
@ -132,218 +145,191 @@ public class DataSourceServiceImpl extends ServiceImpl<DataSourceMapper, DataSou
/** /**
* *
* * @param etlDataScore
* @param dataSource * @return
*/ */
// @Override @Override
// public void synchronous(DataSource dataSource) { public Integer syncAssetStructure(DataSource etlDataScore) {
// // 获取指定ID的资产数据源对象 try {
// AssetDataSource dataSourceServiceOne = assetDataSourceService.getOne(new LambdaQueryWrapper<>() {{ if (etlDataScore.getDataType().equals(MYSQL)) {
// eq(AssetDataSource::getId, dataSource.getId()); MysqlPool mysqlPool = new MysqlPool(etlDataScore);
// }}); mysqlPool.init();
// //如果存在则进行删除 Connection conn = mysqlPool.getConn();
// if (StringUtils.isNotNull(dataSourceServiceOne)) {
// // 根据资产数据源ID查询所有的表
// List<Children> childrenList = childrenService.list(new LambdaQueryWrapper<>() {{
// eq(Children::getAssetId, dataSourceServiceOne.getId());
// }});
//
// childrenList.forEach(children -> {
// //删除表结构中表的所有数据
// tableDataService.remove(new LambdaQueryWrapper<>() {{
// eq(TableData::getChildrenId, children.getId());
// }});
// });
// // 删除数据库结构中所有的数据
// childrenService.remove(new LambdaQueryWrapper<>() {{
// eq(Children::getAssetId, dataSourceServiceOne.getId());
// }});
// assetDataSourceService.remove(new LambdaQueryWrapper<>() {{
// eq(AssetDataSource::getId, dataSourceServiceOne.getId());
// }});
// }
// AssetDataSource build = AssetDataSource.builder()
// .id(dataSource.getId())
// .name(dataSource.getName())
// .systemName(dataSource.getSystemName())
// .databaseName(dataSource.getDatabaseName())
// .type("dataSource")
// .build();
// //添加资产数据源结构 库
// assetDataSourceService.save(build);
// //添加资产数据源下的所有表结构 表
// List<Children> childrenList = addChildren(build);
// //循环遍历添加表结构的数据 字段
// childrenList.forEach(children -> {
// addTable(build, children.getName());
// });
// }
//
// //同步数据库结构
// private List<Children> addChildren(AssetDataSource build) {
// //获取数据源
// DataSource dataSource = this.getOne(new LambdaQueryWrapper<>() {{
// eq(DataSource::getName, build.getName());
// eq(DataSource::getDatabaseName, build.getDatabaseName());
// }});
// //获取数据源类型
// DataType dataType = dataTypeService.getOne(new LambdaQueryWrapper<>() {{
// eq(DataType::getType, dataSource.getDataType());
// }});
// //用于拼接jdbc连接数据库的路径
// String jdbcUrl = "";
// //用于拼接sql语句
// String sql = "";
// //判断数据库类型
// if ("MySql".equals(dataType.getType())) {
// //获取表名称以及表注释
// sql = "select TABLE_NAME,TABLE_COMMENT from INFORMATION_SCHEMA.Tables where table_schema =" + "'" + dataSource.getDatabaseName() + "'";
// }
// //拼接jdbc连接数据库的路径
// jdbcUrl = dataType.getPrefix() + dataSource.getIp() + ":" + dataSource.getPort() + "/" +
// dataSource.getDatabaseName() + "?" + dataSource.getConnectionParam();
// try {
// //加载驱动
// Class.forName(dataType.getDriverManager());
// //获取连接
// Connection connection = DriverManager.getConnection(jdbcUrl, dataSource.getUserName(), dataSource.getPassword());
// //执行sql语句
// PreparedStatement preparedStatement = connection.prepareStatement(sql);
// //返回一个结果集 包含查询生成的数据
// ResultSet resultSet = preparedStatement.executeQuery();
// //遍历结果集
// while (resultSet.next()) {
// Children children = Children.builder()
// .name(resultSet.getString("TABLE_NAME"))
// .remark(resultSet.getString("TABLE_COMMENT"))
// .type("dataType")
// .isCenter("Y")
// .assetId(build.getId())
// .build();
// //添加到数据库中
// childrenService.save(children);
// }
// //关闭连接
// connection.close();
// resultSet.close();
// } catch (Exception e) {
// throw new RuntimeException(e);
// }
// //获取数据库结构
// List<Children> childrenList = childrenService.list(new LambdaQueryWrapper<>() {{
// eq(Children::getAssetId, build.getId());
// }});
// try {
// //加载驱动
// Class.forName(dataType.getDriverManager());
// //获取连接
// Connection connection = DriverManager.getConnection(jdbcUrl, dataSource.getUserName(), dataSource.getPassword());
// //创建一个Statement对象
// Statement statement = connection.createStatement();
// //遍历获取到的表结构集合
// childrenList.forEach(children -> {
// //查询指定数据库中指定表的记录数
// String sql1 = "SELECT COUNT(*) AS tableNum FROM " + "`" + dataSource.getDatabaseName() + "`" + "." + "`" + children.getName() + "`";
// try {
// //执行给定的sql语句
// ResultSet resultSet = statement.executeQuery(sql1);
// while (resultSet.next()) {
// //检索resultSet对象当前行中的指定列的值
// int anInt = resultSet.getInt("tableNum");
// children.setDataTotal(anInt);
// childrenService.updateById(children);
// }
// resultSet.close();
// } catch (Exception e) {
// throw new RuntimeException(e);
// }
// });
// } catch (Exception e) {
// throw new RuntimeException(e);
// }
// return childrenList;
// }
//
// //同步表结构
// private void addTable(AssetDataSource build, String name) {
// // 查询数据源对象
// DataSource dataSource = this.getOne(new LambdaQueryWrapper<>() {{
// eq(DataSource::getName, build.getName());
// eq(DataSource::getDatabaseName, build.getDatabaseName());
// }});
// //获取表结构对象
// Children serviceOne = childrenService.getOne(new LambdaQueryWrapper<>() {{
// eq(Children::getName, name);
// }});
// // 获取数据类型对象
// DataType dataType = dataTypeService.getOne(new LambdaQueryWrapper<>() {{
// eq(DataType::getType, dataSource.getDataType());
// }});
// // 用于拼接jdbc连接数据库的路径
// String jdbcUrl = "";
// jdbcUrl = dataType.getPrefix() + dataSource.getIp() + ":" + dataSource.getPort() + "/" +
// dataSource.getDatabaseName() + "?" + dataSource.getConnectionParam();
// try {
// //加载驱动
// Class.forName(dataType.getDriverManager());
// //获取连接
// Connection connection = DriverManager.getConnection(jdbcUrl, dataSource.getUserName(), dataSource.getPassword());
// //获取数据库元数据
// DatabaseMetaData metaData = connection.getMetaData();
// //获取指定数据库中指定表的字段信息
// ResultSet columns = metaData.getColumns(dataSource.getDatabaseName(), dataSource.getDatabaseName(), name, "%");
// //获取指定数据库中指定表的主键信息
// ResultSet primaryKeys = metaData.getPrimaryKeys(dataSource.getDatabaseName(), dataSource.getDatabaseName(), name);
// //获取指定数据库中指定表的主键名称
// String primaryKey = "";
// //遍历主键信息
// while (primaryKeys.next()) {
// primaryKey = primaryKeys.getString("COLUMN_NAME");
// }
// //遍历字段信息
// while (columns.next()) {
// //获取字段名称
// String columnName = columns.getString("COLUMN_NAME");
// //获取字段备注
// String columnComment = columns.getString("REMARKS");
// //获取字段类型
// String columnType = columns.getString("TYPE_NAME");
// //获取字段长度
// int columnSize = columns.getInt("COLUMN_SIZE");
// //获取字段小数位数
// int decimalDigits = columns.getInt("DECIMAL_DIGITS");
// //判断字段是否为空
// String isNullable = columns.getString("IS_NULLABLE");
// //判断字段是否为默认值
// String columnDefault = columns.getString("COLUMN_DEF");
// // 添加字段
// TableData tableData = TableData.builder()
// .name(columnName)
// .comment(columnComment)
// .isPrimaryKey(columnName.equals(primaryKey) ? "Y" : "N")
// .type(columnType)
// .length(columnSize)
// .decimalPlaces(decimalDigits)
// .isNull(isNullable.equals("YES") ? "Y" : "N")
// .defaultValue(columnDefault)
// .isDict("N")
// .dictKey("")
// .childrenId(serviceOne.getId())
// .build();
// //添加字段信息到数据库中
// tableDataService.save(tableData);
// }
// //关闭资源
// columns.close();
// primaryKeys.close();
// connection.close();
// } catch (Exception e) {
// throw new RuntimeException(e);
// }
//
//
// }
TableInfo tableInfoInsert = TableInfo.builder()
.basicId(etlDataScore.getId())
.parentId(PARENTID)
.tableRemark("")
.center("Y")
.type("dataSource")
.tableName(etlDataScore.getDatabaseName())
.createBy(SecurityUtils.getUsername())
.createTime(new java.util.Date())
.build();
//添加数据库table_info表
tableInfoService.saveOrUpdate(tableInfoInsert, new LambdaUpdateWrapper<TableInfo>(TableInfo.class) {{
eq(TableInfo::getTableName, tableInfoInsert.getTableName());
eq(TableInfo::getBasicId, etlDataScore.getId());
}});
//根据数据库id和数据库名称或表名称查询
TableInfo tableInfo = tableInfoService.selectTableInfoByName(tableInfoInsert);
DatabaseMetaData metaData = conn.getMetaData();
ResultSet rs = metaData.getTables(etlDataScore.getDatabaseName(), null, "%", new String[]{"TABLE"});
while (rs.next()) {
String tableName = rs.getString("TABLE_NAME");
String tableRemark = rs.getString("REMARKS");
PreparedStatement preparedStatement = conn.prepareStatement(SELECTALL + tableName);
ResultSet resultSet = preparedStatement.executeQuery();
Long rowCount = 0L;
while (resultSet.next()) {
rowCount++;
}
TableInfo build = TableInfo.builder()
.basicId(etlDataScore.getId())
.tableName(tableName)
//bug点tableRemark为空造成空指针异常
.tableRemark(tableRemark == null ? "" : tableRemark)
.parentId(tableInfo.getId())
.type("dataTable")
.center("Y")
.updateBy(SecurityUtils.getUsername())
.dataNum(rowCount)
.updateTime(new Date())
.build();
tableInfoService.saveOrUpdate(build, new LambdaUpdateWrapper<>(TableInfo.class) {{
eq(TableInfo::getTableName, build.getTableName());
eq(TableInfo::getBasicId, etlDataScore.getId());
}});
//根据数据库id和表名查询数据
TableInfo table = tableInfoService.selectTableInfoByName(build);
ExecutorService threadPool = Executors.newCachedThreadPool();
threadPool.submit(() -> {
syncData(conn, etlDataScore.getDatabaseName(), table);
});
preparedStatement.close();
}
conn.close();
mysqlPool.closeConn();
}
// else if (etlDataScore.getType().equals(REDIS)) {
// RedisPool redisPool = new RedisPool(etlDataScore);
// redisPool.init();
// Jedis jedis = redisPool.getConn();
//
// String cursor = ScanParams.SCAN_POINTER_START;
// ScanParams scanParams = new ScanParams().count(100);
//
// HashMap<String, String> map = new HashMap<>();
//
//
// while (true) {
// ScanResult<String> scanResult = jedis.scan(cursor, scanParams);
// List<String> keys = scanResult.getResult();
// for (String key : keys) {
// String value = jedis.get(key);
// if (value != null) {
// map.put(key, value);
// }
// }
// cursor = scanResult.getCursor();
// if (cursor.equals(ScanParams.SCAN_POINTER_START)) {
// break;
// }
// }
// System.out.println(map);
//
// redisPool.replease(jedis);
// redisPool.closeConn();
//
//
// }
} catch (SQLException e) {
throw new RuntimeException(e);
}
return 1;
}
private void syncData(Connection conn, String databaseName, TableInfo table) {
ExecutorService threadPool = Executors.newCachedThreadPool();
PreparedStatement ps = null;
try {
ps = conn.prepareStatement(" SELECT " +
" COLUMN_NAME , " +
" COLUMN_COMMENT ," +
" CASE WHEN COLUMN_KEY = 'PRI' THEN '是' ELSE '否' END ," +
" CASE \n" +
" WHEN DATA_TYPE = 'int' THEN 'Integer' " +
" WHEN DATA_TYPE = 'bigint' THEN 'Long' " +
" WHEN DATA_TYPE = 'varchar' THEN 'String' " +
" WHEN DATA_TYPE = 'decimal' THEN 'BigDecimal' " +
" WHEN DATA_TYPE = 'tinyint' AND COLUMN_TYPE = 'tinyint(1)' THEN 'Boolean'" +
" ELSE DATA_TYPE \n" +
" END , " +
" DATA_TYPE , \n" +
" COLUMN_TYPE , \n" +
" CHARACTER_MAXIMUM_LENGTH , \n" +
" NUMERIC_SCALE , \n" +
" IS_NULLABLE , \n" +
" COLUMN_DEFAULT \n" +
"FROM INFORMATION_SCHEMA.COLUMNS WHERE \n" +
"TABLE_SCHEMA = '" + databaseName + "' \n" +
"AND TABLE_NAME = '" + table.getTableName() + "'");
ResultSet resultSet = ps.executeQuery();
while (resultSet.next()) {
String columnName = String.valueOf(resultSet.getString(1));
String columnComment = String.valueOf(resultSet.getObject(2));
String columnKey = String.valueOf(resultSet.getObject(3));
String end = String.valueOf(resultSet.getObject(4));
String dataType = String.valueOf(resultSet.getObject(5));
String columnType = String.valueOf(resultSet.getObject(6));
String characterMaximumLength = String.valueOf(resultSet.getInt(7));
String NumericScale = String.valueOf(resultSet.getInt(8));
String isNullable = String.valueOf(resultSet.getObject(9));
String columnDefault = String.valueOf(resultSet.getObject(10));
Structure build = Structure.builder()
.tableId(table.getId())
.columnName(String.valueOf(columnName))
.columnRemark(columnComment)
.isPrimary("是".equals(columnKey) ? "Y" : "N")
.javaType(end)
.columnType(columnType)
.columnLength(characterMaximumLength)
.columnDecimals(NumericScale)
.isNull("YES".equals(isNullable) ? "Y" : "N")
.defaultValue(columnDefault)
.build();
threadPool.submit(() -> {
structureService.saveOrUpdate(build, new LambdaUpdateWrapper<Structure>() {{
eq(Structure::getTableId, build.getTableId());
eq(Structure::getColumnName, build.getColumnName());
eq(Structure::getColumnRemark, build.getColumnRemark());
}});
});
}
threadPool.shutdown();
ps.close();
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
/** /**
* *