diff --git a/srt-cloud-data-integrate/pom.xml b/srt-cloud-data-integrate/pom.xml index c39e258..78eb844 100644 --- a/srt-cloud-data-integrate/pom.xml +++ b/srt-cloud-data-integrate/pom.xml @@ -11,6 +11,7 @@ + org.mongodb mongodb-driver-sync diff --git a/srt-cloud-data-integrate/src/main/java/net/srt/service/impl/EsService.java b/srt-cloud-data-integrate/src/main/java/net/srt/service/impl/EsService.java index 164ac72..9d53c44 100644 --- a/srt-cloud-data-integrate/src/main/java/net/srt/service/impl/EsService.java +++ b/srt-cloud-data-integrate/src/main/java/net/srt/service/impl/EsService.java @@ -1,14 +1,48 @@ package net.srt.service.impl; import net.srt.entity.DataDatabaseEntity; -import org.springframework.stereotype.Component; +import net.srt.vo.TableVo; +import org.apache.http.HttpHost; +import org.elasticsearch.client.RequestOptions; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.client.indices.GetIndexRequest; import org.springframework.stereotype.Service; import srt.cloud.framework.dbswitch.core.model.TableDescription; +import java.io.IOException; +import java.util.ArrayList; import java.util.List; @Service public class EsService { public List getEsTables(DataDatabaseEntity dataDatabaseEntity) { + // 从数据实体中获取 Elasticsearch 服务器的地址和端口 + String esServerAddress = dataDatabaseEntity.getDatabaseIp(); + int esServerPort = Integer.parseInt(dataDatabaseEntity.getDatabasePort()); + + // 创建 RestHighLevelClient 实例,用于与 Elasticsearch 进行高级 REST 客户端通信 + RestHighLevelClient client = new RestHighLevelClient( + RestClient.builder( + new HttpHost(esServerAddress, esServerPort, "http"))); + try { + // 创建获取索引请求,使用通配符 "*" 表示获取所有索引 + GetIndexRequest getIndexRequest = new GetIndexRequest("*"); + // 发送获取索引请求,获取所有索引的名称 + String[] indexes = client.indices().get(getIndexRequest, RequestOptions.DEFAULT).getIndices(); + // 创建一个列表用于存储索引信息 + List tableDescriptions = new ArrayList<>(); + // 遍历所有索引名称,创建 TableDescription 对象并添加到列表中 + for (String index : indexes) { + TableDescription tableDescription = new TableDescription(); + tableDescription.setTableName(index); + tableDescriptions.add(tableDescription); + } + // 返回 Elasticsearch 索引信息列表 + return tableDescriptions; + } catch (IOException e) { + // 处理 IO 异常 + e.printStackTrace(); + } return null; } } diff --git a/srt-cloud-data-integrate/src/main/java/net/srt/service/impl/FtpService.java b/srt-cloud-data-integrate/src/main/java/net/srt/service/impl/FtpService.java index 4320727..1269add 100644 --- a/srt-cloud-data-integrate/src/main/java/net/srt/service/impl/FtpService.java +++ b/srt-cloud-data-integrate/src/main/java/net/srt/service/impl/FtpService.java @@ -1,13 +1,40 @@ package net.srt.service.impl; import net.srt.entity.DataDatabaseEntity; +import net.srt.vo.TableVo; +import org.apache.commons.net.ftp.FTP; +import org.apache.commons.net.ftp.FTPClient; +import org.apache.commons.net.ftp.FTPFile; import org.springframework.stereotype.Service; import srt.cloud.framework.dbswitch.core.model.TableDescription; +import java.io.IOException; +import java.net.SocketException; +import java.util.ArrayList; import java.util.List; @Service public class FtpService { public List getFtpFiles(DataDatabaseEntity dataDatabaseEntity) { - return null; + List tableDescriptions = new ArrayList<>(); + FTPClient ftpClient = new FTPClient(); + + try { + ftpClient.connect(dataDatabaseEntity.getDatabaseIp(),Integer.valueOf(dataDatabaseEntity.getDatabasePort())); + ftpClient.login(dataDatabaseEntity.getUserName(),dataDatabaseEntity.getPassword()); + ftpClient.enterLocalPassiveMode(); + ftpClient.setFileType(FTP.BINARY_FILE_TYPE); + + // 列出FTP服务器上的文件 + FTPFile[] ftpFiles = ftpClient.listFiles(); + for (FTPFile ftpFile : ftpFiles) { + TableDescription tableDescription = new TableDescription(); + tableDescription.setTableName(ftpFile.getName()); + tableDescriptions.add(tableDescription); + } + return tableDescriptions; + } catch (IOException e) { + throw new RuntimeException(e); + } + } } diff --git a/srt-cloud-data-integrate/src/main/java/net/srt/service/impl/KafkaService.java b/srt-cloud-data-integrate/src/main/java/net/srt/service/impl/KafkaService.java index 3bceb7d..2012b92 100644 --- a/srt-cloud-data-integrate/src/main/java/net/srt/service/impl/KafkaService.java +++ b/srt-cloud-data-integrate/src/main/java/net/srt/service/impl/KafkaService.java @@ -16,7 +16,7 @@ import java.util.stream.Collectors; @Service public class KafkaService { public List getKafkaTables(DataDatabaseEntity dataDatabaseEntity) { - String bootstrapServers = dataDatabaseEntity.getDatabaseIp() + "." + dataDatabaseEntity.getDatabasePort(); + String bootstrapServers = dataDatabaseEntity.getDatabaseIp() + ":" + dataDatabaseEntity.getDatabasePort(); Properties properties = new Properties(); properties.put("bootstrap.servers",bootstrapServers); properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer"); diff --git a/srt-cloud-data-integrate/src/main/java/net/srt/service/impl/MongoService.java b/srt-cloud-data-integrate/src/main/java/net/srt/service/impl/MongoService.java index da8466b..1cff935 100644 --- a/srt-cloud-data-integrate/src/main/java/net/srt/service/impl/MongoService.java +++ b/srt-cloud-data-integrate/src/main/java/net/srt/service/impl/MongoService.java @@ -1,13 +1,45 @@ package net.srt.service.impl; +import com.mongodb.MongoClientSettings; +import com.mongodb.MongoCredential; +import com.mongodb.ServerAddress; +import com.mongodb.client.MongoClient; +import com.mongodb.client.MongoClients; +import com.mongodb.client.MongoDatabase; +import com.mongodb.client.MongoIterable; import net.srt.entity.DataDatabaseEntity; import org.springframework.stereotype.Service; import srt.cloud.framework.dbswitch.core.model.TableDescription; +import java.util.ArrayList; +import java.util.Arrays; import java.util.List; @Service public class MongoService { public List getMongoTables(DataDatabaseEntity dataDatabaseEntity) { + List tableDescriptions = new ArrayList<>(); + try { + MongoCredential credential = MongoCredential.createCredential(dataDatabaseEntity.getUserName(),"admin",dataDatabaseEntity.getPassword().toCharArray()); + ServerAddress serverAddress = new ServerAddress(dataDatabaseEntity.getDatabaseIp(), Integer.valueOf(dataDatabaseEntity.getDatabasePort())); + MongoClientSettings settings = MongoClientSettings.builder() + .credential(credential) + .applyToClusterSettings(builder -> + builder.hosts(Arrays.asList(serverAddress))) + .build(); + + MongoClient mongoClient = MongoClients.create(settings); + MongoDatabase database = mongoClient.getDatabase(dataDatabaseEntity.getDatabaseName()); + ArrayList into = database.listCollectionNames().into(new ArrayList<>()); + for (String s : into) { + TableDescription tableDescription = new TableDescription(); + tableDescription.setTableName(s); + tableDescriptions.add(tableDescription); + } + return tableDescriptions; + + } catch (Exception e) { + e.printStackTrace(); + } return null; } } diff --git a/srt-cloud-data-integrate/src/main/java/net/srt/service/impl/RedisService.java b/srt-cloud-data-integrate/src/main/java/net/srt/service/impl/RedisService.java index 5649c61..4e05cb9 100644 --- a/srt-cloud-data-integrate/src/main/java/net/srt/service/impl/RedisService.java +++ b/srt-cloud-data-integrate/src/main/java/net/srt/service/impl/RedisService.java @@ -2,12 +2,38 @@ package net.srt.service.impl; import net.srt.entity.DataDatabaseEntity; import org.springframework.stereotype.Service; +import redis.clients.jedis.Jedis; +import redis.clients.jedis.exceptions.JedisConnectionException; import srt.cloud.framework.dbswitch.core.model.TableDescription; +import java.util.ArrayList; import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + @Service public class RedisService { public List getRedisTables(DataDatabaseEntity dataDatabaseEntity) { - return null; + try (Jedis jedis = new Jedis(dataDatabaseEntity.getDatabaseIp(), Integer.parseInt(dataDatabaseEntity.getDatabasePort()))) { + // 获取所有 key + Set keys = jedis.keys("*"); + ArrayList tableDescriptions = new ArrayList<>(); + for (String key : keys) { + TableDescription tableDescription = new TableDescription(); + tableDescription.setTableName(key); + tableDescriptions.add(tableDescription); + } + return tableDescriptions; + } catch (JedisConnectionException e) { + // 处理连接异常 + throw new RuntimeException("Redis连接异常: " + e.getMessage(), e); + } catch (NumberFormatException e) { + // 处理端口转换异常 + throw new RuntimeException("端口转换异常: " + e.getMessage(), e); + } catch (Exception e) { + // 处理其他异常 + throw new RuntimeException("发生异常: " + e.getMessage(), e); + } + } }