From 47d5b51200126cc6d56397a2ee97860b498e857a Mon Sep 17 00:00:00 2001 From: chenbingxuan <3581044601@qq.com> Date: Tue, 12 Dec 2023 20:44:44 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=96=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../net/srt/entity/DataDatabaseEntity.java | 14 +++++ .../service/impl/DataDatabaseServiceImpl.java | 60 +++++++++++++++---- .../java/net/srt/service/impl/EsService.java | 14 +++++ .../java/net/srt/service/impl/FtpService.java | 13 ++++ .../net/srt/service/impl/KafkaService.java | 41 +++++++++++++ .../net/srt/service/impl/MongoService.java | 13 ++++ .../net/srt/service/impl/RedisService.java | 13 ++++ .../src/main/java/net/srt/vo/TableVo.java | 8 +++ 8 files changed, 163 insertions(+), 13 deletions(-) create mode 100644 srt-cloud-data-integrate/src/main/java/net/srt/service/impl/EsService.java create mode 100644 srt-cloud-data-integrate/src/main/java/net/srt/service/impl/FtpService.java create mode 100644 srt-cloud-data-integrate/src/main/java/net/srt/service/impl/KafkaService.java create mode 100644 srt-cloud-data-integrate/src/main/java/net/srt/service/impl/MongoService.java create mode 100644 srt-cloud-data-integrate/src/main/java/net/srt/service/impl/RedisService.java diff --git a/srt-cloud-data-integrate/src/main/java/net/srt/entity/DataDatabaseEntity.java b/srt-cloud-data-integrate/src/main/java/net/srt/entity/DataDatabaseEntity.java index 543c949..8d975dd 100644 --- a/srt-cloud-data-integrate/src/main/java/net/srt/entity/DataDatabaseEntity.java +++ b/srt-cloud-data-integrate/src/main/java/net/srt/entity/DataDatabaseEntity.java @@ -1,5 +1,6 @@ package net.srt.entity; +import io.swagger.v3.oas.annotations.media.Schema; import lombok.Data; import lombok.EqualsAndHashCode; import com.baomidou.mybatisplus.annotation.*; @@ -78,5 +79,18 @@ public class DataDatabaseEntity extends BaseEntity { */ private Long projectId; + @Schema(description = "是否是关系型数据库") + private String isJdbc; + + @Schema(description = "es索引") + private String databaseIndex; + + @Schema(description = "es文档") + private String databaseDocument; + + + @Schema(description = "kafka主题") + private String databaseKafKa; + } diff --git a/srt-cloud-data-integrate/src/main/java/net/srt/service/impl/DataDatabaseServiceImpl.java b/srt-cloud-data-integrate/src/main/java/net/srt/service/impl/DataDatabaseServiceImpl.java index 9a4456d..73db65b 100644 --- a/srt-cloud-data-integrate/src/main/java/net/srt/service/impl/DataDatabaseServiceImpl.java +++ b/srt-cloud-data-integrate/src/main/java/net/srt/service/impl/DataDatabaseServiceImpl.java @@ -54,6 +54,7 @@ import srt.cloud.framework.dbswitch.core.model.TableDescription; import srt.cloud.framework.dbswitch.core.service.IMetaDataByJdbcService; import srt.cloud.framework.dbswitch.core.service.impl.MetaDataByJdbcServiceImpl; +import javax.annotation.Resource; import java.io.File; import java.io.FileInputStream; import java.io.IOException; @@ -69,11 +70,21 @@ import java.util.stream.Collectors; * @since 1.0.0 2022-10-09 */ @Service -@AllArgsConstructor public class DataDatabaseServiceImpl extends BaseServiceImpl implements DataDatabaseService { - - private final DataAccessDao dataAccessDao; - private final DataAccessService dataAccessService; + @Resource + private DataAccessDao dataAccessDao; + @Resource + private DataAccessService dataAccessService; + @Resource + private EsService esService; + @Resource + private MongoService mongoService; + @Resource + private KafkaService kafkaService; + @Resource + private RedisService redisService; + @Resource + private FtpService ftpService; @Override public PageResult page(DataDatabaseQuery query) { @@ -98,6 +109,10 @@ public class DataDatabaseServiceImpl extends BaseServiceImpl getTables(DataDatabaseEntity dataDatabaseEntity) { ProductTypeEnum productTypeEnum = ProductTypeEnum.getByIndex(dataDatabaseEntity.getDatabaseType()); - IMetaDataByJdbcService metaDataService = new MetaDataByJdbcServiceImpl(productTypeEnum); - List tableDescriptions = metaDataService.queryTableList(StringUtil.isBlank(dataDatabaseEntity.getJdbcUrl()) ? productTypeEnum.getUrl() - .replace("{host}", dataDatabaseEntity.getDatabaseIp()) - .replace("{port}", dataDatabaseEntity.getDatabasePort()) - .replace("{database}", dataDatabaseEntity.getDatabaseName()) : dataDatabaseEntity.getJdbcUrl(), dataDatabaseEntity.getUserName(), dataDatabaseEntity.getPassword(), - ProductTypeEnum.ORACLE.equals(productTypeEnum) ? dataDatabaseEntity.getUserName() : dataDatabaseEntity.getDatabaseName()); - return BeanUtil.copyListProperties(tableDescriptions, TableVo::new); - } + List tableDescriptions = new ArrayList<>(); + try { + switch (productTypeEnum) { + case FTP: + tableDescriptions = ftpService.getFtpFiles(dataDatabaseEntity); + return BeanUtil.copyListProperties(tableDescriptions, TableVo::new); + case ES: + tableDescriptions = esService.getEsTables(dataDatabaseEntity); + return BeanUtil.copyListProperties(tableDescriptions, TableVo::new); + case MONGODB: + tableDescriptions = mongoService.getMongoTables(dataDatabaseEntity); + return BeanUtil.copyListProperties(tableDescriptions, TableVo::new); + case REDIS: + tableDescriptions = redisService.getRedisTables(dataDatabaseEntity); + return BeanUtil.copyListProperties(tableDescriptions, TableVo::new); + case KAFKA: + tableDescriptions = kafkaService.getKafkaTables(dataDatabaseEntity); + return BeanUtil.copyListProperties(tableDescriptions, TableVo::new); +// default: +// IMetaDataByJdbcService metaDataService = new MetaDataByJdbcServiceImpl(productTypeEnum); +// tableDescriptions = metaDataService.queryTableList(StringUtil.isBlank(dataDatabaseEntity.getJdbcUrl()) ? productTypeEnum.getUrl().replace("{host}", dataDatabaseEntity.getDatabaseIp()).replace("{port}", dataDatabaseEntity.getDatabasePort()).replace("{database}", dataDatabaseEntity.getDatabaseName()) : dataDatabaseEntity.getJdbcUrl(), dataDatabaseEntity.getUserName(), dataDatabaseEntity.getPassword(), ProductTypeEnum.ORACLE.equals(productTypeEnum) ? dataDatabaseEntity.getUserName() : dataDatabaseEntity.getDatabaseName()); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + return null; + } @SneakyThrows @Override diff --git a/srt-cloud-data-integrate/src/main/java/net/srt/service/impl/EsService.java b/srt-cloud-data-integrate/src/main/java/net/srt/service/impl/EsService.java new file mode 100644 index 0000000..164ac72 --- /dev/null +++ b/srt-cloud-data-integrate/src/main/java/net/srt/service/impl/EsService.java @@ -0,0 +1,14 @@ +package net.srt.service.impl; + +import net.srt.entity.DataDatabaseEntity; +import org.springframework.stereotype.Component; +import org.springframework.stereotype.Service; +import srt.cloud.framework.dbswitch.core.model.TableDescription; + +import java.util.List; +@Service +public class EsService { + public List getEsTables(DataDatabaseEntity dataDatabaseEntity) { + return null; + } +} diff --git a/srt-cloud-data-integrate/src/main/java/net/srt/service/impl/FtpService.java b/srt-cloud-data-integrate/src/main/java/net/srt/service/impl/FtpService.java new file mode 100644 index 0000000..4320727 --- /dev/null +++ b/srt-cloud-data-integrate/src/main/java/net/srt/service/impl/FtpService.java @@ -0,0 +1,13 @@ +package net.srt.service.impl; + +import net.srt.entity.DataDatabaseEntity; +import org.springframework.stereotype.Service; +import srt.cloud.framework.dbswitch.core.model.TableDescription; + +import java.util.List; +@Service +public class FtpService { + public List getFtpFiles(DataDatabaseEntity dataDatabaseEntity) { + return null; + } +} diff --git a/srt-cloud-data-integrate/src/main/java/net/srt/service/impl/KafkaService.java b/srt-cloud-data-integrate/src/main/java/net/srt/service/impl/KafkaService.java new file mode 100644 index 0000000..3bceb7d --- /dev/null +++ b/srt-cloud-data-integrate/src/main/java/net/srt/service/impl/KafkaService.java @@ -0,0 +1,41 @@ +package net.srt.service.impl; + +import net.srt.entity.DataDatabaseEntity; +import net.srt.vo.TableVo; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.KafkaAdminClient; +import org.apache.kafka.clients.admin.ListTopicsResult; +import org.apache.kafka.common.KafkaFuture; +import org.springframework.stereotype.Service; +import srt.cloud.framework.dbswitch.core.model.TableDescription; + +import java.util.List; +import java.util.Properties; +import java.util.Set; +import java.util.stream.Collectors; +@Service +public class KafkaService { + public List getKafkaTables(DataDatabaseEntity dataDatabaseEntity) { + String bootstrapServers = dataDatabaseEntity.getDatabaseIp() + "." + dataDatabaseEntity.getDatabasePort(); + Properties properties = new Properties(); + properties.put("bootstrap.servers",bootstrapServers); + properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer"); + properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer"); + AdminClient adminClient=null; + try { + adminClient = KafkaAdminClient.create(properties); + ListTopicsResult listTopicsResult = adminClient.listTopics(); + KafkaFuture> names = listTopicsResult.names(); + Set strings = names.get(); + List tableDescriptions=strings.stream().map(s -> { + TableDescription tableDescription = new TableDescription(); + tableDescription.setTableName(s); + return tableDescription; + }).collect(Collectors.toList()); + return tableDescriptions; + } catch (Exception e) { + throw new RuntimeException(e); + } + } + +} diff --git a/srt-cloud-data-integrate/src/main/java/net/srt/service/impl/MongoService.java b/srt-cloud-data-integrate/src/main/java/net/srt/service/impl/MongoService.java new file mode 100644 index 0000000..da8466b --- /dev/null +++ b/srt-cloud-data-integrate/src/main/java/net/srt/service/impl/MongoService.java @@ -0,0 +1,13 @@ +package net.srt.service.impl; + +import net.srt.entity.DataDatabaseEntity; +import org.springframework.stereotype.Service; +import srt.cloud.framework.dbswitch.core.model.TableDescription; + +import java.util.List; +@Service +public class MongoService { + public List getMongoTables(DataDatabaseEntity dataDatabaseEntity) { + return null; + } +} diff --git a/srt-cloud-data-integrate/src/main/java/net/srt/service/impl/RedisService.java b/srt-cloud-data-integrate/src/main/java/net/srt/service/impl/RedisService.java new file mode 100644 index 0000000..5649c61 --- /dev/null +++ b/srt-cloud-data-integrate/src/main/java/net/srt/service/impl/RedisService.java @@ -0,0 +1,13 @@ +package net.srt.service.impl; + +import net.srt.entity.DataDatabaseEntity; +import org.springframework.stereotype.Service; +import srt.cloud.framework.dbswitch.core.model.TableDescription; + +import java.util.List; +@Service +public class RedisService { + public List getRedisTables(DataDatabaseEntity dataDatabaseEntity) { + return null; + } +} diff --git a/srt-cloud-data-integrate/src/main/java/net/srt/vo/TableVo.java b/srt-cloud-data-integrate/src/main/java/net/srt/vo/TableVo.java index 4854331..8e72285 100644 --- a/srt-cloud-data-integrate/src/main/java/net/srt/vo/TableVo.java +++ b/srt-cloud-data-integrate/src/main/java/net/srt/vo/TableVo.java @@ -13,4 +13,12 @@ public class TableVo { private String tableName; private String remarks; private DBTableType tableType; + + public static TableVo createKafkaTopicVo(String topicName) { + TableVo kafkaTopicVo = new TableVo(); + kafkaTopicVo.setTableName(topicName); + kafkaTopicVo.setRemarks("Kafka Topic"); + kafkaTopicVo.setTableType(null); + return kafkaTopicVo; + } }