fix():重构数据同步方法

ruoyi_test
sunshine7058 2024-04-26 21:28:56 +08:00
parent b9a19fbdd9
commit 54817d5629
2 changed files with 362 additions and 292 deletions

View File

@ -6,6 +6,7 @@ import com.muyu.common.security.annotation.EnableMyFeignClients;
import com.muyu.common.swagger.annotation.EnableCustomSwagger2;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableAsync;
/**
*
@ -16,6 +17,7 @@ import org.springframework.boot.autoconfigure.SpringBootApplication;
@EnableCustomSwagger2
@EnableMyFeignClients
@SpringBootApplication
public class MuYuDataSourceApplication {
public static void main (String[] args) {
SpringApplication.run(MuYuDataSourceApplication.class, args);

View File

@ -1,34 +1,40 @@
package com.muyu.data.source.service.impl;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.muyu.common.core.utils.ObjUtils;
import com.muyu.data.source.domain.AssetDataSource;
import com.muyu.data.source.domain.Children;
import com.muyu.data.source.domain.DataSource;
import com.muyu.data.source.domain.DatabaseType;
import com.muyu.data.source.domain.TableData;
import com.muyu.data.source.domain.req.ShowTableReq;
import com.muyu.data.source.domain.resp.ColumnResp;
import com.muyu.data.source.domain.resp.CountResp;
import com.muyu.data.source.domain.resp.Table;
import com.muyu.data.source.mapper.DataSourceMapper;
import com.muyu.data.source.service.AssetDataSourceService;
import com.muyu.data.source.service.ChildrenService;
import com.muyu.data.source.service.DataSourceService;
import com.muyu.data.source.service.DatabaseTypeService;
import com.muyu.data.source.service.TableDataService;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import com.muyu.common.core.utils.ObjUtils;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import com.muyu.data.source.mapper.DataSourceMapper;
import com.muyu.data.source.domain.DataSource;
import com.muyu.data.source.service.DataSourceService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import redis.clients.jedis.Jedis;
/**
@ -39,315 +45,377 @@ import redis.clients.jedis.Jedis;
*/
@Slf4j
@Service
public class DataSourceServiceImpl extends ServiceImpl<DataSourceMapper, DataSource> implements DataSourceService {
public class DataSourceServiceImpl extends ServiceImpl<DataSourceMapper, DataSource> implements DataSourceService {
@Autowired
private DataSourceMapper dataSourceMapper;
@Autowired
private AssetDataSourceService assetDataSourceService;
@Autowired
private DataSourceMapper dataSourceMapper;
@Autowired
private AssetDataSourceService assetDataSourceService;
@Autowired
private ChildrenService childrenService;
@Autowired
private ChildrenService childrenService;
@Autowired
private TableDataService tableDataService;
@Autowired
private DatabaseTypeService databaseTypeService;
@Autowired
private TableDataService tableDataService;
@Autowired
private DatabaseTypeService databaseTypeService;
/**
*
*
* @param dataSource
* @return
*/
@Override
public List<DataSource> list(DataSource dataSource) {
LambdaQueryWrapper<DataSource> queryWrapper = new LambdaQueryWrapper<>();
private static Integer version = 1;
/**
*
*
* @param dataSource
* @return
*/
@Override
public List<DataSource> list(DataSource dataSource) {
LambdaQueryWrapper<DataSource> queryWrapper = new LambdaQueryWrapper<>();
if (ObjUtils.notNull(dataSource.getName())) {
queryWrapper.like(DataSource::getName, dataSource.getName());
}
if (ObjUtils.notNull(dataSource.getSystemName())) {
queryWrapper.like(DataSource::getSystemName, dataSource.getSystemName());
}
if (ObjUtils.notNull(dataSource.getDatabaseName())) {
queryWrapper.like(DataSource::getDatabaseName, dataSource.getDatabaseName());
}
return list(queryWrapper);
if (ObjUtils.notNull(dataSource.getName())) {
queryWrapper.like(DataSource::getName, dataSource.getName());
}
/**
*
*
* @param
*/
@Override
public Boolean testConnection(DataSource dataSource) {
DatabaseType dataType = databaseTypeService.getOne(new LambdaQueryWrapper<>() {{
eq(DatabaseType::getDatabaseName, dataSource.getDataType());
}});
if (ObjUtils.notNull(dataSource.getSystemName())) {
queryWrapper.like(DataSource::getSystemName, dataSource.getSystemName());
}
String jdbcUrl = "";
String driveClass = "";
boolean flag = false;
if (dataType.getDatabaseName().equals("mysql") || dataType.equals("MYSQL")) {
driveClass = dataType.getDriverManager();
if (ObjUtils.notNull(dataSource.getDatabaseName())) {
queryWrapper.like(DataSource::getDatabaseName, dataSource.getDatabaseName());
}
return list(queryWrapper);
}
/**
*
*
* @param
*/
@Override
public Boolean testConnection(DataSource dataSource) {
DatabaseType dataType = databaseTypeService.getOne(new LambdaQueryWrapper<>() {{
eq(DatabaseType::getDatabaseName, dataSource.getDataType());
}});
String jdbcUrl = "";
String driveClass = "";
boolean flag = false;
if (dataType.getDatabaseName().equals("mysql") || dataType.equals("MYSQL")) {
driveClass = dataType.getDriverManager();
}
try {
if (dataType.getDriverManager() != null && dataType.getUrlPre() != null) {
if ("mysql".equals(dataType.getDatabaseName())) {
driveClass = dataType.getDriverManager();
jdbcUrl = dataType.getUrlPre() + dataSource.getHost() + ":" + dataSource.getPort() + "/"
+ dataSource.getDatabaseName() + "?" + dataSource.getConnectionParam();
}
if ("oracle".equals(dataType.getDatabaseName())) {
driveClass = dataType.getDriverManager();
jdbcUrl = dataType.getUrlPre() + dataSource.getHost() + ":" + dataSource.getPort() + ":"
+ dataSource.getDatabaseName();
}
if ("sqlserver".equals(dataType.getDatabaseName())) {
driveClass = dataType.getDriverManager();
jdbcUrl =
dataType.getUrlPre() + dataSource.getHost() + ":" + dataSource.getPort() + ";databaseName="
+ dataSource.getDatabaseName();
}
flag = testDatasource(driveClass, jdbcUrl, dataSource.getUser(), dataSource.getPassword());
} else if (dataType.getDriverManager() == null && dataType.getUrlPre() == null) {
// redis
//连接指定的redis
Jedis jedis = new Jedis(dataSource.getHost(), Integer.valueOf(dataSource.getPort()));
//如果有密码则需要下面这一行
if (dataSource.getPassword() != null && !dataSource.getPassword().equals("")) {
jedis.auth(dataSource.getPassword());
}
//查看服务是否运行,运行正常的话返回一个PONG否则返回一个连接错误
try {
if (dataType.getDriverManager() != null && dataType.getUrlPre() != null) {
if ("mysql".equals(dataType.getDatabaseName())) {
driveClass = dataType.getDriverManager();
jdbcUrl = dataType.getUrlPre() + dataSource.getHost() + ":" + dataSource.getPort() + "/"
+ dataSource.getDatabaseName() + "?" + dataSource.getConnectionParam();
}
if ("oracle".equals(dataType.getDatabaseName())) {
driveClass = dataType.getDriverManager();
jdbcUrl = dataType.getUrlPre() + dataSource.getHost() + ":" + dataSource.getPort() + ":"
+ dataSource.getDatabaseName();
}
if ("sqlserver".equals(dataType.getDatabaseName())) {
driveClass = dataType.getDriverManager();
jdbcUrl =
dataType.getUrlPre() + dataSource.getHost() + ":" + dataSource.getPort() + ";databaseName="
+ dataSource.getDatabaseName();
}
flag = testDatasource(driveClass, jdbcUrl, dataSource.getUser(), dataSource.getPassword());
} else if (dataType.getDriverManager() == null && dataType.getUrlPre() == null) {
// redis
//连接指定的redis
Jedis jedis = new Jedis(dataSource.getHost(), Integer.valueOf(dataSource.getPort()));
//如果有密码则需要下面这一行
if (dataSource.getPassword() != null && !dataSource.getPassword().equals("")) {
jedis.auth(dataSource.getPassword());
}
//查看服务是否运行,运行正常的话返回一个PONG否则返回一个连接错误
try {
jedis.ping();
flag = true;
} catch (Exception e) {
flag = false;
}
}
return flag;
jedis.ping();
flag = true;
} catch (Exception e) {
return flag;
flag = false;
}
}
return flag;
} catch (Exception e) {
return flag;
}
/**
*
*
* @param dataSource
* @return
*/
@Override
public Boolean syncConnection(DataSource dataSource) {
if ("mysql".equals(dataSource.getDataType())) {
AssetDataSource assetDataSource = AssetDataSource.builder()
.name(dataSource.getName())
.systemName(dataSource.getSystemName())
.databaseName(dataSource.getDatabaseName())
.type("dataSource")
.build();
return assetDataSourceService.save(assetDataSource);
}
return false;
}
/**
*
*
* @param dataSource
* @return
*/
@Override
public Boolean syncConnection(DataSource dataSource) {
if ("mysql".equals(dataSource.getDataType())) {
AssetDataSource assetDataSource = AssetDataSource.builder()
.name(dataSource.getName())
.systemName(dataSource.getSystemName())
.databaseName(dataSource.getDatabaseName())
.type("dataSource")
.build();
return assetDataSourceService.save(assetDataSource);
}
return false;
}
@Override
public List<AssetDataSource> getAssetList() {
List<AssetDataSource> assetDataSourceList = assetDataSourceService.list();
return assetDataSourceList;
}
@Override
public List<AssetDataSource> getAssetList() {
List<AssetDataSource> assetDataSourceList = assetDataSourceService.list();
return assetDataSourceList;
}
@Override
public List<Children> getChildrenList(AssetDataSource assetDataSource) {
//查询数据源对象
DataSource dataSource = this.getOne(new LambdaQueryWrapper<DataSource>() {{
eq(DataSource::getName, assetDataSource.getName());
eq(DataSource::getDatabaseName, assetDataSource.getDatabaseName());
}});
//数据库类型为mysql
if ("mysql".equals(dataSource.getDataType())) {
//根据id查询表描述
List<Children> childrenList = childrenService.list(new LambdaQueryWrapper<Children>() {{
eq(Children::getAssetId, assetDataSource.getId());
}});
//获取表描述的集合
List<Table> tableList = dataSourceMapper.selectTable(assetDataSource.getDatabaseName());
//判断是否存在
if (childrenList == null || childrenList.size() == 0) {
tableList.forEach(table -> {
Children children = Children.builder()
.name(table.getTableName())
.annotation(table.getTableComment())
.dataTotal(table.getTableRows())
.type("dataTable")
.assetId(assetDataSource.getId())
.build();
//添加到数据库
childrenService.save(children);
});
}
//返回表描述
return childrenList;
}
return null;
}
@Override
public void addTbleDate(ShowTableReq showTableReq) {
// 查询数据源对象
DataSource dataSource = this.getOne(new LambdaQueryWrapper<>() {{
eq(DataSource::getName, showTableReq.getAssetStructure().getName());
eq(DataSource::getDatabaseName, showTableReq.getAssetStructure().getDatabaseName());
}});
Children children = childrenService.getOne(new LambdaQueryWrapper<>() {{
eq(Children::getAssetId, showTableReq.getAssetStructure().getId());
eq(Children::getName, showTableReq.getTableName());
}});
// 获取数据类型对象
DatabaseType dataType = databaseTypeService.getOne(new LambdaQueryWrapper<>() {
{
eq(DatabaseType::getDatabaseName, dataSource.getDataType());
}
});
String jdbcUrl = dataType.getUrlPre() + dataSource.getHost() + ":" + dataSource.getPort() + "/"
@Override
public List<Children> getChildrenList(AssetDataSource assetDataSource) {
//查询数据源对象
DataSource dataSource = this.getOne(new LambdaQueryWrapper<DataSource>() {{
eq(DataSource::getName, assetDataSource.getName());
eq(DataSource::getDatabaseName, assetDataSource.getDatabaseName());
}});
DatabaseType databaseType = databaseTypeService.getOne(new LambdaQueryWrapper<DatabaseType>() {{
eq(DatabaseType::getDatabaseName, dataSource.getDataType());
}});
//数据库类型为mysql
if ("mysql".equals(dataSource.getDataType())) {
// 根据id查询表描述
List<Children> childrenList = childrenService.list(new LambdaQueryWrapper<Children>() {{
eq(Children::getAssetId, assetDataSource.getId());
}});
if (childrenList == null || childrenList.size() == 0) {
String jdbcUrl = databaseType.getUrlPre() + dataSource.getHost() + ":" + dataSource.getPort() + "/"
+ dataSource.getDatabaseName() + "?" + dataSource.getConnectionParam();
List<ColumnResp> columnRespList = dataSourceMapper.selectColumn(dataSource.getDatabaseName(),
showTableReq.getTableName());
List<TableData> tableDataList = tableDataService.list(new LambdaQueryWrapper<>() {{
eq(TableData::getChildrenId, children.getId());
}});
if (tableDataList == null || tableDataList.size() == 0) {
columnRespList.forEach(columnResp -> {
String javaType = getJavaType(dataType.getDriverManager(), jdbcUrl, dataSource.getUser(),
dataSource.getPassword(), showTableReq.getTableName(), columnResp.getColumnName());
TableData tableData = TableData.builder()
.name(columnResp.getColumnName())
.comment(columnResp.getColumnComment())
.isPrimaryKey(columnResp.getColumnKey().equals("PRI") ? "Y" : "N")
.type(columnResp.getColumnType())
.mappingType(javaType)
.length(columnResp.getLength())
.decimalPlaces(columnResp.getNumericScale())
.isNull(columnResp.getIsNullable().equals("YES") ? "Y" : "N")
.defaultValue(columnResp.getColumnDefault())
.isDict("N")
.dictKey(null)
.childrenId(children.getId())
.build();
tableDataService.save(tableData);
});
}
}
@Override
public CountResp selectTableDataCount() {
List<AssetDataSource> assetDataSourceList = assetDataSourceService.list();
long size = assetDataSourceList.size();
List<Children> childrenList = childrenService.list();
long size1 = childrenList.size();
long sum = childrenList.stream().mapToLong(Children::getDataTotal).sum();
CountResp countResp = CountResp.builder()
.assetStructureCount(size)
.assetStructureTableCount(size1)
.assetStructureTableDataCount(sum)
.build();
return countResp;
}
@Override
public List<TableData> selectTableData(Long id) {
List<TableData> tableDataList = tableDataService.list(new LambdaQueryWrapper<>() {{
eq(TableData::getChildrenId, id);
}});
return tableDataList;
}
@Override
public CountResp getTableDataCount(Long id) {
List<Children> childrenList = childrenService.list(new LambdaQueryWrapper<>() {{
eq(Children::getAssetId, id);
}});
long size = childrenList.size();
long sum = childrenList.stream().mapToLong(Children::getDataTotal).sum();
return CountResp.builder()
.assetStructureCount(0L)
.assetStructureTableCount(size)
.assetStructureTableDataCount(sum)
.build();
}
public static String getJavaType(String driveClass, String url, String username, String password, String tableName, String columnName) {
Connection connection = buildConnection(driveClass, url, username, password);
PreparedStatement pst = null;
String javaType = null;
try {
String sql = "select * from " + tableName;
pst = connection.prepareStatement(sql);
ResultSetMetaData rsd = pst.executeQuery().getMetaData();
for (int i = 1; i <= rsd.getColumnCount(); i++) {
if (rsd.getColumnName(i).equals(columnName)) {
if (rsd.getColumnClassName(i).equals("java.time.LocalDateTime")) {
javaType = "LocalDateTime";
} else {
javaType = rsd.getColumnClassName(i).replace("java.lang.", "");
}
}
}
Class.forName(databaseType.getDriverManager());
Connection conn = DriverManager.getConnection(jdbcUrl, dataSource.getUser(),
dataSource.getPassword());
DatabaseMetaData metaData = conn.getMetaData();
Statement statement = conn.createStatement();
ResultSet rs = statement.executeQuery(
"select * from information_schema.tables where TABLE_SCHEMA = " +
"'" + dataSource.getDatabaseName() + "'");
ResultSet tableRet = metaData.getTables(conn.getCatalog(), "%", "%", new String[]{"TABLE"});
while (rs.next()) {
String tableName = rs.getString("TABLE_NAME");
String tableComment = rs.getString("TABLE_COMMENT");
;
Children children = Children.builder()
.name(tableName)
.annotation(tableComment)
.dataTotal(Long.valueOf(rs.getRow()))
.type("dataTable")
.build();
children.setAssetId(assetDataSource.getId());
childrenService.save(children);
}
conn.close();
rs.close();
tableRet.close();
} catch (ClassNotFoundException e) {
throw new RuntimeException(e);
} catch (SQLException e) {
throw new RuntimeException(e);
} finally {
try {
pst.close();
pst = null;
} catch (SQLException e) {
throw new RuntimeException(e);
}
throw new RuntimeException(e);
}
return javaType;
}
public static Connection buildConnection(String driveClass, String url, String username, String password) {
try {
Class.forName(driveClass);
Connection connection = DriverManager.getConnection(url, username, password);
return connection;
} catch (Exception e) {
return null;
}
}
}
//
//获取表描述的集合
// List<Table> tableList = dataSourceMapper.selectTable(assetDataSource.getDatabaseName());
//判断是否存在
// if (childrenList == null || childrenList.size() == 0) {
// tableList.forEach(table -> {
// Children children = Children.builder()
// .name(table.getTableName())
// .annotation(table.getTableComment())
// .dataTotal(table.getTableRows())
// .type("dataTable")
// .assetId(assetDataSource.getId())
// .build();
// //添加到数据库
// childrenService.save(children);
// });
//返回表描述
return childrenList;
// 建立连接
public static boolean testDatasource(String driveClass, String url, String username, String password) {
try {
Class.forName(driveClass);
DriverManager.getConnection(url, username, password);
return true;
} catch (Exception e) {
return false;
}
}
return null;
}
@Override
public void addTbleDate(ShowTableReq showTableReq) {
// if (version == 1){
DataSource dataSource = this.getOne(new LambdaQueryWrapper<>() {{
eq(DataSource::getName, showTableReq.getAssetStructure().getName());
eq(DataSource::getDatabaseName, showTableReq.getAssetStructure().getDatabaseName());
}});
Children children = childrenService.getOne(new LambdaQueryWrapper<>() {{
eq(Children::getAssetId, showTableReq.getAssetStructure().getId());
eq(Children::getName, showTableReq.getTableName());
}});
// 获取数据类型对象
DatabaseType dataType = databaseTypeService.getOne(new LambdaQueryWrapper<>() {
{
eq(DatabaseType::getDatabaseName, dataSource.getDataType());
}
});
String jdbcUrl = dataType.getUrlPre() + dataSource.getHost() + ":" + dataSource.getPort() + "/"
+ dataSource.getDatabaseName() + "?" + dataSource.getConnectionParam();
List<TableData> tableDataList = tableDataService.list(new LambdaQueryWrapper<>() {{
eq(TableData::getChildrenId, children.getId());
}});
if (tableDataList == null || tableDataList.size() == 0) {
try {
Class.forName(dataType.getDriverManager());
Connection connection = DriverManager.getConnection(jdbcUrl, dataSource.getUser(), dataSource.getPassword());
Statement statement = connection.createStatement();
String sql = "SELECT * FROM INFORMATION_SCHEMA.COLUMNS where table_schema ='" + dataSource.getDatabaseName() + "' AND table_name = '" + showTableReq.getTableName() + "'";
ResultSet rs = statement.executeQuery(sql);
while (rs.next()) {
String columnName = rs.getString("COLUMN_NAME");
String columnComment = rs.getString("COLUMN_COMMENT");
String columnKey = rs.getString("COLUMN_KEY");
String columnType = rs.getString("DATA_TYPE");
String javaType = getJavaType(dataType.getDriverManager(), jdbcUrl, dataSource.getUser(), dataSource.getPassword(), showTableReq.getTableName(), columnName);
String type = rs.getString("COLUMN_TYPE");
int length = 0;
Pattern pattern = Pattern.compile("\\d+");
Matcher matcher = pattern.matcher(type);
if (matcher.find()) {
String numberString = matcher.group();
length = Integer.parseInt(numberString);
}
int numericScale = rs.getInt("NUMERIC_SCALE");
String isNullable = rs.getString("IS_NULLABLE");
String columnDefault = rs.getString("COLUMN_DEFAULT");
TableData tableData = TableData.builder()
.name(columnName)
.comment(columnComment)
.isPrimaryKey(columnKey.equals("PRI") ? "Y" : "N")
.type(columnType)
.mappingType(javaType)
.length(Long.valueOf(length))
.decimalPlaces(Long.valueOf(numericScale))
.isNull(isNullable.equals("YES") ? "Y" : "N")
.defaultValue(columnDefault)
.isDict("N")
.dictKey(null)
.childrenId(children.getId())
.build();
tableDataService.save(tableData);
}
connection.close();
rs.close();
} catch (Exception e) {
throw new RuntimeException(e);
}
// version ++;
// }
}
}
@Override
public CountResp selectTableDataCount() {
List<AssetDataSource> assetDataSourceList = assetDataSourceService.list();
long size = assetDataSourceList.size();
List<Children> childrenList = childrenService.list();
long size1 = childrenList.size();
long sum = childrenList.stream().mapToLong(Children::getDataTotal).sum();
CountResp countResp = CountResp.builder()
.assetStructureCount(size)
.assetStructureTableCount(size1)
.assetStructureTableDataCount(sum)
.build();
return countResp;
}
@Override
public List<TableData> selectTableData(Long id) {
List<TableData> tableDataList = tableDataService.list(new LambdaQueryWrapper<>() {{
eq(TableData::getChildrenId, id);
}});
return tableDataList;
}
@Override
public CountResp getTableDataCount(Long id) {
List<Children> childrenList = childrenService.list(new LambdaQueryWrapper<>() {{
eq(Children::getAssetId, id);
}});
long size = childrenList.size();
long sum = childrenList.stream().mapToLong(Children::getDataTotal).sum();
return CountResp.builder()
.assetStructureCount(0L)
.assetStructureTableCount(size)
.assetStructureTableDataCount(sum)
.build();
}
public static String getJavaType(String driveClass, String url, String username, String password, String tableName,
String columnName) {
Connection connection = buildConnection(driveClass, url, username, password);
PreparedStatement pst = null;
String javaType = null;
try {
String sql = "select * from " + tableName;
pst = connection.prepareStatement(sql);
ResultSetMetaData rsd = pst.executeQuery().getMetaData();
for (int i = 1; i <= rsd.getColumnCount(); i++) {
if (rsd.getColumnName(i).equals(columnName)) {
if (rsd.getColumnClassName(i).equals("java.time.LocalDateTime")) {
javaType = "LocalDateTime";
} else {
javaType = rsd.getColumnClassName(i).replace("java.lang.", "");
}
}
}
} catch (SQLException e) {
throw new RuntimeException(e);
} finally {
try {
pst.close();
pst = null;
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
return javaType;
}
public static Connection buildConnection(String driveClass, String url, String username, String password) {
try {
Class.forName(driveClass);
Connection connection = DriverManager.getConnection(url, username, password);
return connection;
} catch (Exception e) {
return null;
}
}
// 建立连接
public static boolean testDatasource(String driveClass, String url, String username, String password) {
try {
Class.forName(driveClass);
DriverManager.getConnection(url, username, password);
return true;
} catch (Exception e) {
return false;
}
}
}