diff --git a/srt-cloud-data-integrate/pom.xml b/srt-cloud-data-integrate/pom.xml index e5b0656..78eb844 100644 --- a/srt-cloud-data-integrate/pom.xml +++ b/srt-cloud-data-integrate/pom.xml @@ -10,6 +10,46 @@ jar + + + + org.mongodb + mongodb-driver-sync + 4.4.0 + + + + commons-net + commons-net + 3.8.0 + + + + + org.elasticsearch.client + elasticsearch-rest-client + + + + + org.elasticsearch.client + elasticsearch-rest-high-level-client + + + + + + org.apache.kafka + kafka-clients + 2.8.0 + + + + redis.clients + jedis + 2.9.0 + + net.srt srt-cloud-api diff --git a/srt-cloud-data-integrate/src/main/java/net/srt/entity/DataDatabaseEntity.java b/srt-cloud-data-integrate/src/main/java/net/srt/entity/DataDatabaseEntity.java index 543c949..8d975dd 100644 --- a/srt-cloud-data-integrate/src/main/java/net/srt/entity/DataDatabaseEntity.java +++ b/srt-cloud-data-integrate/src/main/java/net/srt/entity/DataDatabaseEntity.java @@ -1,5 +1,6 @@ package net.srt.entity; +import io.swagger.v3.oas.annotations.media.Schema; import lombok.Data; import lombok.EqualsAndHashCode; import com.baomidou.mybatisplus.annotation.*; @@ -78,5 +79,18 @@ public class DataDatabaseEntity extends BaseEntity { */ private Long projectId; + @Schema(description = "是否是关系型数据库") + private String isJdbc; + + @Schema(description = "es索引") + private String databaseIndex; + + @Schema(description = "es文档") + private String databaseDocument; + + + @Schema(description = "kafka主题") + private String databaseKafKa; + } diff --git a/srt-cloud-data-integrate/src/main/java/net/srt/enums/IsJdbc.java b/srt-cloud-data-integrate/src/main/java/net/srt/enums/IsJdbc.java new file mode 100644 index 0000000..22718b5 --- /dev/null +++ b/srt-cloud-data-integrate/src/main/java/net/srt/enums/IsJdbc.java @@ -0,0 +1,14 @@ +package net.srt.enums; + +import lombok.AllArgsConstructor; +import lombok.Getter; + +@Getter +@AllArgsConstructor +public enum IsJdbc { + YES("是"), + NO("否"); + + private String value; + +} diff --git a/srt-cloud-data-integrate/src/main/java/net/srt/enums/Judgment.java b/srt-cloud-data-integrate/src/main/java/net/srt/enums/Judgment.java new file mode 100644 index 0000000..6cc5d24 --- /dev/null +++ b/srt-cloud-data-integrate/src/main/java/net/srt/enums/Judgment.java @@ -0,0 +1,13 @@ +package net.srt.enums; + +import lombok.AllArgsConstructor; +import lombok.Getter; + +@Getter +@AllArgsConstructor +public enum Judgment { + REDIS(17),KAFKA(18),ES(19),FTP(20),MONGODB(21); + + private Integer index; + +} diff --git a/srt-cloud-data-integrate/src/main/java/net/srt/service/impl/DataDatabaseServiceImpl.java b/srt-cloud-data-integrate/src/main/java/net/srt/service/impl/DataDatabaseServiceImpl.java index fd4efad..73db65b 100644 --- a/srt-cloud-data-integrate/src/main/java/net/srt/service/impl/DataDatabaseServiceImpl.java +++ b/srt-cloud-data-integrate/src/main/java/net/srt/service/impl/DataDatabaseServiceImpl.java @@ -4,6 +4,9 @@ import cn.hutool.core.util.StrUtil; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.metadata.IPage; import com.baomidou.mybatisplus.core.toolkit.Wrappers; +import com.mongodb.client.MongoClient; +import com.mongodb.client.MongoClients; +import com.mongodb.client.MongoDatabase; import lombok.AllArgsConstructor; import lombok.SneakyThrows; import net.sf.jsqlparser.parser.CCJSqlParserUtil; @@ -18,6 +21,8 @@ import net.srt.dao.DataDatabaseDao; import net.srt.dto.SqlConsole; import net.srt.entity.DataAccessEntity; import net.srt.entity.DataDatabaseEntity; +import net.srt.enums.IsJdbc; +import net.srt.enums.Judgment; import net.srt.framework.common.cache.bean.DataProjectCacheBean; import net.srt.framework.common.exception.ServerException; import net.srt.framework.common.page.PageResult; @@ -33,8 +38,14 @@ import net.srt.vo.DataDatabaseVO; import net.srt.vo.SchemaTableDataVo; import net.srt.vo.SqlGenerationVo; import net.srt.vo.TableVo; +import org.apache.commons.net.ftp.FTP; +import org.apache.commons.net.ftp.FTPClient; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; +import redis.clients.jedis.Jedis; import srt.cloud.framework.dbswitch.common.type.ProductTypeEnum; import srt.cloud.framework.dbswitch.common.util.StringUtil; import srt.cloud.framework.dbswitch.core.model.ColumnDescription; @@ -43,8 +54,13 @@ import srt.cloud.framework.dbswitch.core.model.TableDescription; import srt.cloud.framework.dbswitch.core.service.IMetaDataByJdbcService; import srt.cloud.framework.dbswitch.core.service.impl.MetaDataByJdbcServiceImpl; +import javax.annotation.Resource; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.Properties; import java.util.stream.Collectors; /** @@ -54,11 +70,21 @@ import java.util.stream.Collectors; * @since 1.0.0 2022-10-09 */ @Service -@AllArgsConstructor public class DataDatabaseServiceImpl extends BaseServiceImpl implements DataDatabaseService { - - private final DataAccessDao dataAccessDao; - private final DataAccessService dataAccessService; + @Resource + private DataAccessDao dataAccessDao; + @Resource + private DataAccessService dataAccessService; + @Resource + private EsService esService; + @Resource + private MongoService mongoService; + @Resource + private KafkaService kafkaService; + @Resource + private RedisService redisService; + @Resource + private FtpService ftpService; @Override public PageResult page(DataDatabaseQuery query) { @@ -83,6 +109,10 @@ public class DataDatabaseServiceImpl extends BaseServiceImpl producer = null; + try { + producer = new org.apache.kafka.clients.producer.KafkaProducer<>(properties); + + // 消息 + String message = "Default Message"; + + // 发送消息到主题 + producer.send(new ProducerRecord<>(vo.getDatabaseKafKa(), message)); + System.out.println("消息发送成功"); + } catch (Exception e) { + System.err.println("连接失败: " + e.getMessage()); + throw new ServerException("连接失败"); + } finally { + // 关闭生产者 + if (producer != null) { + producer.close(); + } + } + } + + if(vo.getDatabaseType().equals(Judgment.MONGODB.getIndex())){ + // 连接到 MongoDB 本地服务器 + String url = productTypeEnum.getUrl() + .replace("{username}", vo.getUserName()) + .replace("{password}", vo.getPassword()) + .replace("{host}", vo.getDatabaseIp()) + .replace("{port}", vo.getDatabasePort()) + .replace("{database}", vo.getDatabaseName()); + try (MongoClient mongoClient = MongoClients.create(url)) { + // 获取到数据库对象 + MongoDatabase database = mongoClient.getDatabase(vo.getDatabaseName()); + System.out.println("连接数据库成功"); + } catch (Exception e) { + System.err.println("连接MongoDB报错: " + e.getMessage()); + throw new ServerException("连接失败"); + } + } + // + if(vo.getDatabaseType().equals(Judgment.FTP.getIndex())){ + // String server = "ftp.example.com"; + String user = vo.getUserName(); + String password = vo.getPassword(); + + FTPClient ftpClient = new FTPClient(); + + try { + // 连接到 FTP 服务器 + ftpClient.connect(vo.getDatabaseIp(),Integer.valueOf(vo.getDatabasePort())); + ftpClient.login(user, password); + ftpClient.enterLocalPassiveMode(); + + // 打印服务器返回的欢迎信息 + System.out.println("Connected to " + vo.getDatabaseIp() + "."); + System.out.println(ftpClient.getReplyString()); + + // 设置文件传输类型为二进制 + ftpClient.setFileType(FTP.ASCII_FILE_TYPE); + + // 上传文件 + uploadFile(ftpClient, vo.getDatabaseIp(), "D:/desktop/ftp.txt"); + + // 下载文件 +// downloadFile(ftpClient, "C:/Users/栗永斌/Desktop/新建 文本文档.txt", "C:/Users/栗永斌/Desktop/新建 文本文档.txt"); + + // 登出并断开连接 + ftpClient.logout(); + ftpClient.disconnect(); + + } catch (IOException e) { + e.printStackTrace(); + } + } +// + + + } if (vo.getId() != null) { //更新状态 @@ -158,6 +300,39 @@ public class DataDatabaseServiceImpl extends BaseServiceImpl getTablesById(Long id) { DataDatabaseEntity dataDatabaseEntity = baseMapper.selectById(id); @@ -166,14 +341,33 @@ public class DataDatabaseServiceImpl extends BaseServiceImpl getTables(DataDatabaseEntity dataDatabaseEntity) { ProductTypeEnum productTypeEnum = ProductTypeEnum.getByIndex(dataDatabaseEntity.getDatabaseType()); - IMetaDataByJdbcService metaDataService = new MetaDataByJdbcServiceImpl(productTypeEnum); - List tableDescriptions = metaDataService.queryTableList(StringUtil.isBlank(dataDatabaseEntity.getJdbcUrl()) ? productTypeEnum.getUrl() - .replace("{host}", dataDatabaseEntity.getDatabaseIp()) - .replace("{port}", dataDatabaseEntity.getDatabasePort()) - .replace("{database}", dataDatabaseEntity.getDatabaseName()) : dataDatabaseEntity.getJdbcUrl(), dataDatabaseEntity.getUserName(), dataDatabaseEntity.getPassword(), - ProductTypeEnum.ORACLE.equals(productTypeEnum) ? dataDatabaseEntity.getUserName() : dataDatabaseEntity.getDatabaseName()); - return BeanUtil.copyListProperties(tableDescriptions, TableVo::new); - } + List tableDescriptions = new ArrayList<>(); + try { + switch (productTypeEnum) { + case FTP: + tableDescriptions = ftpService.getFtpFiles(dataDatabaseEntity); + return BeanUtil.copyListProperties(tableDescriptions, TableVo::new); + case ES: + tableDescriptions = esService.getEsTables(dataDatabaseEntity); + return BeanUtil.copyListProperties(tableDescriptions, TableVo::new); + case MONGODB: + tableDescriptions = mongoService.getMongoTables(dataDatabaseEntity); + return BeanUtil.copyListProperties(tableDescriptions, TableVo::new); + case REDIS: + tableDescriptions = redisService.getRedisTables(dataDatabaseEntity); + return BeanUtil.copyListProperties(tableDescriptions, TableVo::new); + case KAFKA: + tableDescriptions = kafkaService.getKafkaTables(dataDatabaseEntity); + return BeanUtil.copyListProperties(tableDescriptions, TableVo::new); +// default: +// IMetaDataByJdbcService metaDataService = new MetaDataByJdbcServiceImpl(productTypeEnum); +// tableDescriptions = metaDataService.queryTableList(StringUtil.isBlank(dataDatabaseEntity.getJdbcUrl()) ? productTypeEnum.getUrl().replace("{host}", dataDatabaseEntity.getDatabaseIp()).replace("{port}", dataDatabaseEntity.getDatabasePort()).replace("{database}", dataDatabaseEntity.getDatabaseName()) : dataDatabaseEntity.getJdbcUrl(), dataDatabaseEntity.getUserName(), dataDatabaseEntity.getPassword(), ProductTypeEnum.ORACLE.equals(productTypeEnum) ? dataDatabaseEntity.getUserName() : dataDatabaseEntity.getDatabaseName()); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + return null; + } @SneakyThrows @Override diff --git a/srt-cloud-data-integrate/src/main/java/net/srt/service/impl/EsService.java b/srt-cloud-data-integrate/src/main/java/net/srt/service/impl/EsService.java new file mode 100644 index 0000000..9d53c44 --- /dev/null +++ b/srt-cloud-data-integrate/src/main/java/net/srt/service/impl/EsService.java @@ -0,0 +1,48 @@ +package net.srt.service.impl; + +import net.srt.entity.DataDatabaseEntity; +import net.srt.vo.TableVo; +import org.apache.http.HttpHost; +import org.elasticsearch.client.RequestOptions; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.client.indices.GetIndexRequest; +import org.springframework.stereotype.Service; +import srt.cloud.framework.dbswitch.core.model.TableDescription; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +@Service +public class EsService { + public List getEsTables(DataDatabaseEntity dataDatabaseEntity) { + // 从数据实体中获取 Elasticsearch 服务器的地址和端口 + String esServerAddress = dataDatabaseEntity.getDatabaseIp(); + int esServerPort = Integer.parseInt(dataDatabaseEntity.getDatabasePort()); + + // 创建 RestHighLevelClient 实例,用于与 Elasticsearch 进行高级 REST 客户端通信 + RestHighLevelClient client = new RestHighLevelClient( + RestClient.builder( + new HttpHost(esServerAddress, esServerPort, "http"))); + try { + // 创建获取索引请求,使用通配符 "*" 表示获取所有索引 + GetIndexRequest getIndexRequest = new GetIndexRequest("*"); + // 发送获取索引请求,获取所有索引的名称 + String[] indexes = client.indices().get(getIndexRequest, RequestOptions.DEFAULT).getIndices(); + // 创建一个列表用于存储索引信息 + List tableDescriptions = new ArrayList<>(); + // 遍历所有索引名称,创建 TableDescription 对象并添加到列表中 + for (String index : indexes) { + TableDescription tableDescription = new TableDescription(); + tableDescription.setTableName(index); + tableDescriptions.add(tableDescription); + } + // 返回 Elasticsearch 索引信息列表 + return tableDescriptions; + } catch (IOException e) { + // 处理 IO 异常 + e.printStackTrace(); + } + return null; + } +} diff --git a/srt-cloud-data-integrate/src/main/java/net/srt/service/impl/FtpService.java b/srt-cloud-data-integrate/src/main/java/net/srt/service/impl/FtpService.java new file mode 100644 index 0000000..1269add --- /dev/null +++ b/srt-cloud-data-integrate/src/main/java/net/srt/service/impl/FtpService.java @@ -0,0 +1,40 @@ +package net.srt.service.impl; + +import net.srt.entity.DataDatabaseEntity; +import net.srt.vo.TableVo; +import org.apache.commons.net.ftp.FTP; +import org.apache.commons.net.ftp.FTPClient; +import org.apache.commons.net.ftp.FTPFile; +import org.springframework.stereotype.Service; +import srt.cloud.framework.dbswitch.core.model.TableDescription; + +import java.io.IOException; +import java.net.SocketException; +import java.util.ArrayList; +import java.util.List; +@Service +public class FtpService { + public List getFtpFiles(DataDatabaseEntity dataDatabaseEntity) { + List tableDescriptions = new ArrayList<>(); + FTPClient ftpClient = new FTPClient(); + + try { + ftpClient.connect(dataDatabaseEntity.getDatabaseIp(),Integer.valueOf(dataDatabaseEntity.getDatabasePort())); + ftpClient.login(dataDatabaseEntity.getUserName(),dataDatabaseEntity.getPassword()); + ftpClient.enterLocalPassiveMode(); + ftpClient.setFileType(FTP.BINARY_FILE_TYPE); + + // 列出FTP服务器上的文件 + FTPFile[] ftpFiles = ftpClient.listFiles(); + for (FTPFile ftpFile : ftpFiles) { + TableDescription tableDescription = new TableDescription(); + tableDescription.setTableName(ftpFile.getName()); + tableDescriptions.add(tableDescription); + } + return tableDescriptions; + } catch (IOException e) { + throw new RuntimeException(e); + } + + } +} diff --git a/srt-cloud-data-integrate/src/main/java/net/srt/service/impl/KafkaService.java b/srt-cloud-data-integrate/src/main/java/net/srt/service/impl/KafkaService.java new file mode 100644 index 0000000..2012b92 --- /dev/null +++ b/srt-cloud-data-integrate/src/main/java/net/srt/service/impl/KafkaService.java @@ -0,0 +1,41 @@ +package net.srt.service.impl; + +import net.srt.entity.DataDatabaseEntity; +import net.srt.vo.TableVo; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.KafkaAdminClient; +import org.apache.kafka.clients.admin.ListTopicsResult; +import org.apache.kafka.common.KafkaFuture; +import org.springframework.stereotype.Service; +import srt.cloud.framework.dbswitch.core.model.TableDescription; + +import java.util.List; +import java.util.Properties; +import java.util.Set; +import java.util.stream.Collectors; +@Service +public class KafkaService { + public List getKafkaTables(DataDatabaseEntity dataDatabaseEntity) { + String bootstrapServers = dataDatabaseEntity.getDatabaseIp() + ":" + dataDatabaseEntity.getDatabasePort(); + Properties properties = new Properties(); + properties.put("bootstrap.servers",bootstrapServers); + properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer"); + properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer"); + AdminClient adminClient=null; + try { + adminClient = KafkaAdminClient.create(properties); + ListTopicsResult listTopicsResult = adminClient.listTopics(); + KafkaFuture> names = listTopicsResult.names(); + Set strings = names.get(); + List tableDescriptions=strings.stream().map(s -> { + TableDescription tableDescription = new TableDescription(); + tableDescription.setTableName(s); + return tableDescription; + }).collect(Collectors.toList()); + return tableDescriptions; + } catch (Exception e) { + throw new RuntimeException(e); + } + } + +} diff --git a/srt-cloud-data-integrate/src/main/java/net/srt/service/impl/MongoService.java b/srt-cloud-data-integrate/src/main/java/net/srt/service/impl/MongoService.java new file mode 100644 index 0000000..1cff935 --- /dev/null +++ b/srt-cloud-data-integrate/src/main/java/net/srt/service/impl/MongoService.java @@ -0,0 +1,45 @@ +package net.srt.service.impl; + +import com.mongodb.MongoClientSettings; +import com.mongodb.MongoCredential; +import com.mongodb.ServerAddress; +import com.mongodb.client.MongoClient; +import com.mongodb.client.MongoClients; +import com.mongodb.client.MongoDatabase; +import com.mongodb.client.MongoIterable; +import net.srt.entity.DataDatabaseEntity; +import org.springframework.stereotype.Service; +import srt.cloud.framework.dbswitch.core.model.TableDescription; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +@Service +public class MongoService { + public List getMongoTables(DataDatabaseEntity dataDatabaseEntity) { + List tableDescriptions = new ArrayList<>(); + try { + MongoCredential credential = MongoCredential.createCredential(dataDatabaseEntity.getUserName(),"admin",dataDatabaseEntity.getPassword().toCharArray()); + ServerAddress serverAddress = new ServerAddress(dataDatabaseEntity.getDatabaseIp(), Integer.valueOf(dataDatabaseEntity.getDatabasePort())); + MongoClientSettings settings = MongoClientSettings.builder() + .credential(credential) + .applyToClusterSettings(builder -> + builder.hosts(Arrays.asList(serverAddress))) + .build(); + + MongoClient mongoClient = MongoClients.create(settings); + MongoDatabase database = mongoClient.getDatabase(dataDatabaseEntity.getDatabaseName()); + ArrayList into = database.listCollectionNames().into(new ArrayList<>()); + for (String s : into) { + TableDescription tableDescription = new TableDescription(); + tableDescription.setTableName(s); + tableDescriptions.add(tableDescription); + } + return tableDescriptions; + + } catch (Exception e) { + e.printStackTrace(); + } + return null; + } +} diff --git a/srt-cloud-data-integrate/src/main/java/net/srt/service/impl/RedisService.java b/srt-cloud-data-integrate/src/main/java/net/srt/service/impl/RedisService.java new file mode 100644 index 0000000..4e05cb9 --- /dev/null +++ b/srt-cloud-data-integrate/src/main/java/net/srt/service/impl/RedisService.java @@ -0,0 +1,39 @@ +package net.srt.service.impl; + +import net.srt.entity.DataDatabaseEntity; +import org.springframework.stereotype.Service; +import redis.clients.jedis.Jedis; +import redis.clients.jedis.exceptions.JedisConnectionException; +import srt.cloud.framework.dbswitch.core.model.TableDescription; + +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +@Service +public class RedisService { + public List getRedisTables(DataDatabaseEntity dataDatabaseEntity) { + try (Jedis jedis = new Jedis(dataDatabaseEntity.getDatabaseIp(), Integer.parseInt(dataDatabaseEntity.getDatabasePort()))) { + // 获取所有 key + Set keys = jedis.keys("*"); + ArrayList tableDescriptions = new ArrayList<>(); + for (String key : keys) { + TableDescription tableDescription = new TableDescription(); + tableDescription.setTableName(key); + tableDescriptions.add(tableDescription); + } + return tableDescriptions; + } catch (JedisConnectionException e) { + // 处理连接异常 + throw new RuntimeException("Redis连接异常: " + e.getMessage(), e); + } catch (NumberFormatException e) { + // 处理端口转换异常 + throw new RuntimeException("端口转换异常: " + e.getMessage(), e); + } catch (Exception e) { + // 处理其他异常 + throw new RuntimeException("发生异常: " + e.getMessage(), e); + } + + } +} diff --git a/srt-cloud-data-integrate/src/main/java/net/srt/vo/DataDatabaseVO.java b/srt-cloud-data-integrate/src/main/java/net/srt/vo/DataDatabaseVO.java index 0d7dc3f..01b9e43 100644 --- a/srt-cloud-data-integrate/src/main/java/net/srt/vo/DataDatabaseVO.java +++ b/srt-cloud-data-integrate/src/main/java/net/srt/vo/DataDatabaseVO.java @@ -79,7 +79,20 @@ public class DataDatabaseVO implements Serializable { private Date updateTime; @Schema(description = "是否是关系型数据库") - private Integer isJdbc; + private String isJdbc; + + @Schema(description = "es索引") + private String databaseIndex; + + @Schema(description = "es文档") + private String databaseDocument; + + + @Schema(description = "kafka主题") + private String databaseKafKa; + + + } diff --git a/srt-cloud-data-integrate/src/main/java/net/srt/vo/TableVo.java b/srt-cloud-data-integrate/src/main/java/net/srt/vo/TableVo.java index 4854331..8e72285 100644 --- a/srt-cloud-data-integrate/src/main/java/net/srt/vo/TableVo.java +++ b/srt-cloud-data-integrate/src/main/java/net/srt/vo/TableVo.java @@ -13,4 +13,12 @@ public class TableVo { private String tableName; private String remarks; private DBTableType tableType; + + public static TableVo createKafkaTopicVo(String topicName) { + TableVo kafkaTopicVo = new TableVo(); + kafkaTopicVo.setTableName(topicName); + kafkaTopicVo.setRemarks("Kafka Topic"); + kafkaTopicVo.setTableType(null); + return kafkaTopicVo; + } } diff --git a/srt-cloud-framework/srt-cloud-dbswitch/src/main/java/srt/cloud/framework/dbswitch/common/type/ProductTypeEnum.java b/srt-cloud-framework/srt-cloud-dbswitch/src/main/java/srt/cloud/framework/dbswitch/common/type/ProductTypeEnum.java index e311415..4c305ba 100644 --- a/srt-cloud-framework/srt-cloud-dbswitch/src/main/java/srt/cloud/framework/dbswitch/common/type/ProductTypeEnum.java +++ b/srt-cloud-framework/srt-cloud-dbswitch/src/main/java/srt/cloud/framework/dbswitch/common/type/ProductTypeEnum.java @@ -101,11 +101,12 @@ public enum ProductTypeEnum { */ DORIS(16, "com.mysql.jdbc.Driver","/* ping */ SELECT 1", "jdbc:mysql://{host}:{port}/{database}?useUnicode=true&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull&useSSL=false&serverTimezone=GMT%2B8&rewriteBatchedStatements=true"), - REDIS(16, "com.mysql.jdbc.Driver","/* ping */ SELECT 1", ""), - KAFKA(16, "com.mysql.jdbc.Driver","/* ping */ SELECT 1", ""), - ES(16, "com.mysql.jdbc.Driver","/* ping */ SELECT 1", ""), - FTP(16, "com.mysql.jdbc.Driver","/* ping */ SELECT 1", ""), - MONGODB(16, "com.mysql.jdbc.Driver","/* ping */ SELECT 1", ""), + REDIS(17, "com.mysql.jdbc.Driver","/* ping */ SELECT 1", ""), + KAFKA(18, "com.mysql.jdbc.Driver","/* ping */ SELECT 1", ""), + ES(19, "com.mysql.jdbc.Driver","/* ping */ SELECT 1", ""), + FTP(20, "com.mysql.jdbc.Driver","/* ping */ SELECT 1", "ftp://example.com"), + MONGODB(21, "com.mysql.jdbc.Driver","/* ping */ SELECT 1", "mongodb://{username}:{password}@{host}:{port}/{database}"), + ; private Integer index; diff --git a/srt-cloud-framework/srt-cloud-dbswitch/src/main/java/srt/cloud/framework/dbswitch/core/database/impl/DatabaseEsImpl.java b/srt-cloud-framework/srt-cloud-dbswitch/src/main/java/srt/cloud/framework/dbswitch/core/database/impl/DatabaseEsImpl.java index ce28261..ac4d042 100644 --- a/srt-cloud-framework/srt-cloud-dbswitch/src/main/java/srt/cloud/framework/dbswitch/core/database/impl/DatabaseEsImpl.java +++ b/srt-cloud-framework/srt-cloud-dbswitch/src/main/java/srt/cloud/framework/dbswitch/core/database/impl/DatabaseEsImpl.java @@ -40,7 +40,7 @@ public class DatabaseEsImpl extends DatabaseMysqlImpl implements IDatabaseInterf @Override public ProductTypeEnum getDatabaseType() { - return ProductTypeEnum.DORIS; + return ProductTypeEnum.ES; } @Override diff --git a/srt-cloud-framework/srt-cloud-dbswitch/src/main/java/srt/cloud/framework/dbswitch/core/database/impl/DatabaseFMongodbmpl.java b/srt-cloud-framework/srt-cloud-dbswitch/src/main/java/srt/cloud/framework/dbswitch/core/database/impl/DatabaseFMongodbmpl.java index 616e59c..6450b28 100644 --- a/srt-cloud-framework/srt-cloud-dbswitch/src/main/java/srt/cloud/framework/dbswitch/core/database/impl/DatabaseFMongodbmpl.java +++ b/srt-cloud-framework/srt-cloud-dbswitch/src/main/java/srt/cloud/framework/dbswitch/core/database/impl/DatabaseFMongodbmpl.java @@ -40,7 +40,7 @@ public class DatabaseFMongodbmpl extends DatabaseMysqlImpl implements IDatabaseI @Override public ProductTypeEnum getDatabaseType() { - return ProductTypeEnum.DORIS; + return ProductTypeEnum.MONGODB; } @Override diff --git a/srt-cloud-framework/srt-cloud-dbswitch/src/main/java/srt/cloud/framework/dbswitch/core/database/impl/DatabaseFTPImpl.java b/srt-cloud-framework/srt-cloud-dbswitch/src/main/java/srt/cloud/framework/dbswitch/core/database/impl/DatabaseFTPImpl.java index 7559a6c..ceefe09 100644 --- a/srt-cloud-framework/srt-cloud-dbswitch/src/main/java/srt/cloud/framework/dbswitch/core/database/impl/DatabaseFTPImpl.java +++ b/srt-cloud-framework/srt-cloud-dbswitch/src/main/java/srt/cloud/framework/dbswitch/core/database/impl/DatabaseFTPImpl.java @@ -40,7 +40,7 @@ public class DatabaseFTPImpl extends DatabaseMysqlImpl implements IDatabaseInter @Override public ProductTypeEnum getDatabaseType() { - return ProductTypeEnum.DORIS; + return ProductTypeEnum.FTP; } @Override diff --git a/srt-cloud-framework/srt-cloud-dbswitch/src/main/java/srt/cloud/framework/dbswitch/core/database/impl/DatabaseKafkaImpl.java b/srt-cloud-framework/srt-cloud-dbswitch/src/main/java/srt/cloud/framework/dbswitch/core/database/impl/DatabaseKafkaImpl.java index b513d1b..9f15296 100644 --- a/srt-cloud-framework/srt-cloud-dbswitch/src/main/java/srt/cloud/framework/dbswitch/core/database/impl/DatabaseKafkaImpl.java +++ b/srt-cloud-framework/srt-cloud-dbswitch/src/main/java/srt/cloud/framework/dbswitch/core/database/impl/DatabaseKafkaImpl.java @@ -40,7 +40,7 @@ public class DatabaseKafkaImpl extends DatabaseMysqlImpl implements IDatabaseInt @Override public ProductTypeEnum getDatabaseType() { - return ProductTypeEnum.DORIS; + return ProductTypeEnum.KAFKA; } @Override diff --git a/srt-cloud-framework/srt-cloud-dbswitch/src/main/java/srt/cloud/framework/dbswitch/core/database/impl/DatabaseRedisImpl.java b/srt-cloud-framework/srt-cloud-dbswitch/src/main/java/srt/cloud/framework/dbswitch/core/database/impl/DatabaseRedisImpl.java index 8d65bd8..43a736d 100644 --- a/srt-cloud-framework/srt-cloud-dbswitch/src/main/java/srt/cloud/framework/dbswitch/core/database/impl/DatabaseRedisImpl.java +++ b/srt-cloud-framework/srt-cloud-dbswitch/src/main/java/srt/cloud/framework/dbswitch/core/database/impl/DatabaseRedisImpl.java @@ -50,45 +50,44 @@ public class DatabaseRedisImpl extends DatabaseMysqlImpl implements IDatabaseInt @Override public ProductTypeEnum getDatabaseType() { - return ProductTypeEnum.DORIS; + return ProductTypeEnum.REDIS; } @Override public List queryTableColumnMeta(Connection connection, String schemaName, String tableName) { -// String sql = this.getTableFieldsQuerySQL(schemaName, tableName); -// List columnDescriptions = this.querySelectSqlColumnMeta(connection, sql); -// // 补充一下字段信息,获取的不准 -// String extraSql = "SELECT column_name,data_type,column_size,decimal_digits,column_comment FROM information_schema.COLUMNS WHERE table_schema='" + schemaName + "' AND table_name='" + tableName + "'"; -// try (PreparedStatement ps = connection.prepareStatement(extraSql); -// ResultSet rs = ps.executeQuery(); -// ) { -// while (rs.next()) { -// String columnName = rs.getString("column_name"); -// String dataType = rs.getString("data_type"); -// String columnSize = rs.getString("column_size"); -// String decimalDigits = rs.getString("decimal_digits"); -// String columnComment = rs.getString("column_comment"); -// if (columnName != null) { -// for (ColumnDescription cd : columnDescriptions) { -// if (columnName.equals(cd.getFieldName())) { -// cd.setFieldTypeName(dataType); -// int csize = columnSize != null ? Integer.parseInt(columnSize) : 0; -// cd.setDisplaySize(csize); -// cd.setPrecisionSize(csize); -// cd.setScaleSize(decimalDigits != null ? Integer.parseInt(decimalDigits) : 0); -// cd.setRemarks(columnComment); -// break; -// } -// } -// } -// } -// } catch (SQLException e) { -// throw new RuntimeException(schemaName + "." + tableName + " queryTableColumnMeta error!!", e); -// } -// -// return columnDescriptions; - return null; + String sql = this.getTableFieldsQuerySQL(schemaName, tableName); + List columnDescriptions = this.querySelectSqlColumnMeta(connection, sql); + // 补充一下字段信息,获取的不准 + String extraSql = "SELECT column_name,data_type,column_size,decimal_digits,column_comment FROM information_schema.COLUMNS WHERE table_schema='" + schemaName + "' AND table_name='" + tableName + "'"; + try (PreparedStatement ps = connection.prepareStatement(extraSql); + ResultSet rs = ps.executeQuery(); + ) { + while (rs.next()) { + String columnName = rs.getString("column_name"); + String dataType = rs.getString("data_type"); + String columnSize = rs.getString("column_size"); + String decimalDigits = rs.getString("decimal_digits"); + String columnComment = rs.getString("column_comment"); + if (columnName != null) { + for (ColumnDescription cd : columnDescriptions) { + if (columnName.equals(cd.getFieldName())) { + cd.setFieldTypeName(dataType); + int csize = columnSize != null ? Integer.parseInt(columnSize) : 0; + cd.setDisplaySize(csize); + cd .setPrecisionSize(csize); + cd.setScaleSize(decimalDigits != null ? Integer.parseInt(decimalDigits) : 0); + cd.setRemarks(columnComment); + break; + } + } + } + } + } catch (SQLException e) { + throw new RuntimeException(schemaName + "." + tableName + " queryTableColumnMeta error!!", e); + } + + return columnDescriptions; } @Override diff --git a/srt-cloud-framework/srt-cloud-flink/build/app/flink-app-1.14-2.0.0-jar-with-dependencies.jar b/srt-cloud-framework/srt-cloud-flink/build/app/flink-app-1.14-2.0.0-jar-with-dependencies.jar index d963086..e64813c 100644 Binary files a/srt-cloud-framework/srt-cloud-flink/build/app/flink-app-1.14-2.0.0-jar-with-dependencies.jar and b/srt-cloud-framework/srt-cloud-flink/build/app/flink-app-1.14-2.0.0-jar-with-dependencies.jar differ diff --git a/srt-cloud-framework/srt-cloud-flink/build/extends/flink-catalog-mysql-1.14-2.0.0.jar b/srt-cloud-framework/srt-cloud-flink/build/extends/flink-catalog-mysql-1.14-2.0.0.jar index 9df30ff..70ec31f 100644 Binary files a/srt-cloud-framework/srt-cloud-flink/build/extends/flink-catalog-mysql-1.14-2.0.0.jar and b/srt-cloud-framework/srt-cloud-flink/build/extends/flink-catalog-mysql-1.14-2.0.0.jar differ diff --git a/srt-cloud-framework/srt-cloud-flink/build/extends/flink-client-1.14-2.0.0.jar b/srt-cloud-framework/srt-cloud-flink/build/extends/flink-client-1.14-2.0.0.jar index f968ee4..a454886 100644 Binary files a/srt-cloud-framework/srt-cloud-flink/build/extends/flink-client-1.14-2.0.0.jar and b/srt-cloud-framework/srt-cloud-flink/build/extends/flink-client-1.14-2.0.0.jar differ