diff --git a/srt-cloud-data-integrate/pom.xml b/srt-cloud-data-integrate/pom.xml
index e5b0656..78eb844 100644
--- a/srt-cloud-data-integrate/pom.xml
+++ b/srt-cloud-data-integrate/pom.xml
@@ -10,6 +10,46 @@
	<packaging>jar</packaging>

	<dependencies>
+
+		<dependency>
+			<groupId>org.mongodb</groupId>
+			<artifactId>mongodb-driver-sync</artifactId>
+			<version>4.4.0</version>
+		</dependency>
+
+		<dependency>
+			<groupId>commons-net</groupId>
+			<artifactId>commons-net</artifactId>
+			<version>3.8.0</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.elasticsearch.client</groupId>
+			<artifactId>elasticsearch-rest-client</artifactId>
+		</dependency>
+
+		<dependency>
+			<groupId>org.elasticsearch.client</groupId>
+			<artifactId>elasticsearch-rest-high-level-client</artifactId>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.kafka</groupId>
+			<artifactId>kafka-clients</artifactId>
+			<version>2.8.0</version>
+		</dependency>
+
+		<dependency>
+			<groupId>redis.clients</groupId>
+			<artifactId>jedis</artifactId>
+			<version>2.9.0</version>
+		</dependency>
+
		<dependency>
			<groupId>net.srt</groupId>
			<artifactId>srt-cloud-api</artifactId>
diff --git a/srt-cloud-data-integrate/src/main/java/net/srt/entity/DataDatabaseEntity.java b/srt-cloud-data-integrate/src/main/java/net/srt/entity/DataDatabaseEntity.java
index 543c949..8d975dd 100644
--- a/srt-cloud-data-integrate/src/main/java/net/srt/entity/DataDatabaseEntity.java
+++ b/srt-cloud-data-integrate/src/main/java/net/srt/entity/DataDatabaseEntity.java
@@ -1,5 +1,6 @@
package net.srt.entity;
+import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
import lombok.EqualsAndHashCode;
import com.baomidou.mybatisplus.annotation.*;
@@ -78,5 +79,18 @@ public class DataDatabaseEntity extends BaseEntity {
*/
private Long projectId;
+	@Schema(description = "Whether this is a relational (JDBC) database")
+	private String isJdbc;
+
+	@Schema(description = "ES index")
+	private String databaseIndex;
+
+	@Schema(description = "ES document")
+	private String databaseDocument;
+
+	@Schema(description = "Kafka topic")
+	private String databaseKafKa;
+
}
diff --git a/srt-cloud-data-integrate/src/main/java/net/srt/enums/IsJdbc.java b/srt-cloud-data-integrate/src/main/java/net/srt/enums/IsJdbc.java
new file mode 100644
index 0000000..22718b5
--- /dev/null
+++ b/srt-cloud-data-integrate/src/main/java/net/srt/enums/IsJdbc.java
@@ -0,0 +1,14 @@
+package net.srt.enums;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+
+@Getter
+@AllArgsConstructor
+public enum IsJdbc {
+	// The Chinese display values ("是" = yes, "否" = no) are persisted as-is,
+	// so they are intentionally left untranslated.
+	YES("是"),
+	NO("否");
+
+	private final String value;
+
+}
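Note: the enum persists the display value rather than an ordinal, which is what the new String-typed isJdbc fields in this diff expect. A minimal usage sketch; the helper name is hypothetical and not part of this diff:

    // Hypothetical helper illustrating how the flag could be derived.
    public static String isJdbcFlag(ProductTypeEnum type) {
        // Assumption: the five new non-JDBC types are exactly REDIS..MONGODB.
        switch (type) {
            case REDIS: case KAFKA: case ES: case FTP: case MONGODB:
                return IsJdbc.NO.getValue();
            default:
                return IsJdbc.YES.getValue();
        }
    }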
diff --git a/srt-cloud-data-integrate/src/main/java/net/srt/enums/Judgment.java b/srt-cloud-data-integrate/src/main/java/net/srt/enums/Judgment.java
new file mode 100644
index 0000000..6cc5d24
--- /dev/null
+++ b/srt-cloud-data-integrate/src/main/java/net/srt/enums/Judgment.java
@@ -0,0 +1,13 @@
+package net.srt.enums;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+
+@Getter
+@AllArgsConstructor
+public enum Judgment {
+	// These indices must stay in sync with ProductTypeEnum (REDIS=17 ... MONGODB=21).
+	REDIS(17), KAFKA(18), ES(19), FTP(20), MONGODB(21);
+
+	private final Integer index;
+
+}
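Note: these indices duplicate the ProductTypeEnum indices fixed later in this diff; if either enum changes, the connection-test dispatch silently stops matching. A sketch of a startup guard one could add, assuming both enums expose getIndex():

    // Sketch: fail fast if Judgment and ProductTypeEnum drift apart.
    for (Judgment j : Judgment.values()) {
        ProductTypeEnum p = ProductTypeEnum.valueOf(j.name());
        if (!p.getIndex().equals(j.getIndex())) {
            throw new IllegalStateException("Index mismatch for " + j.name());
        }
    }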
diff --git a/srt-cloud-data-integrate/src/main/java/net/srt/service/impl/DataDatabaseServiceImpl.java b/srt-cloud-data-integrate/src/main/java/net/srt/service/impl/DataDatabaseServiceImpl.java
index fd4efad..73db65b 100644
--- a/srt-cloud-data-integrate/src/main/java/net/srt/service/impl/DataDatabaseServiceImpl.java
+++ b/srt-cloud-data-integrate/src/main/java/net/srt/service/impl/DataDatabaseServiceImpl.java
@@ -4,6 +4,9 @@ import cn.hutool.core.util.StrUtil;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
+import com.mongodb.client.MongoClient;
+import com.mongodb.client.MongoClients;
+import com.mongodb.client.MongoDatabase;
import lombok.AllArgsConstructor;
import lombok.SneakyThrows;
import net.sf.jsqlparser.parser.CCJSqlParserUtil;
@@ -18,6 +21,8 @@ import net.srt.dao.DataDatabaseDao;
import net.srt.dto.SqlConsole;
import net.srt.entity.DataAccessEntity;
import net.srt.entity.DataDatabaseEntity;
+import net.srt.enums.IsJdbc;
+import net.srt.enums.Judgment;
import net.srt.framework.common.cache.bean.DataProjectCacheBean;
import net.srt.framework.common.exception.ServerException;
import net.srt.framework.common.page.PageResult;
@@ -33,8 +38,14 @@ import net.srt.vo.DataDatabaseVO;
import net.srt.vo.SchemaTableDataVo;
import net.srt.vo.SqlGenerationVo;
import net.srt.vo.TableVo;
+import org.apache.commons.net.ftp.FTP;
+import org.apache.commons.net.ftp.FTPClient;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
+import redis.clients.jedis.Jedis;
import srt.cloud.framework.dbswitch.common.type.ProductTypeEnum;
import srt.cloud.framework.dbswitch.common.util.StringUtil;
import srt.cloud.framework.dbswitch.core.model.ColumnDescription;
@@ -43,8 +54,13 @@ import srt.cloud.framework.dbswitch.core.model.TableDescription;
import srt.cloud.framework.dbswitch.core.service.IMetaDataByJdbcService;
import srt.cloud.framework.dbswitch.core.service.impl.MetaDataByJdbcServiceImpl;
+import javax.annotation.Resource;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Properties;
import java.util.stream.Collectors;
/**
@@ -54,11 +70,21 @@ import java.util.stream.Collectors;
* @since 1.0.0 2022-10-09
*/
@Service
-@AllArgsConstructor
 public class DataDatabaseServiceImpl extends BaseServiceImpl<DataDatabaseDao, DataDatabaseEntity> implements DataDatabaseService {
-
- private final DataAccessDao dataAccessDao;
- private final DataAccessService dataAccessService;
+ @Resource
+ private DataAccessDao dataAccessDao;
+ @Resource
+ private DataAccessService dataAccessService;
+ @Resource
+ private EsService esService;
+ @Resource
+ private MongoService mongoService;
+ @Resource
+ private KafkaService kafkaService;
+ @Resource
+ private RedisService redisService;
+ @Resource
+ private FtpService ftpService;
@Override
	public PageResult<DataDatabaseVO> page(DataDatabaseQuery query) {
@@ -83,6 +109,10 @@ public class DataDatabaseServiceImpl extends BaseServiceImpl<DataDatabaseDao, DataDatabaseEntity> implements DataDatabaseService {
+			Producer<String, String> producer = null;
+			try {
+				producer = new org.apache.kafka.clients.producer.KafkaProducer<>(properties);
+
+				// Message payload used for the connectivity check
+				String message = "Default Message";
+
+				// Send the message to the configured topic
+				producer.send(new ProducerRecord<>(vo.getDatabaseKafKa(), message));
+				System.out.println("Message sent successfully");
+			} catch (Exception e) {
+				System.err.println("Connection failed: " + e.getMessage());
+				throw new ServerException("Connection failed");
+			} finally {
+				// Close the producer
+				if (producer != null) {
+					producer.close();
+				}
+			}
+		}
+
+			if (vo.getDatabaseType().equals(Judgment.MONGODB.getIndex())) {
+				// Build the MongoDB connection string from the URL template
+				String url = productTypeEnum.getUrl()
+						.replace("{username}", vo.getUserName())
+						.replace("{password}", vo.getPassword())
+						.replace("{host}", vo.getDatabaseIp())
+						.replace("{port}", vo.getDatabasePort())
+						.replace("{database}", vo.getDatabaseName());
+				try (MongoClient mongoClient = MongoClients.create(url)) {
+					// Obtain the database handle and ping it; getDatabase alone is
+					// lazy and would not fail on an unreachable server
+					MongoDatabase database = mongoClient.getDatabase(vo.getDatabaseName());
+					database.runCommand(new org.bson.Document("ping", 1));
+					System.out.println("Connected to the database successfully");
+				} catch (Exception e) {
+					System.err.println("MongoDB connection error: " + e.getMessage());
+					throw new ServerException("Connection failed");
+				}
+			}
+
+			if (vo.getDatabaseType().equals(Judgment.FTP.getIndex())) {
+				String user = vo.getUserName();
+				String password = vo.getPassword();
+
+				FTPClient ftpClient = new FTPClient();
+
+				try {
+					// Connect to the FTP server
+					ftpClient.connect(vo.getDatabaseIp(), Integer.valueOf(vo.getDatabasePort()));
+					ftpClient.login(user, password);
+					ftpClient.enterLocalPassiveMode();
+
+					// Print the server's welcome message
+					System.out.println("Connected to " + vo.getDatabaseIp() + ".");
+					System.out.println(ftpClient.getReplyString());
+
+					// Set the file transfer type to ASCII (a text file is uploaded below)
+					ftpClient.setFileType(FTP.ASCII_FILE_TYPE);
+
+					// Upload a hard-coded local test file
+					uploadFile(ftpClient, vo.getDatabaseIp(), "D:/desktop/ftp.txt");
+
+					// Download file
+//					downloadFile(ftpClient, "C:/Users/栗永斌/Desktop/新建 文本文档.txt", "C:/Users/栗永斌/Desktop/新建 文本文档.txt");
+
+				} catch (IOException e) {
+					e.printStackTrace();
+				} finally {
+					// Log out and disconnect even if the upload fails
+					try {
+						ftpClient.logout();
+						ftpClient.disconnect();
+					} catch (IOException ignored) {
+					}
+				}
+			}
+
}
if (vo.getId() != null) {
//更新状态
@@ -158,6 +300,39 @@ public class DataDatabaseServiceImpl extends BaseServiceImpl<DataDatabaseDao, DataDatabaseEntity> implements DataDatabaseService {
	public List<TableVo> getTablesById(Long id) {
DataDatabaseEntity dataDatabaseEntity = baseMapper.selectById(id);
@@ -166,14 +341,33 @@ public class DataDatabaseServiceImpl extends BaseServiceImpl<DataDatabaseDao, DataDatabaseEntity> implements DataDatabaseService {
	private List<TableVo> getTables(DataDatabaseEntity dataDatabaseEntity) {
ProductTypeEnum productTypeEnum = ProductTypeEnum.getByIndex(dataDatabaseEntity.getDatabaseType());
- IMetaDataByJdbcService metaDataService = new MetaDataByJdbcServiceImpl(productTypeEnum);
-		List<TableDescription> tableDescriptions = metaDataService.queryTableList(StringUtil.isBlank(dataDatabaseEntity.getJdbcUrl()) ? productTypeEnum.getUrl()
- .replace("{host}", dataDatabaseEntity.getDatabaseIp())
- .replace("{port}", dataDatabaseEntity.getDatabasePort())
- .replace("{database}", dataDatabaseEntity.getDatabaseName()) : dataDatabaseEntity.getJdbcUrl(), dataDatabaseEntity.getUserName(), dataDatabaseEntity.getPassword(),
- ProductTypeEnum.ORACLE.equals(productTypeEnum) ? dataDatabaseEntity.getUserName() : dataDatabaseEntity.getDatabaseName());
- return BeanUtil.copyListProperties(tableDescriptions, TableVo::new);
- }
+		List<TableDescription> tableDescriptions = new ArrayList<>();
+		try {
+			switch (productTypeEnum) {
+				case FTP:
+					tableDescriptions = ftpService.getFtpFiles(dataDatabaseEntity);
+					break;
+				case ES:
+					tableDescriptions = esService.getEsTables(dataDatabaseEntity);
+					break;
+				case MONGODB:
+					tableDescriptions = mongoService.getMongoTables(dataDatabaseEntity);
+					break;
+				case REDIS:
+					tableDescriptions = redisService.getRedisTables(dataDatabaseEntity);
+					break;
+				case KAFKA:
+					tableDescriptions = kafkaService.getKafkaTables(dataDatabaseEntity);
+					break;
+				default:
+					// Fall back to the original JDBC metadata lookup for relational databases
+					IMetaDataByJdbcService metaDataService = new MetaDataByJdbcServiceImpl(productTypeEnum);
+					tableDescriptions = metaDataService.queryTableList(StringUtil.isBlank(dataDatabaseEntity.getJdbcUrl()) ? productTypeEnum.getUrl()
+							.replace("{host}", dataDatabaseEntity.getDatabaseIp())
+							.replace("{port}", dataDatabaseEntity.getDatabasePort())
+							.replace("{database}", dataDatabaseEntity.getDatabaseName()) : dataDatabaseEntity.getJdbcUrl(), dataDatabaseEntity.getUserName(), dataDatabaseEntity.getPassword(),
+							ProductTypeEnum.ORACLE.equals(productTypeEnum) ? dataDatabaseEntity.getUserName() : dataDatabaseEntity.getDatabaseName());
+			}
+		} catch (Exception e) {
+			throw new RuntimeException(e);
+		}
+		return BeanUtil.copyListProperties(tableDescriptions, TableVo::new);
+	}
@SneakyThrows
@Override
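Note: the Kafka connectivity check in this file manages the producer with a manual try/finally. Since KafkaProducer implements AutoCloseable, the same check can be written more compactly. A sketch, assuming the same Properties instance the check builds:

    try (Producer<String, String> producer =
             new org.apache.kafka.clients.producer.KafkaProducer<>(properties)) {
        // Send the probe message; failures surface as the same ServerException
        producer.send(new ProducerRecord<>(vo.getDatabaseKafKa(), "Default Message"));
    } catch (Exception e) {
        throw new ServerException("Connection failed");
    }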
diff --git a/srt-cloud-data-integrate/src/main/java/net/srt/service/impl/EsService.java b/srt-cloud-data-integrate/src/main/java/net/srt/service/impl/EsService.java
new file mode 100644
index 0000000..9d53c44
--- /dev/null
+++ b/srt-cloud-data-integrate/src/main/java/net/srt/service/impl/EsService.java
@@ -0,0 +1,48 @@
+package net.srt.service.impl;
+
+import net.srt.entity.DataDatabaseEntity;
+import net.srt.vo.TableVo;
+import org.apache.http.HttpHost;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.client.indices.GetIndexRequest;
+import org.springframework.stereotype.Service;
+import srt.cloud.framework.dbswitch.core.model.TableDescription;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+@Service
+public class EsService {
+	public List<TableDescription> getEsTables(DataDatabaseEntity dataDatabaseEntity) {
+		// Read the Elasticsearch host and port from the database entity
+		String esServerAddress = dataDatabaseEntity.getDatabaseIp();
+		int esServerPort = Integer.parseInt(dataDatabaseEntity.getDatabasePort());
+
+		// Create a RestHighLevelClient for the high-level REST API;
+		// try-with-resources closes it when the listing is done
+		try (RestHighLevelClient client = new RestHighLevelClient(
+				RestClient.builder(
+						new HttpHost(esServerAddress, esServerPort, "http")))) {
+			// Request all indices via the "*" wildcard
+			GetIndexRequest getIndexRequest = new GetIndexRequest("*");
+			String[] indexes = client.indices().get(getIndexRequest, RequestOptions.DEFAULT).getIndices();
+			// Map each index name onto a TableDescription
+			List<TableDescription> tableDescriptions = new ArrayList<>();
+			for (String index : indexes) {
+				TableDescription tableDescription = new TableDescription();
+				tableDescription.setTableName(index);
+				tableDescriptions.add(tableDescription);
+			}
+			return tableDescriptions;
+		} catch (IOException e) {
+			throw new RuntimeException("Failed to list Elasticsearch indices", e);
+		}
+	}
+}
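Note: a minimal caller sketch for the method above. The host and port are made-up values, and the setters assume the Lombok-generated accessors on DataDatabaseEntity:

    DataDatabaseEntity es = new DataDatabaseEntity();
    es.setDatabaseIp("127.0.0.1");
    es.setDatabasePort("9200");
    for (TableDescription d : new EsService().getEsTables(es)) {
        System.out.println(d.getTableName());  // one line per index
    }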
diff --git a/srt-cloud-data-integrate/src/main/java/net/srt/service/impl/FtpService.java b/srt-cloud-data-integrate/src/main/java/net/srt/service/impl/FtpService.java
new file mode 100644
index 0000000..1269add
--- /dev/null
+++ b/srt-cloud-data-integrate/src/main/java/net/srt/service/impl/FtpService.java
@@ -0,0 +1,40 @@
+package net.srt.service.impl;
+
+import net.srt.entity.DataDatabaseEntity;
+import net.srt.vo.TableVo;
+import org.apache.commons.net.ftp.FTP;
+import org.apache.commons.net.ftp.FTPClient;
+import org.apache.commons.net.ftp.FTPFile;
+import org.springframework.stereotype.Service;
+import srt.cloud.framework.dbswitch.core.model.TableDescription;
+
+import java.io.IOException;
+import java.net.SocketException;
+import java.util.ArrayList;
+import java.util.List;
+@Service
+public class FtpService {
+	public List<TableDescription> getFtpFiles(DataDatabaseEntity dataDatabaseEntity) {
+		List<TableDescription> tableDescriptions = new ArrayList<>();
+		FTPClient ftpClient = new FTPClient();
+
+		try {
+			ftpClient.connect(dataDatabaseEntity.getDatabaseIp(), Integer.valueOf(dataDatabaseEntity.getDatabasePort()));
+			ftpClient.login(dataDatabaseEntity.getUserName(), dataDatabaseEntity.getPassword());
+			ftpClient.enterLocalPassiveMode();
+			ftpClient.setFileType(FTP.BINARY_FILE_TYPE);
+
+			// List the files on the FTP server and expose each one as a "table"
+			FTPFile[] ftpFiles = ftpClient.listFiles();
+			for (FTPFile ftpFile : ftpFiles) {
+				TableDescription tableDescription = new TableDescription();
+				tableDescription.setTableName(ftpFile.getName());
+				tableDescriptions.add(tableDescription);
+			}
+			return tableDescriptions;
+		} catch (IOException e) {
+			throw new RuntimeException(e);
+		} finally {
+			// Always disconnect, even when the listing fails
+			try {
+				if (ftpClient.isConnected()) {
+					ftpClient.disconnect();
+				}
+			} catch (IOException ignored) {
+			}
+		}
+	}
+}
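Note: listFiles() with no argument lists only the login user's home directory. If a different remote directory should be exposed, Commons Net accepts a path argument. A one-line sketch with an illustrative path:

    // List a specific remote directory instead of the login root.
    FTPFile[] files = ftpClient.listFiles("/data/incoming");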
diff --git a/srt-cloud-data-integrate/src/main/java/net/srt/service/impl/KafkaService.java b/srt-cloud-data-integrate/src/main/java/net/srt/service/impl/KafkaService.java
new file mode 100644
index 0000000..2012b92
--- /dev/null
+++ b/srt-cloud-data-integrate/src/main/java/net/srt/service/impl/KafkaService.java
@@ -0,0 +1,41 @@
+package net.srt.service.impl;
+
+import net.srt.entity.DataDatabaseEntity;
+import net.srt.vo.TableVo;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.KafkaAdminClient;
+import org.apache.kafka.clients.admin.ListTopicsResult;
+import org.apache.kafka.common.KafkaFuture;
+import org.springframework.stereotype.Service;
+import srt.cloud.framework.dbswitch.core.model.TableDescription;
+
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.stream.Collectors;
+@Service
+public class KafkaService {
+	public List<TableDescription> getKafkaTables(DataDatabaseEntity dataDatabaseEntity) {
+		String bootstrapServers = dataDatabaseEntity.getDatabaseIp() + ":" + dataDatabaseEntity.getDatabasePort();
+		Properties properties = new Properties();
+		properties.put("bootstrap.servers", bootstrapServers);
+		// List the topic names via the admin client; AdminClient is AutoCloseable,
+		// so try-with-resources releases its network threads
+		try (AdminClient adminClient = KafkaAdminClient.create(properties)) {
+			ListTopicsResult listTopicsResult = adminClient.listTopics();
+			KafkaFuture<Set<String>> names = listTopicsResult.names();
+			Set<String> strings = names.get();
+			return strings.stream().map(s -> {
+				TableDescription tableDescription = new TableDescription();
+				tableDescription.setTableName(s);
+				return tableDescription;
+			}).collect(Collectors.toList());
+		} catch (Exception e) {
+			throw new RuntimeException(e);
+		}
+	}
+
+}
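Note: with default settings, AdminClient retries an unreachable broker for up to a minute before names.get() fails. Bounding the timeouts keeps the table listing responsive. A sketch using org.apache.kafka.clients.admin.AdminClientConfig; the 5-second values are assumptions to tune:

    properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "5000");
    properties.put(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "5000");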
diff --git a/srt-cloud-data-integrate/src/main/java/net/srt/service/impl/MongoService.java b/srt-cloud-data-integrate/src/main/java/net/srt/service/impl/MongoService.java
new file mode 100644
index 0000000..1cff935
--- /dev/null
+++ b/srt-cloud-data-integrate/src/main/java/net/srt/service/impl/MongoService.java
@@ -0,0 +1,45 @@
+package net.srt.service.impl;
+
+import com.mongodb.MongoClientSettings;
+import com.mongodb.MongoCredential;
+import com.mongodb.ServerAddress;
+import com.mongodb.client.MongoClient;
+import com.mongodb.client.MongoClients;
+import com.mongodb.client.MongoDatabase;
+import com.mongodb.client.MongoIterable;
+import net.srt.entity.DataDatabaseEntity;
+import org.springframework.stereotype.Service;
+import srt.cloud.framework.dbswitch.core.model.TableDescription;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+@Service
+public class MongoService {
+	public List<TableDescription> getMongoTables(DataDatabaseEntity dataDatabaseEntity) {
+		List<TableDescription> tableDescriptions = new ArrayList<>();
+		try {
+			// Authenticate against the "admin" database with the stored credentials
+			MongoCredential credential = MongoCredential.createCredential(dataDatabaseEntity.getUserName(), "admin", dataDatabaseEntity.getPassword().toCharArray());
+			ServerAddress serverAddress = new ServerAddress(dataDatabaseEntity.getDatabaseIp(), Integer.valueOf(dataDatabaseEntity.getDatabasePort()));
+			MongoClientSettings settings = MongoClientSettings.builder()
+					.credential(credential)
+					.applyToClusterSettings(builder ->
+							builder.hosts(Arrays.asList(serverAddress)))
+					.build();
+
+			// try-with-resources closes the client after the collection names are read
+			try (MongoClient mongoClient = MongoClients.create(settings)) {
+				MongoDatabase database = mongoClient.getDatabase(dataDatabaseEntity.getDatabaseName());
+				ArrayList<String> into = database.listCollectionNames().into(new ArrayList<>());
+				for (String s : into) {
+					TableDescription tableDescription = new TableDescription();
+					tableDescription.setTableName(s);
+					tableDescriptions.add(tableDescription);
+				}
+			}
+			return tableDescriptions;
+		} catch (Exception e) {
+			throw new RuntimeException("Failed to list MongoDB collections", e);
+		}
+	}
+}
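Note: the credential above is created against the hard-coded "admin" authentication database, so users defined on the target database itself will fail to authenticate. A sketch of making it configurable; the authSource variable is hypothetical:

    // Assumption: authSource could come from configuration instead of "admin".
    String authSource = "admin";
    MongoCredential credential = MongoCredential.createCredential(
            dataDatabaseEntity.getUserName(), authSource,
            dataDatabaseEntity.getPassword().toCharArray());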
diff --git a/srt-cloud-data-integrate/src/main/java/net/srt/service/impl/RedisService.java b/srt-cloud-data-integrate/src/main/java/net/srt/service/impl/RedisService.java
new file mode 100644
index 0000000..4e05cb9
--- /dev/null
+++ b/srt-cloud-data-integrate/src/main/java/net/srt/service/impl/RedisService.java
@@ -0,0 +1,39 @@
+package net.srt.service.impl;
+
+import net.srt.entity.DataDatabaseEntity;
+import org.springframework.stereotype.Service;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.exceptions.JedisConnectionException;
+import srt.cloud.framework.dbswitch.core.model.TableDescription;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+@Service
+public class RedisService {
+	public List<TableDescription> getRedisTables(DataDatabaseEntity dataDatabaseEntity) {
+		try (Jedis jedis = new Jedis(dataDatabaseEntity.getDatabaseIp(), Integer.parseInt(dataDatabaseEntity.getDatabasePort()))) {
+			// Fetch all keys and expose each one as a "table"
+			Set<String> keys = jedis.keys("*");
+			ArrayList<TableDescription> tableDescriptions = new ArrayList<>();
+			for (String key : keys) {
+				TableDescription tableDescription = new TableDescription();
+				tableDescription.setTableName(key);
+				tableDescriptions.add(tableDescription);
+			}
+			return tableDescriptions;
+		} catch (JedisConnectionException e) {
+			// Connection failure
+			throw new RuntimeException("Redis connection error: " + e.getMessage(), e);
+		} catch (NumberFormatException e) {
+			// Port is not a valid number
+			throw new RuntimeException("Invalid port number: " + e.getMessage(), e);
+		} catch (Exception e) {
+			// Anything else
+			throw new RuntimeException("Unexpected error: " + e.getMessage(), e);
+		}
+	}
+}
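Note: KEYS * is O(N) and blocks the Redis server while it walks the whole keyspace, so on a large production instance SCAN is the safer way to enumerate keys. A sketch against the Jedis 2.9 API declared in the pom (imports redis.clients.jedis.ScanParams, redis.clients.jedis.ScanResult, java.util.HashSet):

    Set<String> keys = new HashSet<>();
    ScanParams params = new ScanParams().match("*").count(500);
    String cursor = ScanParams.SCAN_POINTER_START;
    do {
        // Each call returns one page of keys plus the next cursor
        ScanResult<String> page = jedis.scan(cursor, params);
        keys.addAll(page.getResult());
        cursor = page.getStringCursor();
    } while (!ScanParams.SCAN_POINTER_START.equals(cursor));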
diff --git a/srt-cloud-data-integrate/src/main/java/net/srt/vo/DataDatabaseVO.java b/srt-cloud-data-integrate/src/main/java/net/srt/vo/DataDatabaseVO.java
index 0d7dc3f..01b9e43 100644
--- a/srt-cloud-data-integrate/src/main/java/net/srt/vo/DataDatabaseVO.java
+++ b/srt-cloud-data-integrate/src/main/java/net/srt/vo/DataDatabaseVO.java
@@ -79,7 +79,20 @@ public class DataDatabaseVO implements Serializable {
private Date updateTime;
@Schema(description = "是否是关系型数据库")
- private Integer isJdbc;
+ private String isJdbc;
+
+ @Schema(description = "es索引")
+ private String databaseIndex;
+
+ @Schema(description = "es文档")
+ private String databaseDocument;
+
+
+ @Schema(description = "kafka主题")
+ private String databaseKafKa;
+
+
+
}
diff --git a/srt-cloud-data-integrate/src/main/java/net/srt/vo/TableVo.java b/srt-cloud-data-integrate/src/main/java/net/srt/vo/TableVo.java
index 4854331..8e72285 100644
--- a/srt-cloud-data-integrate/src/main/java/net/srt/vo/TableVo.java
+++ b/srt-cloud-data-integrate/src/main/java/net/srt/vo/TableVo.java
@@ -13,4 +13,12 @@ public class TableVo {
private String tableName;
private String remarks;
private DBTableType tableType;
+
+	// Wraps a Kafka topic name in a TableVo so topics can be rendered in the table list
+	public static TableVo createKafkaTopicVo(String topicName) {
+		TableVo kafkaTopicVo = new TableVo();
+		kafkaTopicVo.setTableName(topicName);
+		kafkaTopicVo.setRemarks("Kafka Topic");
+		kafkaTopicVo.setTableType(null);
+		return kafkaTopicVo;
+	}
}
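Note: a usage sketch for the new factory; topicNames stands in for the set of names returned by KafkaService, and java.util.stream.Collectors is assumed imported:

    List<TableVo> topics = topicNames.stream()
            .map(TableVo::createKafkaTopicVo)
            .collect(Collectors.toList());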
diff --git a/srt-cloud-framework/srt-cloud-dbswitch/src/main/java/srt/cloud/framework/dbswitch/common/type/ProductTypeEnum.java b/srt-cloud-framework/srt-cloud-dbswitch/src/main/java/srt/cloud/framework/dbswitch/common/type/ProductTypeEnum.java
index e311415..4c305ba 100644
--- a/srt-cloud-framework/srt-cloud-dbswitch/src/main/java/srt/cloud/framework/dbswitch/common/type/ProductTypeEnum.java
+++ b/srt-cloud-framework/srt-cloud-dbswitch/src/main/java/srt/cloud/framework/dbswitch/common/type/ProductTypeEnum.java
@@ -101,11 +101,12 @@ public enum ProductTypeEnum {
*/
DORIS(16, "com.mysql.jdbc.Driver","/* ping */ SELECT 1", "jdbc:mysql://{host}:{port}/{database}?useUnicode=true&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull&useSSL=false&serverTimezone=GMT%2B8&rewriteBatchedStatements=true"),
- REDIS(16, "com.mysql.jdbc.Driver","/* ping */ SELECT 1", ""),
- KAFKA(16, "com.mysql.jdbc.Driver","/* ping */ SELECT 1", ""),
- ES(16, "com.mysql.jdbc.Driver","/* ping */ SELECT 1", ""),
- FTP(16, "com.mysql.jdbc.Driver","/* ping */ SELECT 1", ""),
- MONGODB(16, "com.mysql.jdbc.Driver","/* ping */ SELECT 1", ""),
+ REDIS(17, "com.mysql.jdbc.Driver","/* ping */ SELECT 1", ""),
+ KAFKA(18, "com.mysql.jdbc.Driver","/* ping */ SELECT 1", ""),
+ ES(19, "com.mysql.jdbc.Driver","/* ping */ SELECT 1", ""),
+ FTP(20, "com.mysql.jdbc.Driver","/* ping */ SELECT 1", "ftp://example.com"),
+ MONGODB(21, "com.mysql.jdbc.Driver","/* ping */ SELECT 1", "mongodb://{username}:{password}@{host}:{port}/{database}"),
+
;
private Integer index;
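Note: before this fix, REDIS through MONGODB all reused index 16, which already belongs to DORIS, so lookups by index collapsed every non-JDBC source onto DORIS; that is the same defect the getDatabaseType() fixes below address. A quick consistency check, assuming a getIndex accessor on both enums:

    assert ProductTypeEnum.getByIndex(21) == ProductTypeEnum.MONGODB;
    assert ProductTypeEnum.MONGODB.getIndex().equals(Judgment.MONGODB.getIndex());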
diff --git a/srt-cloud-framework/srt-cloud-dbswitch/src/main/java/srt/cloud/framework/dbswitch/core/database/impl/DatabaseEsImpl.java b/srt-cloud-framework/srt-cloud-dbswitch/src/main/java/srt/cloud/framework/dbswitch/core/database/impl/DatabaseEsImpl.java
index ce28261..ac4d042 100644
--- a/srt-cloud-framework/srt-cloud-dbswitch/src/main/java/srt/cloud/framework/dbswitch/core/database/impl/DatabaseEsImpl.java
+++ b/srt-cloud-framework/srt-cloud-dbswitch/src/main/java/srt/cloud/framework/dbswitch/core/database/impl/DatabaseEsImpl.java
@@ -40,7 +40,7 @@ public class DatabaseEsImpl extends DatabaseMysqlImpl implements IDatabaseInterface {
@Override
public ProductTypeEnum getDatabaseType() {
- return ProductTypeEnum.DORIS;
+ return ProductTypeEnum.ES;
}
@Override
diff --git a/srt-cloud-framework/srt-cloud-dbswitch/src/main/java/srt/cloud/framework/dbswitch/core/database/impl/DatabaseFMongodbmpl.java b/srt-cloud-framework/srt-cloud-dbswitch/src/main/java/srt/cloud/framework/dbswitch/core/database/impl/DatabaseFMongodbmpl.java
index 616e59c..6450b28 100644
--- a/srt-cloud-framework/srt-cloud-dbswitch/src/main/java/srt/cloud/framework/dbswitch/core/database/impl/DatabaseFMongodbmpl.java
+++ b/srt-cloud-framework/srt-cloud-dbswitch/src/main/java/srt/cloud/framework/dbswitch/core/database/impl/DatabaseFMongodbmpl.java
@@ -40,7 +40,7 @@ public class DatabaseFMongodbmpl extends DatabaseMysqlImpl implements IDatabaseInterface {
@Override
public ProductTypeEnum getDatabaseType() {
- return ProductTypeEnum.DORIS;
+ return ProductTypeEnum.MONGODB;
}
@Override
diff --git a/srt-cloud-framework/srt-cloud-dbswitch/src/main/java/srt/cloud/framework/dbswitch/core/database/impl/DatabaseFTPImpl.java b/srt-cloud-framework/srt-cloud-dbswitch/src/main/java/srt/cloud/framework/dbswitch/core/database/impl/DatabaseFTPImpl.java
index 7559a6c..ceefe09 100644
--- a/srt-cloud-framework/srt-cloud-dbswitch/src/main/java/srt/cloud/framework/dbswitch/core/database/impl/DatabaseFTPImpl.java
+++ b/srt-cloud-framework/srt-cloud-dbswitch/src/main/java/srt/cloud/framework/dbswitch/core/database/impl/DatabaseFTPImpl.java
@@ -40,7 +40,7 @@ public class DatabaseFTPImpl extends DatabaseMysqlImpl implements IDatabaseInterface {
@Override
public ProductTypeEnum getDatabaseType() {
- return ProductTypeEnum.DORIS;
+ return ProductTypeEnum.FTP;
}
@Override
diff --git a/srt-cloud-framework/srt-cloud-dbswitch/src/main/java/srt/cloud/framework/dbswitch/core/database/impl/DatabaseKafkaImpl.java b/srt-cloud-framework/srt-cloud-dbswitch/src/main/java/srt/cloud/framework/dbswitch/core/database/impl/DatabaseKafkaImpl.java
index b513d1b..9f15296 100644
--- a/srt-cloud-framework/srt-cloud-dbswitch/src/main/java/srt/cloud/framework/dbswitch/core/database/impl/DatabaseKafkaImpl.java
+++ b/srt-cloud-framework/srt-cloud-dbswitch/src/main/java/srt/cloud/framework/dbswitch/core/database/impl/DatabaseKafkaImpl.java
@@ -40,7 +40,7 @@ public class DatabaseKafkaImpl extends DatabaseMysqlImpl implements IDatabaseInterface {
@Override
public ProductTypeEnum getDatabaseType() {
- return ProductTypeEnum.DORIS;
+ return ProductTypeEnum.KAFKA;
}
@Override
diff --git a/srt-cloud-framework/srt-cloud-dbswitch/src/main/java/srt/cloud/framework/dbswitch/core/database/impl/DatabaseRedisImpl.java b/srt-cloud-framework/srt-cloud-dbswitch/src/main/java/srt/cloud/framework/dbswitch/core/database/impl/DatabaseRedisImpl.java
index 8d65bd8..43a736d 100644
--- a/srt-cloud-framework/srt-cloud-dbswitch/src/main/java/srt/cloud/framework/dbswitch/core/database/impl/DatabaseRedisImpl.java
+++ b/srt-cloud-framework/srt-cloud-dbswitch/src/main/java/srt/cloud/framework/dbswitch/core/database/impl/DatabaseRedisImpl.java
@@ -50,45 +50,44 @@ public class DatabaseRedisImpl extends DatabaseMysqlImpl implements IDatabaseInterface {
@Override
public ProductTypeEnum getDatabaseType() {
- return ProductTypeEnum.DORIS;
+ return ProductTypeEnum.REDIS;
}
@Override
	public List<ColumnDescription> queryTableColumnMeta(Connection connection, String schemaName,
String tableName) {
-// String sql = this.getTableFieldsQuerySQL(schemaName, tableName);
-// List columnDescriptions = this.querySelectSqlColumnMeta(connection, sql);
-// // 补充一下字段信息,获取的不准
-// String extraSql = "SELECT column_name,data_type,column_size,decimal_digits,column_comment FROM information_schema.COLUMNS WHERE table_schema='" + schemaName + "' AND table_name='" + tableName + "'";
-// try (PreparedStatement ps = connection.prepareStatement(extraSql);
-// ResultSet rs = ps.executeQuery();
-// ) {
-// while (rs.next()) {
-// String columnName = rs.getString("column_name");
-// String dataType = rs.getString("data_type");
-// String columnSize = rs.getString("column_size");
-// String decimalDigits = rs.getString("decimal_digits");
-// String columnComment = rs.getString("column_comment");
-// if (columnName != null) {
-// for (ColumnDescription cd : columnDescriptions) {
-// if (columnName.equals(cd.getFieldName())) {
-// cd.setFieldTypeName(dataType);
-// int csize = columnSize != null ? Integer.parseInt(columnSize) : 0;
-// cd.setDisplaySize(csize);
-// cd.setPrecisionSize(csize);
-// cd.setScaleSize(decimalDigits != null ? Integer.parseInt(decimalDigits) : 0);
-// cd.setRemarks(columnComment);
-// break;
-// }
-// }
-// }
-// }
-// } catch (SQLException e) {
-// throw new RuntimeException(schemaName + "." + tableName + " queryTableColumnMeta error!!", e);
-// }
-//
-// return columnDescriptions;
- return null;
+ String sql = this.getTableFieldsQuerySQL(schemaName, tableName);
+		List<ColumnDescription> columnDescriptions = this.querySelectSqlColumnMeta(connection, sql);
+		// Supplement the column metadata; the values reported by the driver are not accurate
+ String extraSql = "SELECT column_name,data_type,column_size,decimal_digits,column_comment FROM information_schema.COLUMNS WHERE table_schema='" + schemaName + "' AND table_name='" + tableName + "'";
+ try (PreparedStatement ps = connection.prepareStatement(extraSql);
+ ResultSet rs = ps.executeQuery();
+ ) {
+ while (rs.next()) {
+ String columnName = rs.getString("column_name");
+ String dataType = rs.getString("data_type");
+ String columnSize = rs.getString("column_size");
+ String decimalDigits = rs.getString("decimal_digits");
+ String columnComment = rs.getString("column_comment");
+ if (columnName != null) {
+ for (ColumnDescription cd : columnDescriptions) {
+ if (columnName.equals(cd.getFieldName())) {
+ cd.setFieldTypeName(dataType);
+ int csize = columnSize != null ? Integer.parseInt(columnSize) : 0;
+ cd.setDisplaySize(csize);
+						cd.setPrecisionSize(csize);
+ cd.setScaleSize(decimalDigits != null ? Integer.parseInt(decimalDigits) : 0);
+ cd.setRemarks(columnComment);
+ break;
+ }
+ }
+ }
+ }
+ } catch (SQLException e) {
+ throw new RuntimeException(schemaName + "." + tableName + " queryTableColumnMeta error!!", e);
+ }
+
+ return columnDescriptions;
}
@Override
diff --git a/srt-cloud-framework/srt-cloud-flink/build/app/flink-app-1.14-2.0.0-jar-with-dependencies.jar b/srt-cloud-framework/srt-cloud-flink/build/app/flink-app-1.14-2.0.0-jar-with-dependencies.jar
index d963086..e64813c 100644
Binary files a/srt-cloud-framework/srt-cloud-flink/build/app/flink-app-1.14-2.0.0-jar-with-dependencies.jar and b/srt-cloud-framework/srt-cloud-flink/build/app/flink-app-1.14-2.0.0-jar-with-dependencies.jar differ
diff --git a/srt-cloud-framework/srt-cloud-flink/build/extends/flink-catalog-mysql-1.14-2.0.0.jar b/srt-cloud-framework/srt-cloud-flink/build/extends/flink-catalog-mysql-1.14-2.0.0.jar
index 9df30ff..70ec31f 100644
Binary files a/srt-cloud-framework/srt-cloud-flink/build/extends/flink-catalog-mysql-1.14-2.0.0.jar and b/srt-cloud-framework/srt-cloud-flink/build/extends/flink-catalog-mysql-1.14-2.0.0.jar differ
diff --git a/srt-cloud-framework/srt-cloud-flink/build/extends/flink-client-1.14-2.0.0.jar b/srt-cloud-framework/srt-cloud-flink/build/extends/flink-client-1.14-2.0.0.jar
index f968ee4..a454886 100644
Binary files a/srt-cloud-framework/srt-cloud-flink/build/extends/flink-client-1.14-2.0.0.jar and b/srt-cloud-framework/srt-cloud-flink/build/extends/flink-client-1.14-2.0.0.jar differ