数据源收尾

pull/2/head
chenbingxuan 2023-12-13 08:34:33 +08:00
parent 47d5b51200
commit 6fc5a34d58
6 changed files with 124 additions and 4 deletions

View File

@ -11,6 +11,7 @@
<dependencies>
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongodb-driver-sync</artifactId>

View File

@ -1,14 +1,48 @@
package net.srt.service.impl;
import net.srt.entity.DataDatabaseEntity;
import org.springframework.stereotype.Component;
import net.srt.vo.TableVo;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.springframework.stereotype.Service;
import srt.cloud.framework.dbswitch.core.model.TableDescription;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@Service
public class EsService {
public List<TableDescription> getEsTables(DataDatabaseEntity dataDatabaseEntity) {
// 从数据实体中获取 Elasticsearch 服务器的地址和端口
String esServerAddress = dataDatabaseEntity.getDatabaseIp();
int esServerPort = Integer.parseInt(dataDatabaseEntity.getDatabasePort());
// 创建 RestHighLevelClient 实例,用于与 Elasticsearch 进行高级 REST 客户端通信
RestHighLevelClient client = new RestHighLevelClient(
RestClient.builder(
new HttpHost(esServerAddress, esServerPort, "http")));
try {
// 创建获取索引请求,使用通配符 "*" 表示获取所有索引
GetIndexRequest getIndexRequest = new GetIndexRequest("*");
// 发送获取索引请求,获取所有索引的名称
String[] indexes = client.indices().get(getIndexRequest, RequestOptions.DEFAULT).getIndices();
// 创建一个列表用于存储索引信息
List<TableDescription> tableDescriptions = new ArrayList<>();
// 遍历所有索引名称,创建 TableDescription 对象并添加到列表中
for (String index : indexes) {
TableDescription tableDescription = new TableDescription();
tableDescription.setTableName(index);
tableDescriptions.add(tableDescription);
}
// 返回 Elasticsearch 索引信息列表
return tableDescriptions;
} catch (IOException e) {
// 处理 IO 异常
e.printStackTrace();
}
return null;
}
}

View File

@ -1,13 +1,40 @@
package net.srt.service.impl;
import net.srt.entity.DataDatabaseEntity;
import net.srt.vo.TableVo;
import org.apache.commons.net.ftp.FTP;
import org.apache.commons.net.ftp.FTPClient;
import org.apache.commons.net.ftp.FTPFile;
import org.springframework.stereotype.Service;
import srt.cloud.framework.dbswitch.core.model.TableDescription;
import java.io.IOException;
import java.net.SocketException;
import java.util.ArrayList;
import java.util.List;
@Service
public class FtpService {
public List<TableDescription> getFtpFiles(DataDatabaseEntity dataDatabaseEntity) {
return null;
List<TableDescription> tableDescriptions = new ArrayList<>();
FTPClient ftpClient = new FTPClient();
try {
ftpClient.connect(dataDatabaseEntity.getDatabaseIp(),Integer.valueOf(dataDatabaseEntity.getDatabasePort()));
ftpClient.login(dataDatabaseEntity.getUserName(),dataDatabaseEntity.getPassword());
ftpClient.enterLocalPassiveMode();
ftpClient.setFileType(FTP.BINARY_FILE_TYPE);
// 列出FTP服务器上的文件
FTPFile[] ftpFiles = ftpClient.listFiles();
for (FTPFile ftpFile : ftpFiles) {
TableDescription tableDescription = new TableDescription();
tableDescription.setTableName(ftpFile.getName());
tableDescriptions.add(tableDescription);
}
return tableDescriptions;
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}

View File

@ -16,7 +16,7 @@ import java.util.stream.Collectors;
@Service
public class KafkaService {
public List<TableDescription> getKafkaTables(DataDatabaseEntity dataDatabaseEntity) {
String bootstrapServers = dataDatabaseEntity.getDatabaseIp() + "." + dataDatabaseEntity.getDatabasePort();
String bootstrapServers = dataDatabaseEntity.getDatabaseIp() + ":" + dataDatabaseEntity.getDatabasePort();
Properties properties = new Properties();
properties.put("bootstrap.servers",bootstrapServers);
properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");

View File

@ -1,13 +1,45 @@
package net.srt.service.impl;
import com.mongodb.MongoClientSettings;
import com.mongodb.MongoCredential;
import com.mongodb.ServerAddress;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.MongoIterable;
import net.srt.entity.DataDatabaseEntity;
import org.springframework.stereotype.Service;
import srt.cloud.framework.dbswitch.core.model.TableDescription;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@Service
public class MongoService {
public List<TableDescription> getMongoTables(DataDatabaseEntity dataDatabaseEntity) {
List<TableDescription> tableDescriptions = new ArrayList<>();
try {
MongoCredential credential = MongoCredential.createCredential(dataDatabaseEntity.getUserName(),"admin",dataDatabaseEntity.getPassword().toCharArray());
ServerAddress serverAddress = new ServerAddress(dataDatabaseEntity.getDatabaseIp(), Integer.valueOf(dataDatabaseEntity.getDatabasePort()));
MongoClientSettings settings = MongoClientSettings.builder()
.credential(credential)
.applyToClusterSettings(builder ->
builder.hosts(Arrays.asList(serverAddress)))
.build();
MongoClient mongoClient = MongoClients.create(settings);
MongoDatabase database = mongoClient.getDatabase(dataDatabaseEntity.getDatabaseName());
ArrayList<String> into = database.listCollectionNames().into(new ArrayList<>());
for (String s : into) {
TableDescription tableDescription = new TableDescription();
tableDescription.setTableName(s);
tableDescriptions.add(tableDescription);
}
return tableDescriptions;
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
}

View File

@ -2,12 +2,38 @@ package net.srt.service.impl;
import net.srt.entity.DataDatabaseEntity;
import org.springframework.stereotype.Service;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.exceptions.JedisConnectionException;
import srt.cloud.framework.dbswitch.core.model.TableDescription;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
@Service
public class RedisService {
public List<TableDescription> getRedisTables(DataDatabaseEntity dataDatabaseEntity) {
return null;
try (Jedis jedis = new Jedis(dataDatabaseEntity.getDatabaseIp(), Integer.parseInt(dataDatabaseEntity.getDatabasePort()))) {
// 获取所有 key
Set<String> keys = jedis.keys("*");
ArrayList<TableDescription> tableDescriptions = new ArrayList<>();
for (String key : keys) {
TableDescription tableDescription = new TableDescription();
tableDescription.setTableName(key);
tableDescriptions.add(tableDescription);
}
return tableDescriptions;
} catch (JedisConnectionException e) {
// 处理连接异常
throw new RuntimeException("Redis连接异常: " + e.getMessage(), e);
} catch (NumberFormatException e) {
// 处理端口转换异常
throw new RuntimeException("端口转换异常: " + e.getMessage(), e);
} catch (Exception e) {
// 处理其他异常
throw new RuntimeException("发生异常: " + e.getMessage(), e);
}
}
}