feat:根据项目进度更新README.md文件内容
parent
f0e60f7ea0
commit
d84aa60bf2
44
README.md
44
README.md
|
@ -1,8 +1,28 @@
|
|||
## 系统模块
|
||||
# **gtl-ruoyi-server**
|
||||
|
||||
## **项目介绍**
|
||||
|
||||
### 简介
|
||||
|
||||
本项目是 **ETL** 项目,主要功能是对接入的 **数据源 **进行数据的 **抽取**(Extract) > **清洗**(Cleaning) > **转换**(Transform) > **装载**(Load) 。
|
||||
|
||||
### 背景
|
||||
|
||||
**数据**是现代企业的重要资产,是企业运用**科学管理**、**决策分析**的基础。在企业中,数据通常来自于多个不同的系统和部门。数据分散存储,较为单一,难以对企业的统一发展规划提供帮助,企业需要对这些数据进行提取筛选,打破信息孤岛的困境,分析出有效信息。让只懂一些,甚至不懂数据存储的企业人也能快速上手,通过一些技术去处理自有数据。
|
||||
|
||||
### 需求
|
||||
|
||||
把分散的数据源提取>清洗>转换>整合,形成一个完整的数据集,便于后续的分析和处理。
|
||||
|
||||
### 拓展
|
||||
|
||||
经由 **ETL**技术 处理过的数据,一般会去往 **BI**(商业智能)/**DM**(数据仓库) 进行分析或存储
|
||||
|
||||
## 项目模块
|
||||
|
||||
~~~
|
||||
com.ruoyi
|
||||
├── ruoyi-ui // 前端框架 [80]
|
||||
├── gtl-ruoyi-ui // 前端框架 [80]
|
||||
├── ruoyi-gateway // 网关模块 [8080]
|
||||
├── ruoyi-auth // 认证中心 [9200]
|
||||
├── ruoyi-common // 通用模块
|
||||
|
@ -26,3 +46,23 @@ com.ruoyi
|
|||
│ └── muyu-visual-monitor // 监控中心 [9100]
|
||||
├──pom.xml // 公共依赖
|
||||
~~~
|
||||
## **项目技术栈**
|
||||
|
||||
### SpringCloud & Alibaba/SpringBoot
|
||||
|
||||
因为 `SpringCloud`进入到了维护阶段,不会再有新的组件技术出现。`Spring Cloud Alibaba`是`Spring cloud`的子项目,阿里巴巴推广,并且API是中文。
|
||||
|
||||
### Mybatis Plus/Mybatis
|
||||
|
||||
`Mybatis`是免费优秀的开源持久层框架。`Mybatis-Plus`是一个`Mybatis`的增强工具,它在`Mybatis`的基础上做了增强,却不做改变。
|
||||
|
||||
### Redis
|
||||
|
||||
本地缓存多个服务之间数据无法共享,甚至是数据存量也不够大,不做考虑。而分布式存储,目前市面上使用比较多的是`memcached`和`redis`,但`memcached`支持的数据类型只有K/V,没有`redis`支持的数据类型多,也不支持持久化
|
||||
|
||||
### JDBC
|
||||
|
||||
使用`JDBC`是为了动态的根据数据库的已有数据源连接数据去获取数据源的连接,方便从多个数据库/多种数据库快速提取数据。
|
||||
|
||||
|
||||
|
||||
|
|
|
@ -63,6 +63,7 @@ public class DataDictType extends BaseEntity {
|
|||
.dictName(dataDictTypeSaveReq.getDictName())
|
||||
.dictType(dataDictTypeSaveReq.getDictType())
|
||||
.dataSourceId(dataDictTypeSaveReq.getDataSourceId())
|
||||
.remark(dataDictTypeSaveReq.getRemark())
|
||||
.createBy(createBy.get())
|
||||
.createTime(new Date())
|
||||
.build();
|
||||
|
@ -77,6 +78,7 @@ public class DataDictType extends BaseEntity {
|
|||
.dictName(dataDictTypeEditReq.getDictName())
|
||||
.dictType(dataDictTypeEditReq.getDictType())
|
||||
.dataSourceId(dataDictTypeEditReq.getDataSourceId())
|
||||
.remark(dataDictTypeEditReq.getRemark())
|
||||
.updateBy(updateBy.get())
|
||||
.updateTime(new Date())
|
||||
.build();
|
||||
|
|
|
@ -0,0 +1,22 @@
|
|||
package com.ruoyi.dataAsset;
|
||||
|
||||
import com.ruoyi.common.security.annotation.EnableCustomConfig;
|
||||
import com.ruoyi.common.security.annotation.EnableMyFeignClients;
|
||||
import com.ruoyi.common.swagger.annotation.EnableCustomSwagger2;
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
|
||||
/**
|
||||
* etl服务模块
|
||||
*
|
||||
* @author gtl
|
||||
*/
|
||||
@EnableCustomConfig
|
||||
@EnableCustomSwagger2
|
||||
@EnableMyFeignClients
|
||||
@SpringBootApplication
|
||||
public class GtlDataAssetApplication {
|
||||
public static void main(String[] args) {
|
||||
SpringApplication.run(GtlDataAssetApplication.class, args);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,50 @@
|
|||
package com.ruoyi.dataAsset.service;
|
||||
|
||||
import com.ruoyi.dataAsset.domain.DataSource;
|
||||
import com.ruoyi.dataAsset.domain.TableInfo;
|
||||
import com.ruoyi.dataAsset.domain.req.ColumnInfoReq;
|
||||
import com.ruoyi.dataAsset.domain.resp.AssetStructureResp;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* 资产结构service接口
|
||||
* @ClassName AssetStructureService
|
||||
* @Author 森静若林
|
||||
* @Date 2024/4/21 11:00
|
||||
*/
|
||||
public interface AssetStructureService {
|
||||
/**
|
||||
* 通过用户权限获取数据源数据
|
||||
*
|
||||
* @return 数据源集合
|
||||
*/
|
||||
List<DataSource> getDataSourceData();
|
||||
|
||||
/**
|
||||
* 通过数据源编号和用户权限获取资产结构数据
|
||||
* @param dataSourceId 数据源编号
|
||||
* @return 资产结构响应集合
|
||||
*/
|
||||
List<AssetStructureResp> getAssetStructureData(Long dataSourceId);
|
||||
|
||||
/**
|
||||
* 修改字段信息中的映射字典
|
||||
* @param columnInfoReq 字段信息
|
||||
*/
|
||||
void columnInfoEdit(ColumnInfoReq columnInfoReq);
|
||||
|
||||
/**
|
||||
* 获取单个表的字段信息
|
||||
* @param tableInfoId 表信息编号
|
||||
* @return 表的字段信息
|
||||
*/
|
||||
AssetStructureResp getTableData(Long tableInfoId);
|
||||
|
||||
/**
|
||||
* 通过数据源编号和用户权限获取资产数据
|
||||
* @param dataSourceId 数据源编号
|
||||
* @return 表信息集合
|
||||
*/
|
||||
List<TableInfo> getAssetData(Long dataSourceId);
|
||||
|
||||
}
|
|
@ -43,6 +43,11 @@ public class AssetStructureServiceImpl implements AssetStructureService {
|
|||
@Autowired
|
||||
private RedisService redisService;
|
||||
|
||||
/**
|
||||
* 通过用户权限获取数据源数据
|
||||
*
|
||||
* @return 数据源集合
|
||||
*/
|
||||
@Override
|
||||
public List<DataSource> getDataSourceData() {
|
||||
LambdaQueryWrapper<DataSource> queryWrapper = new LambdaQueryWrapper<DataSource>().eq(DataSource::getStatus, "Y");
|
||||
|
@ -91,6 +96,11 @@ public class AssetStructureServiceImpl implements AssetStructureService {
|
|||
return dataSourceService.list(queryWrapper);
|
||||
}
|
||||
|
||||
/**
|
||||
* 通过数据源编号和用户权限获取资产结构数据
|
||||
* @param dataSourceId 数据源编号
|
||||
* @return 资产结构响应集合
|
||||
*/
|
||||
@Override
|
||||
public List<AssetStructureResp> getAssetStructureData(Long dataSourceId) {
|
||||
DataSource dataSource = dataSourceService.getById(dataSourceId);
|
||||
|
@ -114,6 +124,10 @@ public class AssetStructureServiceImpl implements AssetStructureService {
|
|||
}).toList();
|
||||
}
|
||||
|
||||
/**
|
||||
* 修改字段信息中的映射字典
|
||||
* @param columnInfoReq 字段信息
|
||||
*/
|
||||
@Override
|
||||
public void columnInfoEdit(ColumnInfoReq columnInfoReq) {
|
||||
columnInfoService.update(new LambdaUpdateWrapper<ColumnInfo>()
|
||||
|
@ -133,6 +147,11 @@ public class AssetStructureServiceImpl implements AssetStructureService {
|
|||
.eq(ColumnInfo::getTableId, tableInfoId)));
|
||||
}
|
||||
|
||||
/**
|
||||
* 通过数据源编号和用户权限获取资产数据
|
||||
* @param dataSourceId 数据源编号
|
||||
* @return 表信息集合
|
||||
*/
|
||||
@Override
|
||||
public List<TableInfo> getAssetData(Long dataSourceId) {
|
||||
DataSource dataSource = dataSourceService.getById(dataSourceId);
|
||||
|
|
|
@ -1,7 +1,6 @@
|
|||
package com.ruoyi.dataAsset.service.impl;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
import com.ruoyi.common.core.utils.ObjUtils;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
|
|
@ -72,6 +72,11 @@ public class DataSourceServiceImpl extends ServiceImpl<DataSourceMapper, DataSou
|
|||
return list(queryWrapper);
|
||||
}
|
||||
|
||||
/**
|
||||
* 测试数据源连接
|
||||
* @param id 数据源id
|
||||
* @return 是/否
|
||||
*/
|
||||
@Override
|
||||
public boolean testConnect(Long id){
|
||||
boolean result=true;
|
||||
|
@ -79,7 +84,7 @@ public class DataSourceServiceImpl extends ServiceImpl<DataSourceMapper, DataSou
|
|||
Connection connection=null;
|
||||
try {
|
||||
connection = this.getConnection(dataSource);
|
||||
//修改为可用
|
||||
//如果为不可用修改为可用
|
||||
if(dataSource.getStatus().equals("N")){
|
||||
dataSource.setStatus("Y");
|
||||
dataSource.setUpdateTime(new Date());
|
||||
|
@ -88,6 +93,7 @@ public class DataSourceServiceImpl extends ServiceImpl<DataSourceMapper, DataSou
|
|||
}
|
||||
connection.close();
|
||||
} catch (SQLException e) {
|
||||
//连接失败,如果为可用修改为不可用
|
||||
if(dataSource.getStatus().equals("Y")){
|
||||
dataSource.setStatus("N");
|
||||
dataSource.setUpdateTime(new Date());
|
||||
|
@ -99,6 +105,12 @@ public class DataSourceServiceImpl extends ServiceImpl<DataSourceMapper, DataSou
|
|||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取数据源的连接
|
||||
* @param dataSource 数据源
|
||||
* @return 数据源连接
|
||||
* @throws SQLException 异常
|
||||
*/
|
||||
@Override
|
||||
public Connection getConnection(DataSource dataSource) throws SQLException {
|
||||
String url="jdbc:"+dataSource.getDatabaseType()+dataSource.getIp()+":"+dataSource.getPort();
|
||||
|
@ -138,9 +150,9 @@ public class DataSourceServiceImpl extends ServiceImpl<DataSourceMapper, DataSou
|
|||
}
|
||||
String sql=null;
|
||||
if(dataSource.getDatabaseType().contains("mysql")){
|
||||
sql="SELECT COUNT(*) tables,SUM(table_rows) recordsTotal FROM information_schema.tables WHERE table_schema = '"+dataSource.getDatabases()+"';";
|
||||
}else {
|
||||
sql="SELECT COUNT(*) tables,SUM(b.rows) recordsTotal FROM sysobjects AS a \n" +
|
||||
sql="SELECT COUNT(*) tableTotal,SUM(table_rows) recordsTotal FROM information_schema.tables WHERE table_schema = '"+dataSource.getDatabases()+"';";
|
||||
}else if(dataSource.getDatabaseType().contains("sqlserver")) {
|
||||
sql="SELECT COUNT(*) tableTotal,SUM(b.rows) recordsTotal FROM sysobjects AS a \n" +
|
||||
"INNER JOIN sysindexes AS b ON a.id = b.id WHERE ( a.type = 'u' ) AND ( b.indid IN ( 0, 1 ) );";
|
||||
}
|
||||
Connection connection=null;
|
||||
|
@ -149,10 +161,13 @@ public class DataSourceServiceImpl extends ServiceImpl<DataSourceMapper, DataSou
|
|||
//总的记录数,总的表数量
|
||||
ResultSet result = connection.prepareStatement(sql).executeQuery();
|
||||
while (result.next()){
|
||||
dataSource.setTableTotal(result.getInt("tables"));
|
||||
dataSource.setTableTotal(result.getInt("tableTotal"));
|
||||
dataSource.setRecordsTotal(result.getInt("recordsTotal"));
|
||||
}
|
||||
this.SynchronousTableStructure(dataSource,dataSourceId);
|
||||
//异步同步表结构
|
||||
CompletableFuture.runAsync(()->{
|
||||
this.SynchronousTableStructure(dataSource);
|
||||
});
|
||||
//修改数据
|
||||
this.updateById(dataSource);
|
||||
result.close();
|
||||
|
@ -162,7 +177,11 @@ public class DataSourceServiceImpl extends ServiceImpl<DataSourceMapper, DataSou
|
|||
}
|
||||
}
|
||||
|
||||
public void SynchronousTableStructure(DataSource dataSource,Long dataSourceId){
|
||||
/**
|
||||
* 同步表结构
|
||||
* @param dataSource 数据源
|
||||
*/
|
||||
public void SynchronousTableStructure(DataSource dataSource){
|
||||
Connection connection=null;
|
||||
try {
|
||||
connection=getConnection(dataSource);
|
||||
|
@ -177,11 +196,11 @@ public class DataSourceServiceImpl extends ServiceImpl<DataSourceMapper, DataSou
|
|||
recordsTotal = resultSet.getInt("recordsTotal");
|
||||
}
|
||||
resultSet.close();
|
||||
TableInfo tableInfo = TableInfo.builder().name(tableName).comment(comment).recordsTotal(recordsTotal).dataSourceId(dataSourceId).build();
|
||||
TableInfo tableInfo = TableInfo.builder().name(tableName).comment(comment).recordsTotal(recordsTotal).dataSourceId(dataSource.getId()).build();
|
||||
//可能同步过,先删除
|
||||
TableInfo one = tableInfoService.getOne(new LambdaQueryWrapper<TableInfo>()
|
||||
.eq(TableInfo::getName, tableName)
|
||||
.eq(TableInfo::getDataSourceId, dataSourceId));
|
||||
.eq(TableInfo::getDataSourceId, dataSource.getId()));
|
||||
if(Objects.nonNull(one)){
|
||||
tableInfoService.removeById(one.getId());
|
||||
columnInfoService.remove(new LambdaQueryWrapper<ColumnInfo>()
|
||||
|
@ -191,7 +210,7 @@ public class DataSourceServiceImpl extends ServiceImpl<DataSourceMapper, DataSou
|
|||
tableInfoService.save(tableInfo);
|
||||
//异步存储表数据
|
||||
CompletableFuture.runAsync(()->{
|
||||
this.saveTableInfo(dataSource,tableName,tableInfo.getId());
|
||||
this.SynchronousColumnInfo(dataSource,tableName,tableInfo.getId());
|
||||
});
|
||||
}
|
||||
tables.close();
|
||||
|
@ -201,8 +220,16 @@ public class DataSourceServiceImpl extends ServiceImpl<DataSourceMapper, DataSou
|
|||
}
|
||||
}
|
||||
|
||||
public void saveTableInfo(DataSource dataSource,String tableName,Long tableId) {
|
||||
/**
|
||||
* 同步字段信息
|
||||
* @param dataSource 数据源
|
||||
* @param tableName 表名
|
||||
* @param tableId 表编号
|
||||
*/
|
||||
public void SynchronousColumnInfo(DataSource dataSource,String tableName,Long tableId) {
|
||||
//声明字段信息空集合
|
||||
List<ColumnInfo> list=new ArrayList<>();
|
||||
//声明资产模型数据空集合
|
||||
List<AssetModelData> moduleDataList=new ArrayList<>();
|
||||
Connection connection=null;
|
||||
try {
|
||||
|
@ -214,24 +241,26 @@ public class DataSourceServiceImpl extends ServiceImpl<DataSourceMapper, DataSou
|
|||
primaryKeyName=primaryKeys.getString("COLUMN_NAME");
|
||||
}
|
||||
primaryKeys.close();
|
||||
//不是mysql,sqlserver获取注解
|
||||
//sqlserver获取字段注解
|
||||
ResultSet sqlServerSet=null;
|
||||
if (!connection.getMetaData().getDriverName().toUpperCase().contains("MYSQL")) {
|
||||
if (dataSource.getDatabaseType().contains("sqlserver")) {
|
||||
String sql="SELECT C.value AS column_description FROM sys.tables A INNER JOIN sys.columns B ON B.object_id = A.object_id\n" +
|
||||
"LEFT JOIN sys.extended_properties C ON C.major_id = B.object_id AND C.minor_id = B.column_id WHERE A.name = '"+tableName+"'";
|
||||
sqlServerSet = connection.prepareStatement(sql).executeQuery();
|
||||
}
|
||||
int index=1;
|
||||
//获取表的第一条数据
|
||||
String selectFirst="select * from " + tableName ;
|
||||
if (connection.getMetaData().getDriverName().toUpperCase().contains("MYSQL")) {
|
||||
if (dataSource.getDatabaseType().contains("mysql")) {
|
||||
selectFirst+=" limit 1;";
|
||||
}else {
|
||||
}else if (dataSource.getDatabaseType().contains("sqlserver")){
|
||||
selectFirst+=" order by "+primaryKeyName+" offset 0 rows fetch next 1 rows only;";
|
||||
}
|
||||
PreparedStatement preparedStatement = connection.prepareStatement(selectFirst);
|
||||
ResultSetMetaData data = preparedStatement.getMetaData();
|
||||
ResultSet resultSet = preparedStatement.executeQuery();
|
||||
boolean next = resultSet.next();
|
||||
//循环获取字段信息
|
||||
while(columns.next()) {
|
||||
//字段名
|
||||
String columnName = columns.getString("COLUMN_NAME");
|
||||
|
@ -264,22 +293,28 @@ public class DataSourceServiceImpl extends ServiceImpl<DataSourceMapper, DataSou
|
|||
list.add(ColumnInfo.builder().name(columnName).type(datatype).scale(scale)
|
||||
.javaType(javaType).isKey(isKey).comment(columnComment).defaultValue(defaultValue)
|
||||
.length(columnSize).isNullable(isNullable).tableId(tableId).isDict("N").dataDictType(null).build());
|
||||
//获取资产模型数据
|
||||
if(next){
|
||||
String string = resultSet.getString(index++);
|
||||
moduleDataList.add(AssetModelData.builder().key(dataSource.getDataSourceName()+"."+tableName+"."+columnName)
|
||||
.type(javaType).value(string==null?"null":string).tableId(tableId).build());
|
||||
}
|
||||
}
|
||||
//批量添加字段信息
|
||||
columnInfoService.saveBatch(list);
|
||||
//查询是否已有资产模型数据
|
||||
List<Long> longList = assetModuleDataService.list(new LambdaQueryWrapper<AssetModelData>().eq(AssetModelData::getTableId, tableId))
|
||||
.stream().map(AssetModelData::getId).toList();
|
||||
if(!longList.isEmpty()){
|
||||
//批量删除
|
||||
assetModuleDataService.removeBatchByIds(longList);
|
||||
}
|
||||
//批量添加
|
||||
assetModuleDataService.saveBatch(moduleDataList);
|
||||
if(sqlServerSet!=null){
|
||||
sqlServerSet.close();
|
||||
}
|
||||
//关闭连接
|
||||
columns.close();
|
||||
resultSet.close();
|
||||
connection.close();
|
||||
|
|
|
@ -1,7 +1,6 @@
|
|||
package com.ruoyi.dataAsset.service.impl;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
import com.ruoyi.common.core.utils.ObjUtils;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
|
Loading…
Reference in New Issue