fix():使用异步编排优化数据同步

ruoyi_test
sunshine7058 2024-04-29 22:30:37 +08:00
parent 36cb2598f3
commit 366c7b4893
1 changed files with 69 additions and 57 deletions

View File

@ -26,9 +26,11 @@ import java.sql.ResultSet;
import java.sql.ResultSetMetaData; import java.sql.ResultSetMetaData;
import java.sql.SQLException; import java.sql.SQLException;
import java.sql.Statement; import java.sql.Statement;
import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.regex.Matcher; import java.util.concurrent.CompletableFuture;
import java.util.regex.Pattern; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
@ -56,8 +58,9 @@ public class DataSourceServiceImpl extends ServiceImpl<DataSourceMapper, DataSou
private TableDataService tableDataService; private TableDataService tableDataService;
@Autowired @Autowired
private DatabaseTypeService databaseTypeService; private DatabaseTypeService databaseTypeService;
private final ExecutorService executor = Executors.newFixedThreadPool(10);
private static Integer version = 1; private static Integer version = 1;
/** /**
* *
* *
@ -110,7 +113,8 @@ public class DataSourceServiceImpl extends ServiceImpl<DataSourceMapper, DataSou
} }
if ("PostgreSql".equals(dataType.getDatabaseName())) { if ("PostgreSql".equals(dataType.getDatabaseName())) {
driveClass = dataType.getDriverManager(); driveClass = dataType.getDriverManager();
jdbcUrl = dataType.getUrlPre() + dataSource.getHost() + ":" + dataSource.getPort() + "/" + dataSource.getDatabaseName() + "?" + dataSource.getConnectionParam(); jdbcUrl = dataType.getUrlPre() + dataSource.getHost() + ":" + dataSource.getPort() + "/"
+ dataSource.getDatabaseName() + "?" + dataSource.getConnectionParam();
} }
if ("oracle".equals(dataType.getDatabaseName())) { if ("oracle".equals(dataType.getDatabaseName())) {
driveClass = dataType.getDriverManager(); driveClass = dataType.getDriverManager();
@ -155,26 +159,24 @@ public class DataSourceServiceImpl extends ServiceImpl<DataSourceMapper, DataSou
*/ */
@Override @Override
public Result syncConnection(DataSource dataSource) { public Result syncConnection(DataSource dataSource) {
AssetDataSource assetDataSourceOne=assetDataSourceService.getOne(new LambdaQueryWrapper<>() {{ AssetDataSource assetDataSourceOne = assetDataSourceService.getOne(new LambdaQueryWrapper<>() {{
eq(AssetDataSource::getId, dataSource.getId()); eq(AssetDataSource::getId, dataSource.getId());
}}); }});
// 如果存在 删除 // 如果存在 删除
if (StringUtils.isNotNull(assetDataSourceOne)) { if (StringUtils.isNotNull(assetDataSourceOne)) {
List<Children> childrenList = childrenService.list(new LambdaQueryWrapper<>() {{ List<Children> childrenList = childrenService.list(new LambdaQueryWrapper<>() {{
eq(Children::getAssetId, assetDataSourceOne.getId()); eq(Children::getAssetId, assetDataSourceOne.getId());
}}); }});
// 同步表结构 childrenList.forEach(children -> {
childrenList.forEach(children -> { tableDataService.remove(new LambdaQueryWrapper<>() {{
tableDataService.remove(new LambdaQueryWrapper<>() {{ eq(TableData::getChildrenId, children.getId());
eq(TableData::getChildrenId, children.getId()); }});
}}); });
});
childrenService.remove(new LambdaQueryWrapper<>() {{ childrenService.remove(new LambdaQueryWrapper<>() {{
eq(Children::getAssetId, assetDataSourceOne.getId()); eq(Children::getAssetId, assetDataSourceOne.getId());
}}); }});
assetDataSourceService.removeById(assetDataSourceOne.getId()); assetDataSourceService.removeById(assetDataSourceOne.getId());
} }
AssetDataSource assetDataSource = AssetDataSource.builder() AssetDataSource assetDataSource = AssetDataSource.builder()
.id((long) Math.toIntExact(dataSource.getId())) .id((long) Math.toIntExact(dataSource.getId()))
@ -184,13 +186,18 @@ public class DataSourceServiceImpl extends ServiceImpl<DataSourceMapper, DataSou
.type("dataSource") .type("dataSource")
.build(); .build();
assetDataSourceService.save(assetDataSource); assetDataSourceService.save(assetDataSource);
// 同步数据库结构
getChildrenList(assetDataSource); CompletableFuture<Result> future = CompletableFuture.supplyAsync(() -> {
List<Children> childrenList = childrenService.list(); // 同步数据库结构
// 同步表结构 List<Children> childrenList = getChildrenList(assetDataSource);
childrenList.forEach(children -> {
addTableData(assetDataSource, children.getName()); // 同步表结构
}); childrenList.forEach(children -> {
addTableData(assetDataSource, children.getName());
});
return Result.success();
},executor);
return Result.success(); return Result.success();
} }
@ -208,7 +215,9 @@ public class DataSourceServiceImpl extends ServiceImpl<DataSourceMapper, DataSou
return childrenList; return childrenList;
} }
public void getChildrenList(AssetDataSource assetDataSource) { public List<Children> getChildrenList(AssetDataSource assetDataSource) {
List<Children> childrenList = new ArrayList<>();
//查询数据源对象 //查询数据源对象
DataSource dataSource = this.getOne(new LambdaQueryWrapper<DataSource>() {{ DataSource dataSource = this.getOne(new LambdaQueryWrapper<DataSource>() {{
eq(DataSource::getName, assetDataSource.getName()); eq(DataSource::getName, assetDataSource.getName());
@ -226,10 +235,9 @@ public class DataSourceServiceImpl extends ServiceImpl<DataSourceMapper, DataSou
//数据库类型为mysql //数据库类型为mysql
if ("mysql".equals(dataSource.getDataType())) { if ("mysql".equals(dataSource.getDataType())) {
// 根据id查询表描述 // 根据id查询表描述
List<Children> childrenList = childrenService.list(new LambdaQueryWrapper<Children>() {{
eq(Children::getAssetId, assetDataSource.getId()); sql = "select TABLE_NAME,TABLE_COMMENT from INFORMATION_SCHEMA.Tables where table_schema = " + "'"
}}); + dataSource.getDatabaseName() + "'";
sql = "select TABLE_NAME,TABLE_COMMENT from INFORMATION_SCHEMA.Tables where table_schema = " + "'" + dataSource.getDatabaseName() + "'";
} else if ("PostgreSql".equals(dataSource.getDataType())) { } else if ("PostgreSql".equals(dataSource.getDataType())) {
sql = "select tb.table_name AS TABLE_NAME, d.description AS TABLE_COMMENT\n" + sql = "select tb.table_name AS TABLE_NAME, d.description AS TABLE_COMMENT\n" +
"from information_schema.tables tb\n" + "from information_schema.tables tb\n" +
@ -260,7 +268,7 @@ public class DataSourceServiceImpl extends ServiceImpl<DataSourceMapper, DataSou
} catch (Exception e) { } catch (Exception e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
List<Children> childrenList = childrenService.list(new LambdaQueryWrapper<>() {{ childrenList = childrenService.list(new LambdaQueryWrapper<>() {{
eq(Children::getAssetId, assetDataSource.getId()); eq(Children::getAssetId, assetDataSource.getId());
}}); }});
try { try {
@ -274,46 +282,47 @@ public class DataSourceServiceImpl extends ServiceImpl<DataSourceMapper, DataSou
while (rs.next()) { while (rs.next()) {
int tableNum = rs.getInt("tableNum"); int tableNum = rs.getInt("tableNum");
children.setDataTotal(Long.valueOf(tableNum)); children.setDataTotal(Long.valueOf(tableNum));
childrenService.updateById(children);; childrenService.updateById(children);
;
} }
rs.close(); rs.close();
} catch (SQLException e) { } catch (SQLException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
}); });
} catch (Exception e) { } catch (Exception e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
childrenList = childrenService.list(new LambdaQueryWrapper<>() {{
eq(Children::getAssetId, assetDataSource.getId());
}});
return childrenList;
} }
public void addTableData(AssetDataSource assetDataSource, String tableName) { public void addTableData(AssetDataSource assetDataSource, String tableName) {
// if (version == 1){ // if (version == 1){
DataSource dataSource = this.getOne(new LambdaQueryWrapper<>() {{ DataSource dataSource = this.getOne(new LambdaQueryWrapper<>() {{
eq(DataSource::getName, assetDataSource.getName()); eq(DataSource::getName, assetDataSource.getName());
eq(DataSource::getDatabaseName, assetDataSource.getDatabaseName()); eq(DataSource::getDatabaseName, assetDataSource.getDatabaseName());
}}); }});
Children children = childrenService.getOne(new LambdaQueryWrapper<>() {{ Children children = childrenService.getOne(new LambdaQueryWrapper<>() {{
eq(Children::getAssetId, assetDataSource.getId()); eq(Children::getAssetId, assetDataSource.getId());
eq(Children::getName, tableName); eq(Children::getName, tableName);
}}); }});
// 获取数据类型对象 // 获取数据类型对象
DatabaseType dataType = databaseTypeService.getOne(new LambdaQueryWrapper<>() { DatabaseType dataType = databaseTypeService.getOne(new LambdaQueryWrapper<>() {
{ {
eq(DatabaseType::getDatabaseName, dataSource.getDataType()); eq(DatabaseType::getDatabaseName, dataSource.getDataType());
} }
}); });
List<TableData> tableDataList = tableDataService.list(new LambdaQueryWrapper<>() {{ List<TableData> tableDataList = tableDataService.list(new LambdaQueryWrapper<>() {{
eq(TableData::getChildrenId, children.getId()); eq(TableData::getChildrenId, children.getId());
}}); }});
String jdbcUrl = dataType.getUrlPre() + dataSource.getHost() + ":" + dataSource.getPort() + "/" + dataSource.getDatabaseName() + "?" + dataSource.getConnectionParam(); String jdbcUrl =
dataType.getUrlPre() + dataSource.getHost() + ":" + dataSource.getPort() + "/" + dataSource.getDatabaseName()
+ "?" + dataSource.getConnectionParam();
Connection connection = null; Connection connection = null;
try { try {
Class.forName(dataType.getDriverManager()); Class.forName(dataType.getDriverManager());
@ -323,8 +332,10 @@ public class DataSourceServiceImpl extends ServiceImpl<DataSourceMapper, DataSou
} }
try { try {
DatabaseMetaData metaData = connection.getMetaData(); DatabaseMetaData metaData = connection.getMetaData();
ResultSet columnsRs = metaData.getColumns(dataSource.getDatabaseName(), dataSource.getDatabaseName(), tableName, "%"); ResultSet columnsRs = metaData.getColumns(dataSource.getDatabaseName(), dataSource.getDatabaseName(), tableName,
ResultSet primaryKeysRs = metaData.getPrimaryKeys(dataSource.getDatabaseName(), dataSource.getDatabaseName(), tableName); "%");
ResultSet primaryKeysRs = metaData.getPrimaryKeys(dataSource.getDatabaseName(), dataSource.getDatabaseName(),
tableName);
String primaryKeyColumnName = ""; String primaryKeyColumnName = "";
while (primaryKeysRs.next()) { while (primaryKeysRs.next()) {
primaryKeyColumnName = primaryKeysRs.getString("COLUMN_NAME"); primaryKeyColumnName = primaryKeysRs.getString("COLUMN_NAME");
@ -406,11 +417,12 @@ public class DataSourceServiceImpl extends ServiceImpl<DataSourceMapper, DataSou
} }
public static String getJavaType(String driveClass, String jdbcUrl, DataSource dataSource, String tableName, String columnName) { public static String getJavaType(String driveClass, String jdbcUrl, DataSource dataSource, String tableName,
String columnName) {
String sql = ""; String sql = "";
Connection connection = null; Connection connection = null;
String javaType = null; String javaType = null;
if ("MySql".equals(dataSource.getDataType())) { if ("mysql".equals(dataSource.getDataType())) {
sql = "select * from " + tableName + " where 2=1"; sql = "select * from " + tableName + " where 2=1";
} else if ("PostgreSql".equals(dataSource.getDataType())) { } else if ("PostgreSql".equals(dataSource.getDataType())) {
sql = "select * from " + dataSource.getDatabaseName() + "." + tableName + " where 2=1"; sql = "select * from " + dataSource.getDatabaseName() + "." + tableName + " where 2=1";