dev #2
|
@ -10,6 +10,46 @@
|
||||||
<packaging>jar</packaging>
|
<packaging>jar</packaging>
|
||||||
|
|
||||||
<dependencies>
|
<dependencies>
|
||||||
|
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.mongodb</groupId>
|
||||||
|
<artifactId>mongodb-driver-sync</artifactId>
|
||||||
|
<version>4.4.0</version> <!-- 使用最新版本 -->
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>commons-net</groupId>
|
||||||
|
<artifactId>commons-net</artifactId>
|
||||||
|
<version>3.8.0</version> <!-- 请根据实际情况使用最新版本 -->
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<!-- Elasticsearch Low Level REST Client -->
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.elasticsearch.client</groupId>
|
||||||
|
<artifactId>elasticsearch-rest-client</artifactId>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<!-- Elasticsearch High Level REST Client -->
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.elasticsearch.client</groupId>
|
||||||
|
<artifactId>elasticsearch-rest-high-level-client</artifactId>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.kafka</groupId>
|
||||||
|
<artifactId>kafka-clients</artifactId>
|
||||||
|
<version>2.8.0</version> <!-- 检查是否有更新的版本 -->
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>redis.clients</groupId>
|
||||||
|
<artifactId>jedis</artifactId>
|
||||||
|
<version>2.9.0</version>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>net.srt</groupId>
|
<groupId>net.srt</groupId>
|
||||||
<artifactId>srt-cloud-api</artifactId>
|
<artifactId>srt-cloud-api</artifactId>
|
||||||
|
|
|
@ -1,5 +1,6 @@
|
||||||
package net.srt.entity;
|
package net.srt.entity;
|
||||||
|
|
||||||
|
import io.swagger.v3.oas.annotations.media.Schema;
|
||||||
import lombok.Data;
|
import lombok.Data;
|
||||||
import lombok.EqualsAndHashCode;
|
import lombok.EqualsAndHashCode;
|
||||||
import com.baomidou.mybatisplus.annotation.*;
|
import com.baomidou.mybatisplus.annotation.*;
|
||||||
|
@ -78,5 +79,18 @@ public class DataDatabaseEntity extends BaseEntity {
|
||||||
*/
|
*/
|
||||||
private Long projectId;
|
private Long projectId;
|
||||||
|
|
||||||
|
@Schema(description = "是否是关系型数据库")
|
||||||
|
private String isJdbc;
|
||||||
|
|
||||||
|
@Schema(description = "es索引")
|
||||||
|
private String databaseIndex;
|
||||||
|
|
||||||
|
@Schema(description = "es文档")
|
||||||
|
private String databaseDocument;
|
||||||
|
|
||||||
|
|
||||||
|
@Schema(description = "kafka主题")
|
||||||
|
private String databaseKafKa;
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,14 @@
|
||||||
|
package net.srt.enums;
|
||||||
|
|
||||||
|
import lombok.AllArgsConstructor;
|
||||||
|
import lombok.Getter;
|
||||||
|
|
||||||
|
@Getter
|
||||||
|
@AllArgsConstructor
|
||||||
|
public enum IsJdbc {
|
||||||
|
YES("是"),
|
||||||
|
NO("否");
|
||||||
|
|
||||||
|
private String value;
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,13 @@
|
||||||
|
package net.srt.enums;
|
||||||
|
|
||||||
|
import lombok.AllArgsConstructor;
|
||||||
|
import lombok.Getter;
|
||||||
|
|
||||||
|
@Getter
|
||||||
|
@AllArgsConstructor
|
||||||
|
public enum Judgment {
|
||||||
|
REDIS(17),KAFKA(18),ES(19),FTP(20),MONGODB(21);
|
||||||
|
|
||||||
|
private Integer index;
|
||||||
|
|
||||||
|
}
|
|
@ -4,6 +4,9 @@ import cn.hutool.core.util.StrUtil;
|
||||||
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
||||||
import com.baomidou.mybatisplus.core.metadata.IPage;
|
import com.baomidou.mybatisplus.core.metadata.IPage;
|
||||||
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
|
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
|
||||||
|
import com.mongodb.client.MongoClient;
|
||||||
|
import com.mongodb.client.MongoClients;
|
||||||
|
import com.mongodb.client.MongoDatabase;
|
||||||
import lombok.AllArgsConstructor;
|
import lombok.AllArgsConstructor;
|
||||||
import lombok.SneakyThrows;
|
import lombok.SneakyThrows;
|
||||||
import net.sf.jsqlparser.parser.CCJSqlParserUtil;
|
import net.sf.jsqlparser.parser.CCJSqlParserUtil;
|
||||||
|
@ -18,6 +21,8 @@ import net.srt.dao.DataDatabaseDao;
|
||||||
import net.srt.dto.SqlConsole;
|
import net.srt.dto.SqlConsole;
|
||||||
import net.srt.entity.DataAccessEntity;
|
import net.srt.entity.DataAccessEntity;
|
||||||
import net.srt.entity.DataDatabaseEntity;
|
import net.srt.entity.DataDatabaseEntity;
|
||||||
|
import net.srt.enums.IsJdbc;
|
||||||
|
import net.srt.enums.Judgment;
|
||||||
import net.srt.framework.common.cache.bean.DataProjectCacheBean;
|
import net.srt.framework.common.cache.bean.DataProjectCacheBean;
|
||||||
import net.srt.framework.common.exception.ServerException;
|
import net.srt.framework.common.exception.ServerException;
|
||||||
import net.srt.framework.common.page.PageResult;
|
import net.srt.framework.common.page.PageResult;
|
||||||
|
@ -33,8 +38,14 @@ import net.srt.vo.DataDatabaseVO;
|
||||||
import net.srt.vo.SchemaTableDataVo;
|
import net.srt.vo.SchemaTableDataVo;
|
||||||
import net.srt.vo.SqlGenerationVo;
|
import net.srt.vo.SqlGenerationVo;
|
||||||
import net.srt.vo.TableVo;
|
import net.srt.vo.TableVo;
|
||||||
|
import org.apache.commons.net.ftp.FTP;
|
||||||
|
import org.apache.commons.net.ftp.FTPClient;
|
||||||
|
import org.apache.kafka.clients.producer.Producer;
|
||||||
|
import org.apache.kafka.clients.producer.ProducerConfig;
|
||||||
|
import org.apache.kafka.clients.producer.ProducerRecord;
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
import org.springframework.transaction.annotation.Transactional;
|
import org.springframework.transaction.annotation.Transactional;
|
||||||
|
import redis.clients.jedis.Jedis;
|
||||||
import srt.cloud.framework.dbswitch.common.type.ProductTypeEnum;
|
import srt.cloud.framework.dbswitch.common.type.ProductTypeEnum;
|
||||||
import srt.cloud.framework.dbswitch.common.util.StringUtil;
|
import srt.cloud.framework.dbswitch.common.util.StringUtil;
|
||||||
import srt.cloud.framework.dbswitch.core.model.ColumnDescription;
|
import srt.cloud.framework.dbswitch.core.model.ColumnDescription;
|
||||||
|
@ -43,8 +54,13 @@ import srt.cloud.framework.dbswitch.core.model.TableDescription;
|
||||||
import srt.cloud.framework.dbswitch.core.service.IMetaDataByJdbcService;
|
import srt.cloud.framework.dbswitch.core.service.IMetaDataByJdbcService;
|
||||||
import srt.cloud.framework.dbswitch.core.service.impl.MetaDataByJdbcServiceImpl;
|
import srt.cloud.framework.dbswitch.core.service.impl.MetaDataByJdbcServiceImpl;
|
||||||
|
|
||||||
|
import javax.annotation.Resource;
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.FileInputStream;
|
||||||
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Properties;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -54,11 +70,21 @@ import java.util.stream.Collectors;
|
||||||
* @since 1.0.0 2022-10-09
|
* @since 1.0.0 2022-10-09
|
||||||
*/
|
*/
|
||||||
@Service
|
@Service
|
||||||
@AllArgsConstructor
|
|
||||||
public class DataDatabaseServiceImpl extends BaseServiceImpl<DataDatabaseDao, DataDatabaseEntity> implements DataDatabaseService {
|
public class DataDatabaseServiceImpl extends BaseServiceImpl<DataDatabaseDao, DataDatabaseEntity> implements DataDatabaseService {
|
||||||
|
@Resource
|
||||||
private final DataAccessDao dataAccessDao;
|
private DataAccessDao dataAccessDao;
|
||||||
private final DataAccessService dataAccessService;
|
@Resource
|
||||||
|
private DataAccessService dataAccessService;
|
||||||
|
@Resource
|
||||||
|
private EsService esService;
|
||||||
|
@Resource
|
||||||
|
private MongoService mongoService;
|
||||||
|
@Resource
|
||||||
|
private KafkaService kafkaService;
|
||||||
|
@Resource
|
||||||
|
private RedisService redisService;
|
||||||
|
@Resource
|
||||||
|
private FtpService ftpService;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public PageResult<DataDatabaseVO> page(DataDatabaseQuery query) {
|
public PageResult<DataDatabaseVO> page(DataDatabaseQuery query) {
|
||||||
|
@ -83,6 +109,10 @@ public class DataDatabaseServiceImpl extends BaseServiceImpl<DataDatabaseDao, Da
|
||||||
public void save(DataDatabaseVO vo) {
|
public void save(DataDatabaseVO vo) {
|
||||||
DataDatabaseEntity entity = DataDatabaseConvert.INSTANCE.convert(vo);
|
DataDatabaseEntity entity = DataDatabaseConvert.INSTANCE.convert(vo);
|
||||||
entity.setProjectId(getProjectId());
|
entity.setProjectId(getProjectId());
|
||||||
|
entity.setIsJdbc(vo.getIsJdbc());
|
||||||
|
entity.setDatabaseIndex(vo.getDatabaseIndex());
|
||||||
|
entity.setDatabaseDocument(vo.getDatabaseDocument());
|
||||||
|
entity.setDatabaseKafKa(vo.getDatabaseKafKa());
|
||||||
setJdbcUrlByEntity(entity);
|
setJdbcUrlByEntity(entity);
|
||||||
baseMapper.insert(entity);
|
baseMapper.insert(entity);
|
||||||
try {
|
try {
|
||||||
|
@ -136,13 +166,15 @@ public class DataDatabaseServiceImpl extends BaseServiceImpl<DataDatabaseDao, Da
|
||||||
ProductTypeEnum productTypeEnum = ProductTypeEnum.getByIndex(vo.getDatabaseType());
|
ProductTypeEnum productTypeEnum = ProductTypeEnum.getByIndex(vo.getDatabaseType());
|
||||||
|
|
||||||
IMetaDataByJdbcService metaDataService = new MetaDataByJdbcServiceImpl(productTypeEnum);
|
IMetaDataByJdbcService metaDataService = new MetaDataByJdbcServiceImpl(productTypeEnum);
|
||||||
if (StringUtil.isBlank(vo.getJdbcUrl())) {
|
if (StringUtil.isNotBlank(vo.getJdbcUrl())) {
|
||||||
vo.setJdbcUrl(productTypeEnum.getUrl()
|
if(productTypeEnum.getUrl()!=null){
|
||||||
.replace("{host}", vo.getDatabaseIp())
|
vo.setJdbcUrl(productTypeEnum.getUrl()
|
||||||
.replace("{port}", vo.getDatabasePort())
|
.replace("{host}", vo.getDatabaseIp())
|
||||||
.replace("{database}", vo.getDatabaseName()));
|
.replace("{port}", vo.getDatabasePort())
|
||||||
|
.replace("{database}", vo.getDatabaseName()));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if (vo.getIsJdbc()==1){
|
if (vo.getIsJdbc().equals(IsJdbc.YES.getValue())){
|
||||||
metaDataService.testQuerySQL(
|
metaDataService.testQuerySQL(
|
||||||
vo.getJdbcUrl(),
|
vo.getJdbcUrl(),
|
||||||
vo.getUserName(),
|
vo.getUserName(),
|
||||||
|
@ -150,7 +182,117 @@ public class DataDatabaseServiceImpl extends BaseServiceImpl<DataDatabaseDao, Da
|
||||||
productTypeEnum.getTestSql()
|
productTypeEnum.getTestSql()
|
||||||
);
|
);
|
||||||
}else {
|
}else {
|
||||||
//TODO 连接
|
//判断是什么类型进行连接
|
||||||
|
if(vo.getDatabaseType().equals(Judgment.REDIS.getIndex())){
|
||||||
|
//连接本地的 Redis 服务
|
||||||
|
// 创建 Jedis 对象
|
||||||
|
Jedis jedis = new Jedis(vo.getDatabaseIp(), Integer.valueOf(vo.getDatabasePort()));
|
||||||
|
try {
|
||||||
|
// 尝试连接到 Redis 服务器
|
||||||
|
jedis.connect();
|
||||||
|
|
||||||
|
// 检查是否连接成功
|
||||||
|
if (jedis.isConnected()) {
|
||||||
|
System.out.println("redis连接成功");
|
||||||
|
} else {
|
||||||
|
System.out.println("redis连接失败");
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
// 捕获连接异常
|
||||||
|
System.err.println("redis异常" + e.getMessage());
|
||||||
|
throw new ServerException("连接失败");
|
||||||
|
} finally {
|
||||||
|
// 关闭连接
|
||||||
|
jedis.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if(vo.getDatabaseType().equals(Judgment.KAFKA.getIndex())){
|
||||||
|
// 配置生产者属性
|
||||||
|
Properties properties = new Properties();
|
||||||
|
String ip = vo.getDatabaseIp()+":"+vo.getDatabasePort(); // 设置一个无效的IP和端口,以模拟连接失败的情况
|
||||||
|
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, ip);
|
||||||
|
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer.class.getName());
|
||||||
|
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer.class.getName());
|
||||||
|
|
||||||
|
// 创建生产者方娇大傻蛋蠢猪大傻春
|
||||||
|
Producer<String, String> producer = null;
|
||||||
|
try {
|
||||||
|
producer = new org.apache.kafka.clients.producer.KafkaProducer<>(properties);
|
||||||
|
|
||||||
|
// 消息
|
||||||
|
String message = "Default Message";
|
||||||
|
|
||||||
|
// 发送消息到主题
|
||||||
|
producer.send(new ProducerRecord<>(vo.getDatabaseKafKa(), message));
|
||||||
|
System.out.println("消息发送成功");
|
||||||
|
} catch (Exception e) {
|
||||||
|
System.err.println("连接失败: " + e.getMessage());
|
||||||
|
throw new ServerException("连接失败");
|
||||||
|
} finally {
|
||||||
|
// 关闭生产者
|
||||||
|
if (producer != null) {
|
||||||
|
producer.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if(vo.getDatabaseType().equals(Judgment.MONGODB.getIndex())){
|
||||||
|
// 连接到 MongoDB 本地服务器
|
||||||
|
String url = productTypeEnum.getUrl()
|
||||||
|
.replace("{username}", vo.getUserName())
|
||||||
|
.replace("{password}", vo.getPassword())
|
||||||
|
.replace("{host}", vo.getDatabaseIp())
|
||||||
|
.replace("{port}", vo.getDatabasePort())
|
||||||
|
.replace("{database}", vo.getDatabaseName());
|
||||||
|
try (MongoClient mongoClient = MongoClients.create(url)) {
|
||||||
|
// 获取到数据库对象
|
||||||
|
MongoDatabase database = mongoClient.getDatabase(vo.getDatabaseName());
|
||||||
|
System.out.println("连接数据库成功");
|
||||||
|
} catch (Exception e) {
|
||||||
|
System.err.println("连接MongoDB报错: " + e.getMessage());
|
||||||
|
throw new ServerException("连接失败");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
//
|
||||||
|
if(vo.getDatabaseType().equals(Judgment.FTP.getIndex())){
|
||||||
|
// String server = "ftp.example.com";
|
||||||
|
String user = vo.getUserName();
|
||||||
|
String password = vo.getPassword();
|
||||||
|
|
||||||
|
FTPClient ftpClient = new FTPClient();
|
||||||
|
|
||||||
|
try {
|
||||||
|
// 连接到 FTP 服务器
|
||||||
|
ftpClient.connect(vo.getDatabaseIp(),Integer.valueOf(vo.getDatabasePort()));
|
||||||
|
ftpClient.login(user, password);
|
||||||
|
ftpClient.enterLocalPassiveMode();
|
||||||
|
|
||||||
|
// 打印服务器返回的欢迎信息
|
||||||
|
System.out.println("Connected to " + vo.getDatabaseIp() + ".");
|
||||||
|
System.out.println(ftpClient.getReplyString());
|
||||||
|
|
||||||
|
// 设置文件传输类型为二进制
|
||||||
|
ftpClient.setFileType(FTP.ASCII_FILE_TYPE);
|
||||||
|
|
||||||
|
// 上传文件
|
||||||
|
uploadFile(ftpClient, vo.getDatabaseIp(), "D:/desktop/ftp.txt");
|
||||||
|
|
||||||
|
// 下载文件
|
||||||
|
// downloadFile(ftpClient, "C:/Users/栗永斌/Desktop/新建 文本文档.txt", "C:/Users/栗永斌/Desktop/新建 文本文档.txt");
|
||||||
|
|
||||||
|
// 登出并断开连接
|
||||||
|
ftpClient.logout();
|
||||||
|
ftpClient.disconnect();
|
||||||
|
|
||||||
|
} catch (IOException e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
//
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
if (vo.getId() != null) {
|
if (vo.getId() != null) {
|
||||||
//更新状态
|
//更新状态
|
||||||
|
@ -158,6 +300,39 @@ public class DataDatabaseServiceImpl extends BaseServiceImpl<DataDatabaseDao, Da
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static void uploadFile(FTPClient ftpClient, String localFilePath, String remoteFilePath) throws IOException {
|
||||||
|
FileInputStream inputStream = new FileInputStream(new File(remoteFilePath));
|
||||||
|
|
||||||
|
try {
|
||||||
|
// 上传文件
|
||||||
|
boolean uploaded = ftpClient.storeFile(localFilePath, inputStream);
|
||||||
|
|
||||||
|
if (uploaded) {
|
||||||
|
System.out.println("File uploaded successfully.");
|
||||||
|
} else {
|
||||||
|
System.out.println("Failed to upload file.");
|
||||||
|
}
|
||||||
|
} catch (IOException e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
} finally {
|
||||||
|
inputStream.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// private static void downloadFile(FTPClient ftpClient, String remoteFilePath, String localFilePath) throws IOException {
|
||||||
|
// FileOutputStream outputStream = new FileOutputStream(new File(localFilePath));
|
||||||
|
//
|
||||||
|
// // 下载文件
|
||||||
|
// boolean downloaded = ftpClient.retrieveFile(remoteFilePath, outputStream);
|
||||||
|
// outputStream.close();
|
||||||
|
//
|
||||||
|
// if (downloaded) {
|
||||||
|
// System.out.println("File downloaded successfully.");
|
||||||
|
// } else {
|
||||||
|
// System.out.println("Failed to download file.");
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public List<TableVo> getTablesById(Long id) {
|
public List<TableVo> getTablesById(Long id) {
|
||||||
DataDatabaseEntity dataDatabaseEntity = baseMapper.selectById(id);
|
DataDatabaseEntity dataDatabaseEntity = baseMapper.selectById(id);
|
||||||
|
@ -166,14 +341,33 @@ public class DataDatabaseServiceImpl extends BaseServiceImpl<DataDatabaseDao, Da
|
||||||
|
|
||||||
private List<TableVo> getTables(DataDatabaseEntity dataDatabaseEntity) {
|
private List<TableVo> getTables(DataDatabaseEntity dataDatabaseEntity) {
|
||||||
ProductTypeEnum productTypeEnum = ProductTypeEnum.getByIndex(dataDatabaseEntity.getDatabaseType());
|
ProductTypeEnum productTypeEnum = ProductTypeEnum.getByIndex(dataDatabaseEntity.getDatabaseType());
|
||||||
IMetaDataByJdbcService metaDataService = new MetaDataByJdbcServiceImpl(productTypeEnum);
|
List<TableDescription> tableDescriptions = new ArrayList<>();
|
||||||
List<TableDescription> tableDescriptions = metaDataService.queryTableList(StringUtil.isBlank(dataDatabaseEntity.getJdbcUrl()) ? productTypeEnum.getUrl()
|
try {
|
||||||
.replace("{host}", dataDatabaseEntity.getDatabaseIp())
|
switch (productTypeEnum) {
|
||||||
.replace("{port}", dataDatabaseEntity.getDatabasePort())
|
case FTP:
|
||||||
.replace("{database}", dataDatabaseEntity.getDatabaseName()) : dataDatabaseEntity.getJdbcUrl(), dataDatabaseEntity.getUserName(), dataDatabaseEntity.getPassword(),
|
tableDescriptions = ftpService.getFtpFiles(dataDatabaseEntity);
|
||||||
ProductTypeEnum.ORACLE.equals(productTypeEnum) ? dataDatabaseEntity.getUserName() : dataDatabaseEntity.getDatabaseName());
|
return BeanUtil.copyListProperties(tableDescriptions, TableVo::new);
|
||||||
return BeanUtil.copyListProperties(tableDescriptions, TableVo::new);
|
case ES:
|
||||||
}
|
tableDescriptions = esService.getEsTables(dataDatabaseEntity);
|
||||||
|
return BeanUtil.copyListProperties(tableDescriptions, TableVo::new);
|
||||||
|
case MONGODB:
|
||||||
|
tableDescriptions = mongoService.getMongoTables(dataDatabaseEntity);
|
||||||
|
return BeanUtil.copyListProperties(tableDescriptions, TableVo::new);
|
||||||
|
case REDIS:
|
||||||
|
tableDescriptions = redisService.getRedisTables(dataDatabaseEntity);
|
||||||
|
return BeanUtil.copyListProperties(tableDescriptions, TableVo::new);
|
||||||
|
case KAFKA:
|
||||||
|
tableDescriptions = kafkaService.getKafkaTables(dataDatabaseEntity);
|
||||||
|
return BeanUtil.copyListProperties(tableDescriptions, TableVo::new);
|
||||||
|
// default:
|
||||||
|
// IMetaDataByJdbcService metaDataService = new MetaDataByJdbcServiceImpl(productTypeEnum);
|
||||||
|
// tableDescriptions = metaDataService.queryTableList(StringUtil.isBlank(dataDatabaseEntity.getJdbcUrl()) ? productTypeEnum.getUrl().replace("{host}", dataDatabaseEntity.getDatabaseIp()).replace("{port}", dataDatabaseEntity.getDatabasePort()).replace("{database}", dataDatabaseEntity.getDatabaseName()) : dataDatabaseEntity.getJdbcUrl(), dataDatabaseEntity.getUserName(), dataDatabaseEntity.getPassword(), ProductTypeEnum.ORACLE.equals(productTypeEnum) ? dataDatabaseEntity.getUserName() : dataDatabaseEntity.getDatabaseName());
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
@SneakyThrows
|
@SneakyThrows
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -0,0 +1,48 @@
|
||||||
|
package net.srt.service.impl;
|
||||||
|
|
||||||
|
import net.srt.entity.DataDatabaseEntity;
|
||||||
|
import net.srt.vo.TableVo;
|
||||||
|
import org.apache.http.HttpHost;
|
||||||
|
import org.elasticsearch.client.RequestOptions;
|
||||||
|
import org.elasticsearch.client.RestClient;
|
||||||
|
import org.elasticsearch.client.RestHighLevelClient;
|
||||||
|
import org.elasticsearch.client.indices.GetIndexRequest;
|
||||||
|
import org.springframework.stereotype.Service;
|
||||||
|
import srt.cloud.framework.dbswitch.core.model.TableDescription;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
@Service
|
||||||
|
public class EsService {
|
||||||
|
public List<TableDescription> getEsTables(DataDatabaseEntity dataDatabaseEntity) {
|
||||||
|
// 从数据实体中获取 Elasticsearch 服务器的地址和端口
|
||||||
|
String esServerAddress = dataDatabaseEntity.getDatabaseIp();
|
||||||
|
int esServerPort = Integer.parseInt(dataDatabaseEntity.getDatabasePort());
|
||||||
|
|
||||||
|
// 创建 RestHighLevelClient 实例,用于与 Elasticsearch 进行高级 REST 客户端通信
|
||||||
|
RestHighLevelClient client = new RestHighLevelClient(
|
||||||
|
RestClient.builder(
|
||||||
|
new HttpHost(esServerAddress, esServerPort, "http")));
|
||||||
|
try {
|
||||||
|
// 创建获取索引请求,使用通配符 "*" 表示获取所有索引
|
||||||
|
GetIndexRequest getIndexRequest = new GetIndexRequest("*");
|
||||||
|
// 发送获取索引请求,获取所有索引的名称
|
||||||
|
String[] indexes = client.indices().get(getIndexRequest, RequestOptions.DEFAULT).getIndices();
|
||||||
|
// 创建一个列表用于存储索引信息
|
||||||
|
List<TableDescription> tableDescriptions = new ArrayList<>();
|
||||||
|
// 遍历所有索引名称,创建 TableDescription 对象并添加到列表中
|
||||||
|
for (String index : indexes) {
|
||||||
|
TableDescription tableDescription = new TableDescription();
|
||||||
|
tableDescription.setTableName(index);
|
||||||
|
tableDescriptions.add(tableDescription);
|
||||||
|
}
|
||||||
|
// 返回 Elasticsearch 索引信息列表
|
||||||
|
return tableDescriptions;
|
||||||
|
} catch (IOException e) {
|
||||||
|
// 处理 IO 异常
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,40 @@
|
||||||
|
package net.srt.service.impl;
|
||||||
|
|
||||||
|
import net.srt.entity.DataDatabaseEntity;
|
||||||
|
import net.srt.vo.TableVo;
|
||||||
|
import org.apache.commons.net.ftp.FTP;
|
||||||
|
import org.apache.commons.net.ftp.FTPClient;
|
||||||
|
import org.apache.commons.net.ftp.FTPFile;
|
||||||
|
import org.springframework.stereotype.Service;
|
||||||
|
import srt.cloud.framework.dbswitch.core.model.TableDescription;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.net.SocketException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
@Service
|
||||||
|
public class FtpService {
|
||||||
|
public List<TableDescription> getFtpFiles(DataDatabaseEntity dataDatabaseEntity) {
|
||||||
|
List<TableDescription> tableDescriptions = new ArrayList<>();
|
||||||
|
FTPClient ftpClient = new FTPClient();
|
||||||
|
|
||||||
|
try {
|
||||||
|
ftpClient.connect(dataDatabaseEntity.getDatabaseIp(),Integer.valueOf(dataDatabaseEntity.getDatabasePort()));
|
||||||
|
ftpClient.login(dataDatabaseEntity.getUserName(),dataDatabaseEntity.getPassword());
|
||||||
|
ftpClient.enterLocalPassiveMode();
|
||||||
|
ftpClient.setFileType(FTP.BINARY_FILE_TYPE);
|
||||||
|
|
||||||
|
// 列出FTP服务器上的文件
|
||||||
|
FTPFile[] ftpFiles = ftpClient.listFiles();
|
||||||
|
for (FTPFile ftpFile : ftpFiles) {
|
||||||
|
TableDescription tableDescription = new TableDescription();
|
||||||
|
tableDescription.setTableName(ftpFile.getName());
|
||||||
|
tableDescriptions.add(tableDescription);
|
||||||
|
}
|
||||||
|
return tableDescriptions;
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,41 @@
|
||||||
|
package net.srt.service.impl;
|
||||||
|
|
||||||
|
import net.srt.entity.DataDatabaseEntity;
|
||||||
|
import net.srt.vo.TableVo;
|
||||||
|
import org.apache.kafka.clients.admin.AdminClient;
|
||||||
|
import org.apache.kafka.clients.admin.KafkaAdminClient;
|
||||||
|
import org.apache.kafka.clients.admin.ListTopicsResult;
|
||||||
|
import org.apache.kafka.common.KafkaFuture;
|
||||||
|
import org.springframework.stereotype.Service;
|
||||||
|
import srt.cloud.framework.dbswitch.core.model.TableDescription;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Properties;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
@Service
|
||||||
|
public class KafkaService {
|
||||||
|
public List<TableDescription> getKafkaTables(DataDatabaseEntity dataDatabaseEntity) {
|
||||||
|
String bootstrapServers = dataDatabaseEntity.getDatabaseIp() + ":" + dataDatabaseEntity.getDatabasePort();
|
||||||
|
Properties properties = new Properties();
|
||||||
|
properties.put("bootstrap.servers",bootstrapServers);
|
||||||
|
properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
|
||||||
|
properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
|
||||||
|
AdminClient adminClient=null;
|
||||||
|
try {
|
||||||
|
adminClient = KafkaAdminClient.create(properties);
|
||||||
|
ListTopicsResult listTopicsResult = adminClient.listTopics();
|
||||||
|
KafkaFuture<Set<String>> names = listTopicsResult.names();
|
||||||
|
Set<String> strings = names.get();
|
||||||
|
List<TableDescription> tableDescriptions=strings.stream().map(s -> {
|
||||||
|
TableDescription tableDescription = new TableDescription();
|
||||||
|
tableDescription.setTableName(s);
|
||||||
|
return tableDescription;
|
||||||
|
}).collect(Collectors.toList());
|
||||||
|
return tableDescriptions;
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,45 @@
|
||||||
|
package net.srt.service.impl;
|
||||||
|
|
||||||
|
import com.mongodb.MongoClientSettings;
|
||||||
|
import com.mongodb.MongoCredential;
|
||||||
|
import com.mongodb.ServerAddress;
|
||||||
|
import com.mongodb.client.MongoClient;
|
||||||
|
import com.mongodb.client.MongoClients;
|
||||||
|
import com.mongodb.client.MongoDatabase;
|
||||||
|
import com.mongodb.client.MongoIterable;
|
||||||
|
import net.srt.entity.DataDatabaseEntity;
|
||||||
|
import org.springframework.stereotype.Service;
|
||||||
|
import srt.cloud.framework.dbswitch.core.model.TableDescription;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.List;
|
||||||
|
@Service
|
||||||
|
public class MongoService {
|
||||||
|
public List<TableDescription> getMongoTables(DataDatabaseEntity dataDatabaseEntity) {
|
||||||
|
List<TableDescription> tableDescriptions = new ArrayList<>();
|
||||||
|
try {
|
||||||
|
MongoCredential credential = MongoCredential.createCredential(dataDatabaseEntity.getUserName(),"admin",dataDatabaseEntity.getPassword().toCharArray());
|
||||||
|
ServerAddress serverAddress = new ServerAddress(dataDatabaseEntity.getDatabaseIp(), Integer.valueOf(dataDatabaseEntity.getDatabasePort()));
|
||||||
|
MongoClientSettings settings = MongoClientSettings.builder()
|
||||||
|
.credential(credential)
|
||||||
|
.applyToClusterSettings(builder ->
|
||||||
|
builder.hosts(Arrays.asList(serverAddress)))
|
||||||
|
.build();
|
||||||
|
|
||||||
|
MongoClient mongoClient = MongoClients.create(settings);
|
||||||
|
MongoDatabase database = mongoClient.getDatabase(dataDatabaseEntity.getDatabaseName());
|
||||||
|
ArrayList<String> into = database.listCollectionNames().into(new ArrayList<>());
|
||||||
|
for (String s : into) {
|
||||||
|
TableDescription tableDescription = new TableDescription();
|
||||||
|
tableDescription.setTableName(s);
|
||||||
|
tableDescriptions.add(tableDescription);
|
||||||
|
}
|
||||||
|
return tableDescriptions;
|
||||||
|
|
||||||
|
} catch (Exception e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,39 @@
|
||||||
|
package net.srt.service.impl;
|
||||||
|
|
||||||
|
import net.srt.entity.DataDatabaseEntity;
|
||||||
|
import org.springframework.stereotype.Service;
|
||||||
|
import redis.clients.jedis.Jedis;
|
||||||
|
import redis.clients.jedis.exceptions.JedisConnectionException;
|
||||||
|
import srt.cloud.framework.dbswitch.core.model.TableDescription;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
@Service
|
||||||
|
public class RedisService {
|
||||||
|
public List<TableDescription> getRedisTables(DataDatabaseEntity dataDatabaseEntity) {
|
||||||
|
try (Jedis jedis = new Jedis(dataDatabaseEntity.getDatabaseIp(), Integer.parseInt(dataDatabaseEntity.getDatabasePort()))) {
|
||||||
|
// 获取所有 key
|
||||||
|
Set<String> keys = jedis.keys("*");
|
||||||
|
ArrayList<TableDescription> tableDescriptions = new ArrayList<>();
|
||||||
|
for (String key : keys) {
|
||||||
|
TableDescription tableDescription = new TableDescription();
|
||||||
|
tableDescription.setTableName(key);
|
||||||
|
tableDescriptions.add(tableDescription);
|
||||||
|
}
|
||||||
|
return tableDescriptions;
|
||||||
|
} catch (JedisConnectionException e) {
|
||||||
|
// 处理连接异常
|
||||||
|
throw new RuntimeException("Redis连接异常: " + e.getMessage(), e);
|
||||||
|
} catch (NumberFormatException e) {
|
||||||
|
// 处理端口转换异常
|
||||||
|
throw new RuntimeException("端口转换异常: " + e.getMessage(), e);
|
||||||
|
} catch (Exception e) {
|
||||||
|
// 处理其他异常
|
||||||
|
throw new RuntimeException("发生异常: " + e.getMessage(), e);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
|
@ -79,7 +79,20 @@ public class DataDatabaseVO implements Serializable {
|
||||||
private Date updateTime;
|
private Date updateTime;
|
||||||
|
|
||||||
@Schema(description = "是否是关系型数据库")
|
@Schema(description = "是否是关系型数据库")
|
||||||
private Integer isJdbc;
|
private String isJdbc;
|
||||||
|
|
||||||
|
@Schema(description = "es索引")
|
||||||
|
private String databaseIndex;
|
||||||
|
|
||||||
|
@Schema(description = "es文档")
|
||||||
|
private String databaseDocument;
|
||||||
|
|
||||||
|
|
||||||
|
@Schema(description = "kafka主题")
|
||||||
|
private String databaseKafKa;
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -13,4 +13,12 @@ public class TableVo {
|
||||||
private String tableName;
|
private String tableName;
|
||||||
private String remarks;
|
private String remarks;
|
||||||
private DBTableType tableType;
|
private DBTableType tableType;
|
||||||
|
|
||||||
|
public static TableVo createKafkaTopicVo(String topicName) {
|
||||||
|
TableVo kafkaTopicVo = new TableVo();
|
||||||
|
kafkaTopicVo.setTableName(topicName);
|
||||||
|
kafkaTopicVo.setRemarks("Kafka Topic");
|
||||||
|
kafkaTopicVo.setTableType(null);
|
||||||
|
return kafkaTopicVo;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -101,11 +101,12 @@ public enum ProductTypeEnum {
|
||||||
*/
|
*/
|
||||||
DORIS(16, "com.mysql.jdbc.Driver","/* ping */ SELECT 1", "jdbc:mysql://{host}:{port}/{database}?useUnicode=true&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull&useSSL=false&serverTimezone=GMT%2B8&rewriteBatchedStatements=true"),
|
DORIS(16, "com.mysql.jdbc.Driver","/* ping */ SELECT 1", "jdbc:mysql://{host}:{port}/{database}?useUnicode=true&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull&useSSL=false&serverTimezone=GMT%2B8&rewriteBatchedStatements=true"),
|
||||||
|
|
||||||
REDIS(16, "com.mysql.jdbc.Driver","/* ping */ SELECT 1", ""),
|
REDIS(17, "com.mysql.jdbc.Driver","/* ping */ SELECT 1", ""),
|
||||||
KAFKA(16, "com.mysql.jdbc.Driver","/* ping */ SELECT 1", ""),
|
KAFKA(18, "com.mysql.jdbc.Driver","/* ping */ SELECT 1", ""),
|
||||||
ES(16, "com.mysql.jdbc.Driver","/* ping */ SELECT 1", ""),
|
ES(19, "com.mysql.jdbc.Driver","/* ping */ SELECT 1", ""),
|
||||||
FTP(16, "com.mysql.jdbc.Driver","/* ping */ SELECT 1", ""),
|
FTP(20, "com.mysql.jdbc.Driver","/* ping */ SELECT 1", "ftp://example.com"),
|
||||||
MONGODB(16, "com.mysql.jdbc.Driver","/* ping */ SELECT 1", ""),
|
MONGODB(21, "com.mysql.jdbc.Driver","/* ping */ SELECT 1", "mongodb://{username}:{password}@{host}:{port}/{database}"),
|
||||||
|
|
||||||
;
|
;
|
||||||
|
|
||||||
private Integer index;
|
private Integer index;
|
||||||
|
|
|
@ -40,7 +40,7 @@ public class DatabaseEsImpl extends DatabaseMysqlImpl implements IDatabaseInterf
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ProductTypeEnum getDatabaseType() {
|
public ProductTypeEnum getDatabaseType() {
|
||||||
return ProductTypeEnum.DORIS;
|
return ProductTypeEnum.ES;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -40,7 +40,7 @@ public class DatabaseFMongodbmpl extends DatabaseMysqlImpl implements IDatabaseI
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ProductTypeEnum getDatabaseType() {
|
public ProductTypeEnum getDatabaseType() {
|
||||||
return ProductTypeEnum.DORIS;
|
return ProductTypeEnum.MONGODB;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -40,7 +40,7 @@ public class DatabaseFTPImpl extends DatabaseMysqlImpl implements IDatabaseInter
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ProductTypeEnum getDatabaseType() {
|
public ProductTypeEnum getDatabaseType() {
|
||||||
return ProductTypeEnum.DORIS;
|
return ProductTypeEnum.FTP;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -40,7 +40,7 @@ public class DatabaseKafkaImpl extends DatabaseMysqlImpl implements IDatabaseInt
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ProductTypeEnum getDatabaseType() {
|
public ProductTypeEnum getDatabaseType() {
|
||||||
return ProductTypeEnum.DORIS;
|
return ProductTypeEnum.KAFKA;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -50,45 +50,44 @@ public class DatabaseRedisImpl extends DatabaseMysqlImpl implements IDatabaseInt
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ProductTypeEnum getDatabaseType() {
|
public ProductTypeEnum getDatabaseType() {
|
||||||
return ProductTypeEnum.DORIS;
|
return ProductTypeEnum.REDIS;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public List<ColumnDescription> queryTableColumnMeta(Connection connection, String schemaName,
|
public List<ColumnDescription> queryTableColumnMeta(Connection connection, String schemaName,
|
||||||
String tableName) {
|
String tableName) {
|
||||||
// String sql = this.getTableFieldsQuerySQL(schemaName, tableName);
|
String sql = this.getTableFieldsQuerySQL(schemaName, tableName);
|
||||||
// List<ColumnDescription> columnDescriptions = this.querySelectSqlColumnMeta(connection, sql);
|
List<ColumnDescription> columnDescriptions = this.querySelectSqlColumnMeta(connection, sql);
|
||||||
// // 补充一下字段信息,获取的不准
|
// 补充一下字段信息,获取的不准
|
||||||
// String extraSql = "SELECT column_name,data_type,column_size,decimal_digits,column_comment FROM information_schema.COLUMNS WHERE table_schema='" + schemaName + "' AND table_name='" + tableName + "'";
|
String extraSql = "SELECT column_name,data_type,column_size,decimal_digits,column_comment FROM information_schema.COLUMNS WHERE table_schema='" + schemaName + "' AND table_name='" + tableName + "'";
|
||||||
// try (PreparedStatement ps = connection.prepareStatement(extraSql);
|
try (PreparedStatement ps = connection.prepareStatement(extraSql);
|
||||||
// ResultSet rs = ps.executeQuery();
|
ResultSet rs = ps.executeQuery();
|
||||||
// ) {
|
) {
|
||||||
// while (rs.next()) {
|
while (rs.next()) {
|
||||||
// String columnName = rs.getString("column_name");
|
String columnName = rs.getString("column_name");
|
||||||
// String dataType = rs.getString("data_type");
|
String dataType = rs.getString("data_type");
|
||||||
// String columnSize = rs.getString("column_size");
|
String columnSize = rs.getString("column_size");
|
||||||
// String decimalDigits = rs.getString("decimal_digits");
|
String decimalDigits = rs.getString("decimal_digits");
|
||||||
// String columnComment = rs.getString("column_comment");
|
String columnComment = rs.getString("column_comment");
|
||||||
// if (columnName != null) {
|
if (columnName != null) {
|
||||||
// for (ColumnDescription cd : columnDescriptions) {
|
for (ColumnDescription cd : columnDescriptions) {
|
||||||
// if (columnName.equals(cd.getFieldName())) {
|
if (columnName.equals(cd.getFieldName())) {
|
||||||
// cd.setFieldTypeName(dataType);
|
cd.setFieldTypeName(dataType);
|
||||||
// int csize = columnSize != null ? Integer.parseInt(columnSize) : 0;
|
int csize = columnSize != null ? Integer.parseInt(columnSize) : 0;
|
||||||
// cd.setDisplaySize(csize);
|
cd.setDisplaySize(csize);
|
||||||
// cd.setPrecisionSize(csize);
|
cd .setPrecisionSize(csize);
|
||||||
// cd.setScaleSize(decimalDigits != null ? Integer.parseInt(decimalDigits) : 0);
|
cd.setScaleSize(decimalDigits != null ? Integer.parseInt(decimalDigits) : 0);
|
||||||
// cd.setRemarks(columnComment);
|
cd.setRemarks(columnComment);
|
||||||
// break;
|
break;
|
||||||
// }
|
}
|
||||||
// }
|
}
|
||||||
// }
|
}
|
||||||
// }
|
}
|
||||||
// } catch (SQLException e) {
|
} catch (SQLException e) {
|
||||||
// throw new RuntimeException(schemaName + "." + tableName + " queryTableColumnMeta error!!", e);
|
throw new RuntimeException(schemaName + "." + tableName + " queryTableColumnMeta error!!", e);
|
||||||
// }
|
}
|
||||||
//
|
|
||||||
// return columnDescriptions;
|
return columnDescriptions;
|
||||||
return null;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
Binary file not shown.
Binary file not shown.
Binary file not shown.
Loading…
Reference in New Issue