diff --git a/srt-cloud-data-integrate/pom.xml b/srt-cloud-data-integrate/pom.xml index f2c28f0..e3f7e89 100644 --- a/srt-cloud-data-integrate/pom.xml +++ b/srt-cloud-data-integrate/pom.xml @@ -11,6 +11,21 @@ + + + + org.elasticsearch.client + elasticsearch-rest-client + + + + + org.elasticsearch.client + elasticsearch-rest-high-level-client + + + + org.apache.kafka kafka-clients diff --git a/srt-cloud-data-integrate/src/main/java/net/srt/service/impl/DataDatabaseServiceImpl.java b/srt-cloud-data-integrate/src/main/java/net/srt/service/impl/DataDatabaseServiceImpl.java index 8ef19b6..d1c1869 100644 --- a/srt-cloud-data-integrate/src/main/java/net/srt/service/impl/DataDatabaseServiceImpl.java +++ b/srt-cloud-data-integrate/src/main/java/net/srt/service/impl/DataDatabaseServiceImpl.java @@ -36,9 +36,12 @@ import net.srt.vo.DataDatabaseVO; import net.srt.vo.SchemaTableDataVo; import net.srt.vo.SqlGenerationVo; import net.srt.vo.TableVo; +import org.apache.http.HttpHost; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.client.RestHighLevelClient; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import redis.clients.jedis.Jedis; @@ -50,6 +53,7 @@ import srt.cloud.framework.dbswitch.core.model.TableDescription; import srt.cloud.framework.dbswitch.core.service.IMetaDataByJdbcService; import srt.cloud.framework.dbswitch.core.service.impl.MetaDataByJdbcServiceImpl; +import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Properties; @@ -68,6 +72,9 @@ public class DataDatabaseServiceImpl extends BaseServiceImpl page(DataDatabaseQuery query) { IPage page = baseMapper.selectPage(getPage(query), getWrapper(query)); @@ -161,7 +168,7 @@ public class DataDatabaseServiceImpl extends BaseServiceImpl producer = new org.apache.kafka.clients.producer.KafkaProducer<>(properties); + Producer producer = null; + try { + producer = new org.apache.kafka.clients.producer.KafkaProducer<>(properties); - // 消息 - String message = "Default Message"; + // 消息 + String message = "Default Message"; - // 发送消息到主题 - try { - producer.send(new ProducerRecord(vo.getDatabaseKafKa(), message)); - System.out.println("消息发送成功"); - } finally { + // 发送消息到主题 + producer.send(new ProducerRecord<>(vo.getDatabaseKafKa(), message)); + System.out.println("消息发送成功"); + } catch (Exception e) { + System.err.println("连接失败: " + e.getMessage()); + } finally { // 关闭生产者 - producer.close(); - } + if (producer != null) { + producer.close(); + } + } + } + + if(vo.getDatabaseType().equals(Judgment.ES.getIndex())){ + // 调用方法连接 Elasticsearch 并执行操作 + try { + getDocument(vo.getDatabaseIp(), Integer.valueOf(vo.getDatabasePort())); + } catch (IOException e) { + // 处理连接或操作失败的异常 + e.printStackTrace(); + } } } if (vo.getId() != null) { @@ -214,6 +236,16 @@ public class DataDatabaseServiceImpl extends BaseServiceImpl getTablesById(Long id) { DataDatabaseEntity dataDatabaseEntity = baseMapper.selectById(id); diff --git a/srt-cloud-framework/srt-cloud-flink/build/app/flink-app-1.14-2.0.0-jar-with-dependencies.jar b/srt-cloud-framework/srt-cloud-flink/build/app/flink-app-1.14-2.0.0-jar-with-dependencies.jar index d963086..41eeef6 100644 Binary files a/srt-cloud-framework/srt-cloud-flink/build/app/flink-app-1.14-2.0.0-jar-with-dependencies.jar and b/srt-cloud-framework/srt-cloud-flink/build/app/flink-app-1.14-2.0.0-jar-with-dependencies.jar differ diff --git a/srt-cloud-framework/srt-cloud-flink/build/extends/flink-catalog-mysql-1.14-2.0.0.jar b/srt-cloud-framework/srt-cloud-flink/build/extends/flink-catalog-mysql-1.14-2.0.0.jar index 9df30ff..cc95a4b 100644 Binary files a/srt-cloud-framework/srt-cloud-flink/build/extends/flink-catalog-mysql-1.14-2.0.0.jar and b/srt-cloud-framework/srt-cloud-flink/build/extends/flink-catalog-mysql-1.14-2.0.0.jar differ diff --git a/srt-cloud-framework/srt-cloud-flink/build/extends/flink-client-1.14-2.0.0.jar b/srt-cloud-framework/srt-cloud-flink/build/extends/flink-client-1.14-2.0.0.jar index f968ee4..2cae86b 100644 Binary files a/srt-cloud-framework/srt-cloud-flink/build/extends/flink-client-1.14-2.0.0.jar and b/srt-cloud-framework/srt-cloud-flink/build/extends/flink-client-1.14-2.0.0.jar differ