es数据源
parent
2c8a023d83
commit
8e70d714e9
|
@ -11,6 +11,21 @@
|
||||||
|
|
||||||
<dependencies>
|
<dependencies>
|
||||||
|
|
||||||
|
|
||||||
|
<!-- Elasticsearch Low Level REST Client -->
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.elasticsearch.client</groupId>
|
||||||
|
<artifactId>elasticsearch-rest-client</artifactId>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<!-- Elasticsearch High Level REST Client -->
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.elasticsearch.client</groupId>
|
||||||
|
<artifactId>elasticsearch-rest-high-level-client</artifactId>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.apache.kafka</groupId>
|
<groupId>org.apache.kafka</groupId>
|
||||||
<artifactId>kafka-clients</artifactId>
|
<artifactId>kafka-clients</artifactId>
|
||||||
|
|
|
@ -36,9 +36,12 @@ import net.srt.vo.DataDatabaseVO;
|
||||||
import net.srt.vo.SchemaTableDataVo;
|
import net.srt.vo.SchemaTableDataVo;
|
||||||
import net.srt.vo.SqlGenerationVo;
|
import net.srt.vo.SqlGenerationVo;
|
||||||
import net.srt.vo.TableVo;
|
import net.srt.vo.TableVo;
|
||||||
|
import org.apache.http.HttpHost;
|
||||||
import org.apache.kafka.clients.producer.Producer;
|
import org.apache.kafka.clients.producer.Producer;
|
||||||
import org.apache.kafka.clients.producer.ProducerConfig;
|
import org.apache.kafka.clients.producer.ProducerConfig;
|
||||||
import org.apache.kafka.clients.producer.ProducerRecord;
|
import org.apache.kafka.clients.producer.ProducerRecord;
|
||||||
|
import org.elasticsearch.client.RestClient;
|
||||||
|
import org.elasticsearch.client.RestHighLevelClient;
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
import org.springframework.transaction.annotation.Transactional;
|
import org.springframework.transaction.annotation.Transactional;
|
||||||
import redis.clients.jedis.Jedis;
|
import redis.clients.jedis.Jedis;
|
||||||
|
@ -50,6 +53,7 @@ import srt.cloud.framework.dbswitch.core.model.TableDescription;
|
||||||
import srt.cloud.framework.dbswitch.core.service.IMetaDataByJdbcService;
|
import srt.cloud.framework.dbswitch.core.service.IMetaDataByJdbcService;
|
||||||
import srt.cloud.framework.dbswitch.core.service.impl.MetaDataByJdbcServiceImpl;
|
import srt.cloud.framework.dbswitch.core.service.impl.MetaDataByJdbcServiceImpl;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Properties;
|
import java.util.Properties;
|
||||||
|
@ -68,6 +72,9 @@ public class DataDatabaseServiceImpl extends BaseServiceImpl<DataDatabaseDao, Da
|
||||||
private final DataAccessDao dataAccessDao;
|
private final DataAccessDao dataAccessDao;
|
||||||
private final DataAccessService dataAccessService;
|
private final DataAccessService dataAccessService;
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public PageResult<DataDatabaseVO> page(DataDatabaseQuery query) {
|
public PageResult<DataDatabaseVO> page(DataDatabaseQuery query) {
|
||||||
IPage<DataDatabaseEntity> page = baseMapper.selectPage(getPage(query), getWrapper(query));
|
IPage<DataDatabaseEntity> page = baseMapper.selectPage(getPage(query), getWrapper(query));
|
||||||
|
@ -161,7 +168,7 @@ public class DataDatabaseServiceImpl extends BaseServiceImpl<DataDatabaseDao, Da
|
||||||
);
|
);
|
||||||
}else {
|
}else {
|
||||||
//判断是什么类型进行连接
|
//判断是什么类型进行连接
|
||||||
if(vo.getDatabaseType()== Judgment.REDIS.getIndex()){
|
if(vo.getDatabaseType().equals(Judgment.REDIS.getIndex())){
|
||||||
//连接本地的 Redis 服务
|
//连接本地的 Redis 服务
|
||||||
// 创建 Jedis 对象
|
// 创建 Jedis 对象
|
||||||
Jedis jedis = new Jedis(vo.getDatabaseIp(), Integer.valueOf(vo.getDatabasePort()));
|
Jedis jedis = new Jedis(vo.getDatabaseIp(), Integer.valueOf(vo.getDatabasePort()));
|
||||||
|
@ -184,28 +191,43 @@ public class DataDatabaseServiceImpl extends BaseServiceImpl<DataDatabaseDao, Da
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if(vo.getDatabaseType()==Judgment.KAFKA.getIndex()){
|
if(vo.getDatabaseType().equals(Judgment.KAFKA.getIndex())){
|
||||||
// 配置生产者属性
|
// 配置生产者属性
|
||||||
Properties properties = new Properties();
|
Properties properties = new Properties();
|
||||||
String ip=vo.getDatabaseIp()+":"+vo.getDatabasePort();
|
String ip = "invalid_ip:invalid_port"; // 设置一个无效的IP和端口,以模拟连接失败的情况
|
||||||
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, ip);
|
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, ip);
|
||||||
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer.class.getName());
|
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer.class.getName());
|
||||||
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer.class.getName());
|
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer.class.getName());
|
||||||
|
|
||||||
// 创建生产者
|
// 创建生产者
|
||||||
Producer<String, String> producer = new org.apache.kafka.clients.producer.KafkaProducer<>(properties);
|
Producer<String, String> producer = null;
|
||||||
|
try {
|
||||||
|
producer = new org.apache.kafka.clients.producer.KafkaProducer<>(properties);
|
||||||
|
|
||||||
// 消息
|
// 消息
|
||||||
String message = "Default Message";
|
String message = "Default Message";
|
||||||
|
|
||||||
// 发送消息到主题
|
// 发送消息到主题
|
||||||
try {
|
producer.send(new ProducerRecord<>(vo.getDatabaseKafKa(), message));
|
||||||
producer.send(new ProducerRecord<String, String>(vo.getDatabaseKafKa(), message));
|
System.out.println("消息发送成功");
|
||||||
System.out.println("消息发送成功");
|
} catch (Exception e) {
|
||||||
} finally {
|
System.err.println("连接失败: " + e.getMessage());
|
||||||
|
} finally {
|
||||||
// 关闭生产者
|
// 关闭生产者
|
||||||
producer.close();
|
if (producer != null) {
|
||||||
}
|
producer.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if(vo.getDatabaseType().equals(Judgment.ES.getIndex())){
|
||||||
|
// 调用方法连接 Elasticsearch 并执行操作
|
||||||
|
try {
|
||||||
|
getDocument(vo.getDatabaseIp(), Integer.valueOf(vo.getDatabasePort()));
|
||||||
|
} catch (IOException e) {
|
||||||
|
// 处理连接或操作失败的异常
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (vo.getId() != null) {
|
if (vo.getId() != null) {
|
||||||
|
@ -214,6 +236,16 @@ public class DataDatabaseServiceImpl extends BaseServiceImpl<DataDatabaseDao, Da
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static void getDocument(String host, int port) throws IOException {
|
||||||
|
// 创建 Elasticsearch 客户端
|
||||||
|
try {
|
||||||
|
RestHighLevelClient client = new RestHighLevelClient(
|
||||||
|
RestClient.builder(new HttpHost(host, port, "http")));
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public List<TableVo> getTablesById(Long id) {
|
public List<TableVo> getTablesById(Long id) {
|
||||||
DataDatabaseEntity dataDatabaseEntity = baseMapper.selectById(id);
|
DataDatabaseEntity dataDatabaseEntity = baseMapper.selectById(id);
|
||||||
|
|
Binary file not shown.
Binary file not shown.
Binary file not shown.
Loading…
Reference in New Issue