feat:数据资产客户端新增数据库连接池服务类

dev
gtl 2024-05-15 19:50:47 +08:00
parent c33a79599c
commit 5c13d4ba74
10 changed files with 201 additions and 87 deletions

View File

@ -1,18 +1,16 @@
package com.ruoyi.dataAsset.config; package com.ruoyi.dataAsset.config;
import com.alibaba.druid.pool.DruidDataSource;
import com.ruoyi.common.core.domain.Result; import com.ruoyi.common.core.domain.Result;
import com.ruoyi.common.core.web.page.TableDataInfo; import com.ruoyi.common.core.web.page.TableDataInfo;
import com.ruoyi.dataAsset.domain.DataSource; import com.ruoyi.dataAsset.domain.DataSource;
import com.ruoyi.dataAsset.domain.req.DataSourceQueryReq; import com.ruoyi.dataAsset.domain.req.DataSourceQueryReq;
import com.ruoyi.dataAsset.remote.RemoteDataAssetService; import com.ruoyi.dataAsset.remote.RemoteDataAssetService;
import com.ruoyi.dataAsset.util.DataSourceUtil; import com.ruoyi.dataAsset.service.ConnectionPoolService;
import lombok.extern.log4j.Log4j2; import lombok.extern.log4j.Log4j2;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner; import org.springframework.boot.ApplicationRunner;
import java.util.List; import java.util.List;
import java.util.Map;
/** /**
* *
@ -27,28 +25,18 @@ public class DataAssetApplicationRunner implements ApplicationRunner {
private RemoteDataAssetService remoteDataAssetService; private RemoteDataAssetService remoteDataAssetService;
@Autowired @Autowired
private DruidDataSourceFactory druidDataSourceFactory; private ConnectionPoolService connectionPoolService;
@Override @Override
public void run(ApplicationArguments args) throws Exception { public void run(ApplicationArguments args) {
Result<TableDataInfo<DataSource>> result = remoteDataAssetService.list(new DataSourceQueryReq()); Result<TableDataInfo<DataSource>> result = remoteDataAssetService.list(new DataSourceQueryReq());
if(Result.isSuccess(result)){ if(Result.isSuccess(result)){
this.druidDataSourceMap(result.getData().getRows()); // 过滤无效数据
List<DataSource> dataSourceList = result.getData().getRows()
.stream().filter(dataSource -> dataSource.getStatus().equals("Y")).toList();
// 初始化连接池
connectionPoolService.init(dataSourceList);
} }
} }
public void druidDataSourceMap(List<DataSource> dataSourceList){
for (DataSource dataSource : dataSourceList) {
if(dataSource.getStatus().equals("N")){
continue;
}
try {
druidDataSourceFactory.put(dataSource.getId(), DataSourceUtil.createDataSource(dataSource));
} catch (Exception e) {
throw new RuntimeException(e.getMessage());
}
}
};
} }

View File

@ -0,0 +1,16 @@
package com.ruoyi.dataAsset.config;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Import;
/**
*
* @ClassName DataAssetClientConfig
* @Author
* @Date 2024/5/8 20:30
*/
@ComponentScan
@Import(DataAssetApplicationRunner.class)
public class DataAssetClientConfig {
}

View File

@ -1,32 +0,0 @@
package com.ruoyi.dataAsset.config;
import com.alibaba.druid.pool.DruidDataSource;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Import;
import java.util.HashMap;
import java.util.Map;
/**
*
* @ClassName DataAssetClientConfig
* @Author
* @Date 2024/5/8 20:30
*/
@ComponentScan
@Import(DataAssetApplicationRunner.class)
public class DruidDataSourceFactory {
private final Map<Long, DruidDataSource> druidDataSourceMap=new HashMap<>();
public Map<Long,DruidDataSource> getMap(){
return this.druidDataSourceMap;
}
public void put(Long id,DruidDataSource druidDataSource){
this.druidDataSourceMap.put(id,druidDataSource);
}
public DruidDataSource get(Long id){
return this.druidDataSourceMap.get(id);
}
}

View File

@ -0,0 +1,49 @@
package com.ruoyi.dataAsset.context;
import com.alibaba.druid.pool.DruidDataSource;
import org.springframework.stereotype.Component;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
*
* @ClassName ConnectionPoolContextHolder
* @Author GuanTieLin
* @Date 2024/5/15 16:14
*/
@Component
public class ConnectionPoolContextHolder {
/**
*
*/
private final Map<Long, DruidDataSource> druidDataSourceMap=new ConcurrentHashMap<>(16);
/**
*
*/
public Map<Long,DruidDataSource> getMap(){
return this.druidDataSourceMap;
}
/**
*
*/
public void put(Long id,DruidDataSource druidDataSource){
this.druidDataSourceMap.put(id,druidDataSource);
}
/**
*
*/
public DruidDataSource get(Long id){
return this.druidDataSourceMap.get(id);
}
/**
* key
*/
public boolean hasKey(Long id){
return this.druidDataSourceMap.containsKey(id);
}
}

View File

@ -0,0 +1,105 @@
package com.ruoyi.dataAsset.service;
import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.druid.pool.DruidPooledConnection;
import com.ruoyi.dataAsset.context.ConnectionPoolContextHolder;
import com.ruoyi.dataAsset.domain.DataSource;
import com.ruoyi.dataAsset.util.DataSourceUtil;
import lombok.extern.log4j.Log4j2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.List;
/**
*
* @ClassName ConnectionPoolService
* @Author GuanTieLin
* @Date 2024/5/15 16:25
*/
@Log4j2
@Component
public class ConnectionPoolService {
@Autowired
private ConnectionPoolContextHolder connectionPoolContextHolder;
/**
*
* @param dataSourceList
*/
public void init(List<DataSource> dataSourceList){
for (DataSource dataSource : dataSourceList) {
try {
connectionPoolContextHolder.put(dataSource.getId(), createDataSource(dataSource));
} catch (Exception e) {
throw new RuntimeException(e.getMessage());
}
}
}
/**
*
* @param dataSource
* @return druid
*/
public DruidDataSource createDataSource(DataSource dataSource) {
try (Connection connection = DataSourceUtil.getConnection(dataSource)) {
} catch (SQLException e) {
throw new RuntimeException(e);
}
return DataSourceUtil.createDataSource(dataSource);
}
/**
*
* @param id
*/
public void closeDataSource(Long id){
// 判断键是否存在
if(hasKey(id)){
// 获取数据源
DruidDataSource druidDataSource = connectionPoolContextHolder.get(id);
// 关闭数据源
druidDataSource.close();
}
}
/**
*
* @param dataSource
*/
public void put(DataSource dataSource){
connectionPoolContextHolder.put(dataSource.getId(), createDataSource(dataSource));
}
/**
*
* @param id
* @return druid
* @throws SQLException sql
*/
public DruidPooledConnection getConnection(Long id) throws SQLException {
DruidPooledConnection druidPooledConnection=null;
// 判断键是否存在
if(hasKey(id)){
// 获取数据源
DruidDataSource druidDataSource = connectionPoolContextHolder.get(id);
// 获取链接
druidPooledConnection=druidDataSource.getConnection();
}else {
log.error("数据库连接池中不存在key:[{}]",id);
}
return druidPooledConnection;
}
/**
* key
* @param id
* @return
*/
public boolean hasKey(Long id){
return connectionPoolContextHolder.hasKey(id);
}
}

View File

@ -1 +1 @@
com.ruoyi.dataAsset.config.DruidDataSourceFactory com.ruoyi.dataAsset.config.DataAssetClientConfig

View File

@ -5,27 +5,23 @@ import java.util.*;
import java.util.Date; import java.util.Date;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import com.alibaba.druid.pool.DruidDataSource; import com.alibaba.druid.pool.DruidPooledConnection;
import com.ruoyi.common.core.exception.ServiceException; import com.ruoyi.common.core.exception.ServiceException;
import com.ruoyi.common.core.utils.ObjUtils; import com.ruoyi.common.core.utils.ObjUtils;
import com.ruoyi.common.core.utils.StringUtils; import com.ruoyi.common.core.utils.StringUtils;
import com.ruoyi.common.security.utils.SecurityUtils; import com.ruoyi.common.security.utils.SecurityUtils;
import com.ruoyi.dataAsset.config.DruidDataSourceFactory;
import com.ruoyi.dataAsset.constant.QueueNameConstants; import com.ruoyi.dataAsset.constant.QueueNameConstants;
import com.ruoyi.dataAsset.domain.AssetModelData; import com.ruoyi.dataAsset.domain.AssetModelData;
import com.ruoyi.dataAsset.domain.ColumnInfo; import com.ruoyi.dataAsset.domain.ColumnInfo;
import com.ruoyi.dataAsset.domain.TableInfo; import com.ruoyi.dataAsset.domain.TableInfo;
import com.ruoyi.dataAsset.queue.NormalQueue; import com.ruoyi.dataAsset.queue.NormalQueue;
import com.ruoyi.dataAsset.service.AssetModelDataService; import com.ruoyi.dataAsset.service.*;
import com.ruoyi.dataAsset.service.ColumnInfoService;
import com.ruoyi.dataAsset.service.TableInfoService;
import com.ruoyi.dataAsset.util.DataSourceUtil; import com.ruoyi.dataAsset.util.DataSourceUtil;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import com.ruoyi.dataAsset.mapper.DataSourceMapper; import com.ruoyi.dataAsset.mapper.DataSourceMapper;
import com.ruoyi.dataAsset.domain.DataSource; import com.ruoyi.dataAsset.domain.DataSource;
import com.ruoyi.dataAsset.service.DataSourceService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.annotation.Transactional;
@ -50,7 +46,7 @@ public class DataSourceServiceImpl extends ServiceImpl<DataSourceMapper, DataSou
private AssetModelDataService assetModuleDataService; private AssetModelDataService assetModuleDataService;
@Autowired @Autowired
private DruidDataSourceFactory druidDataSourceFactory; private ConnectionPoolService connectionPoolService;
@Autowired @Autowired
private NormalQueue normalQueue; private NormalQueue normalQueue;
@ -146,21 +142,22 @@ public class DataSourceServiceImpl extends ServiceImpl<DataSourceMapper, DataSou
"INNER JOIN sysindexes AS b ON a.id = b.id WHERE ( a.type = 'u' ) AND ( b.indid IN ( 0, 1 ) );"; "INNER JOIN sysindexes AS b ON a.id = b.id WHERE ( a.type = 'u' ) AND ( b.indid IN ( 0, 1 ) );";
} }
try { try {
DruidDataSource druidDatasource = druidDataSourceFactory.getMap().get(dataSourceId); DruidPooledConnection connection = connectionPoolService.getConnection(dataSourceId);
//总的记录数,总的表数量 //总的记录数,总的表数量
ResultSet result = druidDatasource.getConnection().prepareStatement(sql).executeQuery(); ResultSet result = connection.prepareStatement(sql).executeQuery();
while (result.next()){ while (result.next()){
dataSource.setTableTotal(result.getInt("tableTotal")); dataSource.setTableTotal(result.getInt("tableTotal"));
dataSource.setRecordsTotal(result.getInt("recordsTotal")); dataSource.setRecordsTotal(result.getInt("recordsTotal"));
} }
//异步同步表结构并等待所有任务结束 //异步同步表结构并等待所有任务结束
CompletableFuture.supplyAsync(() -> { CompletableFuture.supplyAsync(() -> {
this.SynchronousTableStructure(druidDatasource, dataSource); this.SynchronousTableStructure(dataSource);
return "end"; return "end";
}).get(); }).get();
//修改数据 //修改数据
this.updateById(dataSource); this.updateById(dataSource);
result.close(); result.close();
connection.close();
} catch (SQLException | InterruptedException | ExecutionException e) { } catch (SQLException | InterruptedException | ExecutionException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
@ -168,13 +165,12 @@ public class DataSourceServiceImpl extends ServiceImpl<DataSourceMapper, DataSou
/** /**
* *
* @param druidDatasource
* @param dataSource * @param dataSource
*/ */
public void SynchronousTableStructure(DruidDataSource druidDatasource,DataSource dataSource){ public void SynchronousTableStructure(DataSource dataSource){
try { try {
List<CompletableFuture<String>> runAsyncList=new ArrayList<>(); List<CompletableFuture<String>> runAsyncList=new ArrayList<>();
Connection connection = druidDatasource.getConnection(); DruidPooledConnection connection = connectionPoolService.getConnection(dataSource.getId());
ResultSet tables = connection.getMetaData().getTables(connection.getCatalog(), "dbo", "%", new String[] { "TABLE" }); ResultSet tables = connection.getMetaData().getTables(connection.getCatalog(), "dbo", "%", new String[] { "TABLE" });
while (tables.next()) { while (tables.next()) {
String tableName = tables.getString("TABLE_NAME"); String tableName = tables.getString("TABLE_NAME");
@ -201,7 +197,7 @@ public class DataSourceServiceImpl extends ServiceImpl<DataSourceMapper, DataSou
tableInfoService.save(tableInfo); tableInfoService.save(tableInfo);
//异步存储表数据 //异步存储表数据
runAsyncList.add(CompletableFuture.supplyAsync(() -> { runAsyncList.add(CompletableFuture.supplyAsync(() -> {
this.SynchronousColumnInfo(druidDatasource, tableName, tableInfo.getId(), dataSource); this.SynchronousColumnInfo(tableName, tableInfo.getId(), dataSource);
return "end"; return "end";
})); }));
} }
@ -216,17 +212,16 @@ public class DataSourceServiceImpl extends ServiceImpl<DataSourceMapper, DataSou
/** /**
* *
* @param druidDatasource
* @param tableName * @param tableName
* @param tableId * @param tableId
*/ */
public void SynchronousColumnInfo(DruidDataSource druidDatasource,String tableName,Long tableId,DataSource dataSource) { public void SynchronousColumnInfo(String tableName,Long tableId,DataSource dataSource) {
//声明字段信息空集合 //声明字段信息空集合
List<ColumnInfo> list=new ArrayList<>(); List<ColumnInfo> list=new ArrayList<>();
//声明资产模型数据空集合 //声明资产模型数据空集合
List<AssetModelData> moduleDataList=new ArrayList<>(); List<AssetModelData> moduleDataList=new ArrayList<>();
try { try {
Connection connection = druidDatasource.getConnection(); DruidPooledConnection connection = connectionPoolService.getConnection(dataSource.getId());
ResultSet columns = connection.getMetaData().getColumns(connection.getCatalog(), null, tableName, "%"); ResultSet columns = connection.getMetaData().getColumns(connection.getCatalog(), null, tableName, "%");
ResultSet primaryKeys = connection.getMetaData().getPrimaryKeys(connection.getCatalog(), null, tableName); ResultSet primaryKeys = connection.getMetaData().getPrimaryKeys(connection.getCatalog(), null, tableName);
String primaryKeyName=null; String primaryKeyName=null;
@ -325,20 +320,13 @@ public class DataSourceServiceImpl extends ServiceImpl<DataSourceMapper, DataSou
public void renewalDataSource(DataSource dataSource){ public void renewalDataSource(DataSource dataSource){
// 发送消息 // 发送消息
normalQueue.sendUUIDMsg(QueueNameConstants.DATASOURCE_MESSAGE,dataSource); normalQueue.sendUUIDMsg(QueueNameConstants.DATASOURCE_MESSAGE,dataSource);
DruidDataSource druidDataSource = DataSourceUtil.createDataSource(dataSource);
// 新增或替换数据源
Long id = dataSource.getId(); Long id = dataSource.getId();
if(druidDataSourceFactory.getMap().containsKey(id)){ // 如果已经存在
// 获取旧的 if(connectionPoolService.hasKey(id)){
DruidDataSource source = druidDataSourceFactory.get(id);
// 替换
druidDataSourceFactory.put(id,druidDataSource);
// 关闭旧的 // 关闭旧的
source.close(); connectionPoolService.closeDataSource(id);
}else {
//新增
druidDataSourceFactory.put(id, druidDataSource);
} }
// 新增或替换数据源
connectionPoolService.put(dataSource);
} }
} }

View File

@ -3,7 +3,7 @@ package com.ruoyi.dataTransform.consumer;
import com.alibaba.druid.pool.DruidDataSource; import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSON;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Channel;
import com.ruoyi.dataAsset.config.DruidDataSourceFactory; import com.ruoyi.dataAsset.config.DataAssetClientConfig;
import com.ruoyi.dataAsset.constant.QueueNameConstants; import com.ruoyi.dataAsset.constant.QueueNameConstants;
import com.ruoyi.dataAsset.domain.DataSource; import com.ruoyi.dataAsset.domain.DataSource;
import com.ruoyi.dataAsset.util.DataSourceUtil; import com.ruoyi.dataAsset.util.DataSourceUtil;
@ -32,7 +32,7 @@ public class DataSourceConsumer {
private RedisTemplate<String, String> redisTemplate; private RedisTemplate<String, String> redisTemplate;
@Autowired @Autowired
private DruidDataSourceFactory druidDataSourceFactory; private DataAssetClientConfig druidDataSourceFactory;
@Autowired @Autowired
private MessageConverter messageConverter; private MessageConverter messageConverter;

View File

@ -1,7 +1,7 @@
package com.ruoyi.dataTransform.service.impl; package com.ruoyi.dataTransform.service.impl;
import com.alibaba.druid.pool.DruidPooledConnection; import com.alibaba.druid.pool.DruidPooledConnection;
import com.ruoyi.dataAsset.config.DruidDataSourceFactory; import com.ruoyi.dataAsset.service.ConnectionPoolService;
import com.ruoyi.dataTransform.domain.req.RandomDataReq; import com.ruoyi.dataTransform.domain.req.RandomDataReq;
import com.ruoyi.dataTransform.service.DataExtractService; import com.ruoyi.dataTransform.service.DataExtractService;
import com.ruoyi.dataTransform.util.DataExtractUtil; import com.ruoyi.dataTransform.util.DataExtractUtil;
@ -24,7 +24,7 @@ import java.util.List;
public class DataExtractServiceImpl implements DataExtractService { public class DataExtractServiceImpl implements DataExtractService {
@Autowired @Autowired
private DruidDataSourceFactory druidDataSourceFactory; private ConnectionPoolService connectionPoolService;
@Override @Override
public List<List<DataModel>> getRandomData(RandomDataReq randomDataReq) { public List<List<DataModel>> getRandomData(RandomDataReq randomDataReq) {
@ -32,7 +32,7 @@ public class DataExtractServiceImpl implements DataExtractService {
List<List<DataModel>> list=new ArrayList<>(); List<List<DataModel>> list=new ArrayList<>();
try { try {
// 获取数据源连接 // 获取数据源连接
connection = druidDataSourceFactory.get(randomDataReq.getDataSourceId()).getConnection(); connection = connectionPoolService.getConnection(randomDataReq.getDataSourceId());
} catch (SQLException e) { } catch (SQLException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }