数据源
commit
b33b9fe8ab
|
@ -10,6 +10,19 @@
|
|||
<packaging>jar</packaging>
|
||||
|
||||
<dependencies>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.kafka</groupId>
|
||||
<artifactId>kafka-clients</artifactId>
|
||||
<version>2.8.0</version> <!-- 检查是否有更新的版本 -->
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>redis.clients</groupId>
|
||||
<artifactId>jedis</artifactId>
|
||||
<version>2.9.0</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>net.srt</groupId>
|
||||
<artifactId>srt-cloud-api</artifactId>
|
||||
|
|
|
@ -0,0 +1,14 @@
|
|||
package net.srt.enums;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Getter;
|
||||
|
||||
@Getter
|
||||
@AllArgsConstructor
|
||||
public enum IsJdbc {
|
||||
YES("是"),
|
||||
NO("否");
|
||||
|
||||
private String value;
|
||||
|
||||
}
|
|
@ -0,0 +1,13 @@
|
|||
package net.srt.enums;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Getter;
|
||||
|
||||
@Getter
|
||||
@AllArgsConstructor
|
||||
public enum Judgment {
|
||||
REDIS(17),KAFKA(18),ES(19),FTP(20),MONGODB(21);
|
||||
|
||||
private Integer index;
|
||||
|
||||
}
|
|
@ -4,6 +4,7 @@ import cn.hutool.core.util.StrUtil;
|
|||
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
||||
import com.baomidou.mybatisplus.core.metadata.IPage;
|
||||
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
|
||||
import com.fasterxml.jackson.databind.ser.std.StringSerializer;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.SneakyThrows;
|
||||
import net.sf.jsqlparser.parser.CCJSqlParserUtil;
|
||||
|
@ -18,6 +19,8 @@ import net.srt.dao.DataDatabaseDao;
|
|||
import net.srt.dto.SqlConsole;
|
||||
import net.srt.entity.DataAccessEntity;
|
||||
import net.srt.entity.DataDatabaseEntity;
|
||||
import net.srt.enums.IsJdbc;
|
||||
import net.srt.enums.Judgment;
|
||||
import net.srt.framework.common.cache.bean.DataProjectCacheBean;
|
||||
import net.srt.framework.common.exception.ServerException;
|
||||
import net.srt.framework.common.page.PageResult;
|
||||
|
@ -33,8 +36,12 @@ import net.srt.vo.DataDatabaseVO;
|
|||
import net.srt.vo.SchemaTableDataVo;
|
||||
import net.srt.vo.SqlGenerationVo;
|
||||
import net.srt.vo.TableVo;
|
||||
import org.apache.kafka.clients.producer.Producer;
|
||||
import org.apache.kafka.clients.producer.ProducerConfig;
|
||||
import org.apache.kafka.clients.producer.ProducerRecord;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
import redis.clients.jedis.Jedis;
|
||||
import srt.cloud.framework.dbswitch.common.type.ProductTypeEnum;
|
||||
import srt.cloud.framework.dbswitch.common.util.StringUtil;
|
||||
import srt.cloud.framework.dbswitch.core.model.ColumnDescription;
|
||||
|
@ -45,6 +52,7 @@ import srt.cloud.framework.dbswitch.core.service.impl.MetaDataByJdbcServiceImpl;
|
|||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Properties;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
|
@ -137,12 +145,14 @@ public class DataDatabaseServiceImpl extends BaseServiceImpl<DataDatabaseDao, Da
|
|||
|
||||
IMetaDataByJdbcService metaDataService = new MetaDataByJdbcServiceImpl(productTypeEnum);
|
||||
if (StringUtil.isBlank(vo.getJdbcUrl())) {
|
||||
vo.setJdbcUrl(productTypeEnum.getUrl()
|
||||
.replace("{host}", vo.getDatabaseIp())
|
||||
.replace("{port}", vo.getDatabasePort())
|
||||
.replace("{database}", vo.getDatabaseName()));
|
||||
if(productTypeEnum.getUrl()!=null){
|
||||
vo.setJdbcUrl(productTypeEnum.getUrl()
|
||||
.replace("{host}", vo.getDatabaseIp())
|
||||
.replace("{port}", vo.getDatabasePort())
|
||||
.replace("{database}", vo.getDatabaseName()));
|
||||
}
|
||||
}
|
||||
if (vo.getIsJdbc()==1){
|
||||
if (vo.getIsJdbc()== IsJdbc.NO.getValue()){
|
||||
metaDataService.testQuerySQL(
|
||||
vo.getJdbcUrl(),
|
||||
vo.getUserName(),
|
||||
|
@ -150,7 +160,53 @@ public class DataDatabaseServiceImpl extends BaseServiceImpl<DataDatabaseDao, Da
|
|||
productTypeEnum.getTestSql()
|
||||
);
|
||||
}else {
|
||||
//TODO 连接
|
||||
//判断是什么类型进行连接
|
||||
if(vo.getDatabaseType()== Judgment.REDIS.getIndex()){
|
||||
//连接本地的 Redis 服务
|
||||
// 创建 Jedis 对象
|
||||
Jedis jedis = new Jedis(vo.getDatabaseIp(), Integer.valueOf(vo.getDatabasePort()));
|
||||
try {
|
||||
// 尝试连接到 Redis 服务器
|
||||
jedis.connect();
|
||||
|
||||
// 检查是否连接成功
|
||||
if (jedis.isConnected()) {
|
||||
System.out.println("redis连接成功");
|
||||
} else {
|
||||
System.out.println("redis连接失败");
|
||||
}
|
||||
} catch (Exception e) {
|
||||
// 捕获连接异常
|
||||
System.err.println("redis异常" + e.getMessage());
|
||||
} finally {
|
||||
// 关闭连接
|
||||
jedis.close();
|
||||
}
|
||||
}
|
||||
|
||||
if(vo.getDatabaseType()==Judgment.KAFKA.getIndex()){
|
||||
// 配置生产者属性
|
||||
Properties properties = new Properties();
|
||||
String ip=vo.getDatabaseIp()+":"+vo.getDatabasePort();
|
||||
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, ip);
|
||||
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer.class.getName());
|
||||
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer.class.getName());
|
||||
|
||||
// 创建生产者
|
||||
Producer<String, String> producer = new org.apache.kafka.clients.producer.KafkaProducer<>(properties);
|
||||
|
||||
// 消息
|
||||
String message = "Default Message";
|
||||
|
||||
// 发送消息到主题
|
||||
try {
|
||||
producer.send(new ProducerRecord<String, String>(vo.getDatabaseKafKa(), message));
|
||||
System.out.println("消息发送成功");
|
||||
} finally {
|
||||
// 关闭生产者
|
||||
producer.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
if (vo.getId() != null) {
|
||||
//更新状态
|
||||
|
|
|
@ -79,7 +79,20 @@ public class DataDatabaseVO implements Serializable {
|
|||
private Date updateTime;
|
||||
|
||||
@Schema(description = "是否是关系型数据库")
|
||||
private Integer isJdbc;
|
||||
private String isJdbc;
|
||||
|
||||
@Schema(description = "es索引")
|
||||
private String databaseIndex;
|
||||
|
||||
@Schema(description = "es文档")
|
||||
private String databaseDocument;
|
||||
|
||||
|
||||
@Schema(description = "kafka主题")
|
||||
private String databaseKafKa;
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
}
|
||||
|
|
|
@ -101,11 +101,11 @@ public enum ProductTypeEnum {
|
|||
*/
|
||||
DORIS(16, "com.mysql.jdbc.Driver","/* ping */ SELECT 1", "jdbc:mysql://{host}:{port}/{database}?useUnicode=true&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull&useSSL=false&serverTimezone=GMT%2B8&rewriteBatchedStatements=true"),
|
||||
|
||||
REDIS(16, "com.mysql.jdbc.Driver","/* ping */ SELECT 1", ""),
|
||||
KAFKA(16, "com.mysql.jdbc.Driver","/* ping */ SELECT 1", ""),
|
||||
ES(16, "com.mysql.jdbc.Driver","/* ping */ SELECT 1", ""),
|
||||
FTP(16, "com.mysql.jdbc.Driver","/* ping */ SELECT 1", ""),
|
||||
MONGODB(16, "com.mysql.jdbc.Driver","/* ping */ SELECT 1", ""),
|
||||
REDIS(17, "com.mysql.jdbc.Driver","/* ping */ SELECT 1", ""),
|
||||
KAFKA(18, "com.mysql.jdbc.Driver","/* ping */ SELECT 1", ""),
|
||||
ES(19, "com.mysql.jdbc.Driver","/* ping */ SELECT 1", ""),
|
||||
FTP(20, "com.mysql.jdbc.Driver","/* ping */ SELECT 1", ""),
|
||||
MONGODB(21, "com.mysql.jdbc.Driver","/* ping */ SELECT 1", ""),
|
||||
;
|
||||
|
||||
private Integer index;
|
||||
|
|
|
@ -40,7 +40,7 @@ public class DatabaseEsImpl extends DatabaseMysqlImpl implements IDatabaseInterf
|
|||
|
||||
@Override
|
||||
public ProductTypeEnum getDatabaseType() {
|
||||
return ProductTypeEnum.DORIS;
|
||||
return ProductTypeEnum.ES;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -40,7 +40,7 @@ public class DatabaseFMongodbmpl extends DatabaseMysqlImpl implements IDatabaseI
|
|||
|
||||
@Override
|
||||
public ProductTypeEnum getDatabaseType() {
|
||||
return ProductTypeEnum.DORIS;
|
||||
return ProductTypeEnum.MONGODB;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -40,7 +40,7 @@ public class DatabaseFTPImpl extends DatabaseMysqlImpl implements IDatabaseInter
|
|||
|
||||
@Override
|
||||
public ProductTypeEnum getDatabaseType() {
|
||||
return ProductTypeEnum.DORIS;
|
||||
return ProductTypeEnum.FTP;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -40,7 +40,7 @@ public class DatabaseKafkaImpl extends DatabaseMysqlImpl implements IDatabaseInt
|
|||
|
||||
@Override
|
||||
public ProductTypeEnum getDatabaseType() {
|
||||
return ProductTypeEnum.DORIS;
|
||||
return ProductTypeEnum.KAFKA;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -50,45 +50,44 @@ public class DatabaseRedisImpl extends DatabaseMysqlImpl implements IDatabaseInt
|
|||
|
||||
@Override
|
||||
public ProductTypeEnum getDatabaseType() {
|
||||
return ProductTypeEnum.DORIS;
|
||||
return ProductTypeEnum.REDIS;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<ColumnDescription> queryTableColumnMeta(Connection connection, String schemaName,
|
||||
String tableName) {
|
||||
// String sql = this.getTableFieldsQuerySQL(schemaName, tableName);
|
||||
// List<ColumnDescription> columnDescriptions = this.querySelectSqlColumnMeta(connection, sql);
|
||||
// // 补充一下字段信息,获取的不准
|
||||
// String extraSql = "SELECT column_name,data_type,column_size,decimal_digits,column_comment FROM information_schema.COLUMNS WHERE table_schema='" + schemaName + "' AND table_name='" + tableName + "'";
|
||||
// try (PreparedStatement ps = connection.prepareStatement(extraSql);
|
||||
// ResultSet rs = ps.executeQuery();
|
||||
// ) {
|
||||
// while (rs.next()) {
|
||||
// String columnName = rs.getString("column_name");
|
||||
// String dataType = rs.getString("data_type");
|
||||
// String columnSize = rs.getString("column_size");
|
||||
// String decimalDigits = rs.getString("decimal_digits");
|
||||
// String columnComment = rs.getString("column_comment");
|
||||
// if (columnName != null) {
|
||||
// for (ColumnDescription cd : columnDescriptions) {
|
||||
// if (columnName.equals(cd.getFieldName())) {
|
||||
// cd.setFieldTypeName(dataType);
|
||||
// int csize = columnSize != null ? Integer.parseInt(columnSize) : 0;
|
||||
// cd.setDisplaySize(csize);
|
||||
// cd.setPrecisionSize(csize);
|
||||
// cd.setScaleSize(decimalDigits != null ? Integer.parseInt(decimalDigits) : 0);
|
||||
// cd.setRemarks(columnComment);
|
||||
// break;
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
// } catch (SQLException e) {
|
||||
// throw new RuntimeException(schemaName + "." + tableName + " queryTableColumnMeta error!!", e);
|
||||
// }
|
||||
//
|
||||
// return columnDescriptions;
|
||||
return null;
|
||||
String sql = this.getTableFieldsQuerySQL(schemaName, tableName);
|
||||
List<ColumnDescription> columnDescriptions = this.querySelectSqlColumnMeta(connection, sql);
|
||||
// 补充一下字段信息,获取的不准
|
||||
String extraSql = "SELECT column_name,data_type,column_size,decimal_digits,column_comment FROM information_schema.COLUMNS WHERE table_schema='" + schemaName + "' AND table_name='" + tableName + "'";
|
||||
try (PreparedStatement ps = connection.prepareStatement(extraSql);
|
||||
ResultSet rs = ps.executeQuery();
|
||||
) {
|
||||
while (rs.next()) {
|
||||
String columnName = rs.getString("column_name");
|
||||
String dataType = rs.getString("data_type");
|
||||
String columnSize = rs.getString("column_size");
|
||||
String decimalDigits = rs.getString("decimal_digits");
|
||||
String columnComment = rs.getString("column_comment");
|
||||
if (columnName != null) {
|
||||
for (ColumnDescription cd : columnDescriptions) {
|
||||
if (columnName.equals(cd.getFieldName())) {
|
||||
cd.setFieldTypeName(dataType);
|
||||
int csize = columnSize != null ? Integer.parseInt(columnSize) : 0;
|
||||
cd.setDisplaySize(csize);
|
||||
cd.setPrecisionSize(csize);
|
||||
cd.setScaleSize(decimalDigits != null ? Integer.parseInt(decimalDigits) : 0);
|
||||
cd.setRemarks(columnComment);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (SQLException e) {
|
||||
throw new RuntimeException(schemaName + "." + tableName + " queryTableColumnMeta error!!", e);
|
||||
}
|
||||
|
||||
return columnDescriptions;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
Binary file not shown.
Binary file not shown.
Binary file not shown.
Loading…
Reference in New Issue