MongoDB
parent bf5cf89c1f
commit c28ce8fa92
@@ -11,6 +11,12 @@
    <dependencies>
        <dependency>
            <groupId>org.mongodb</groupId>
            <artifactId>mongodb-driver-sync</artifactId>
            <version>4.4.0</version> <!-- use the latest available version -->
        </dependency>

        <!-- Elasticsearch Low Level REST Client -->
        <dependency>

@@ -5,6 +5,9 @@ import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.fasterxml.jackson.databind.ser.std.StringSerializer;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoDatabase;
import lombok.AllArgsConstructor;
import lombok.SneakyThrows;
import net.sf.jsqlparser.parser.CCJSqlParserUtil;

@@ -36,12 +39,9 @@ import net.srt.vo.DataDatabaseVO;
import net.srt.vo.SchemaTableDataVo;
import net.srt.vo.SqlGenerationVo;
import net.srt.vo.TableVo;
import org.apache.http.HttpHost;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import redis.clients.jedis.Jedis;

@@ -53,7 +53,6 @@ import srt.cloud.framework.dbswitch.core.model.TableDescription;
import srt.cloud.framework.dbswitch.core.service.IMetaDataByJdbcService;
import srt.cloud.framework.dbswitch.core.service.impl.MetaDataByJdbcServiceImpl;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

@@ -191,58 +190,57 @@ public class DataDatabaseServiceImpl extends BaseServiceImpl<DataDatabaseDao, Da
        if (vo.getDatabaseType().equals(Judgment.KAFKA.getIndex())) {
            // Configure the producer properties
            Properties properties = new Properties();
            String ip = vo.getDatabaseIp() + ":" + vo.getDatabasePort();
            // String ip = "invalid_ip:invalid_port"; // an invalid address used earlier to simulate a connection failure
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, ip);
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer.class.getName());
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer.class.getName());

            // Create the producer
            Producer<String, String> producer = null;
            try {
                producer = new org.apache.kafka.clients.producer.KafkaProducer<>(properties);

                // Test message
                String message = "Default Message";

                // Send the message to the configured topic
                producer.send(new ProducerRecord<>(vo.getDatabaseKafKa(), message));
                System.out.println("Message sent successfully");
            } catch (Exception e) {
                System.err.println("Connection failed: " + e.getMessage());
            } finally {
                // Close the producer
                if (producer != null) {
                    producer.close();
                }
            }
        }

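The branch above verifies Kafka reachability by producing a throwaway record to the configured topic. A lighter-weight alternative is to request cluster metadata with Kafka's AdminClient, sketched below under the assumption that the same host and port fields are available; the class and method names (KafkaConnectivityCheck, canConnect) are illustrative and not part of this commit.

import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

public class KafkaConnectivityCheck {
    // Returns true if the broker answers a metadata request within the timeout.
    public static boolean canConnect(String host, String port) {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, host + ":" + port);
        props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 5000);
        try (AdminClient admin = AdminClient.create(props)) {
            // listTopics() forces a round trip to the cluster
            admin.listTopics().names().get(5, TimeUnit.SECONDS);
            return true;
        } catch (Exception e) {
            return false;
        }
    }
}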
        if (vo.getDatabaseType().equals(Judgment.ES.getIndex())) {
            // Connect to Elasticsearch and run the check
            try {
                getDocument(vo.getDatabaseIp(), Integer.valueOf(vo.getDatabasePort()));
            } catch (IOException e) {
                // Handle connection or operation failures
                e.printStackTrace();
            }
        }

        if (vo.getDatabaseType().equals(Judgment.MONGODB.getIndex())) {
            // Build the MongoDB connection string from the product type's URL template
            String url = productTypeEnum.getUrl()
                    .replace("{username}", vo.getUserName())
                    .replace("{password}", vo.getPassword())
                    .replace("{host}", vo.getDatabaseIp())
                    .replace("{port}", vo.getDatabasePort())
                    .replace("{database}", vo.getDatabaseName());
            try (MongoClient mongoClient = MongoClients.create(url)) {
                // Get the database object
                MongoDatabase database = mongoClient.getDatabase(vo.getDatabaseName());
                System.out.println("Connected to the database successfully");
            } catch (Exception e) {
                System.err.println("MongoDB connection error: " + e.getMessage());
            }
        }

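One caveat for the MongoDB branch above: MongoClients.create(url) and getDatabase(...) are lazy, so they typically do not fail even when the server is unreachable, and the success message can be printed without a real connection. A minimal sketch of a check that forces a round trip via the "ping" command follows; the helper name canConnect is illustrative, not part of this commit.

import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import org.bson.Document;

public class MongoConnectivityCheck {
    // Returns true only if the server answers a "ping" command.
    public static boolean canConnect(String url, String databaseName) {
        try (MongoClient mongoClient = MongoClients.create(url)) {
            // runCommand forces an actual round trip to the server
            mongoClient.getDatabase(databaseName).runCommand(new Document("ping", 1));
            return true;
        } catch (Exception e) {
            return false;
        }
    }
}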
        if (vo.getId() != null) {
            // Update the online status
            baseMapper.changeStatusById(vo.getId(), YesOrNo.YES.getValue());
        }
    }

    public static void getDocument(String host, int port) throws IOException {
        // Create the Elasticsearch client; constructing it does not by itself contact the cluster
        try {
            RestHighLevelClient client = new RestHighLevelClient(
                    RestClient.builder(new HttpHost(host, port, "http")));
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

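As written, getDocument only constructs the client: building a RestHighLevelClient does not contact the cluster, and the instance is never closed. Below is a sketch of a variant that actually checks reachability, assuming the 7.x high-level client (which exposes ping(RequestOptions)); the helper name canConnect is illustrative only.

import org.apache.http.HttpHost;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;

public class EsConnectivityCheck {
    // Returns true if the cluster answers the ping endpoint.
    public static boolean canConnect(String host, int port) {
        try (RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(new HttpHost(host, port, "http")))) {
            return client.ping(RequestOptions.DEFAULT);
        } catch (Exception e) {
            return false;
        }
    }
}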
    @Override
    public List<TableVo> getTablesById(Long id) {
        DataDatabaseEntity dataDatabaseEntity = baseMapper.selectById(id);