es数据源

pull/2/head
jpz 2023-12-12 10:37:40 +08:00
commit bf5cf89c1f
2 changed files with 35 additions and 38 deletions

View File

@ -72,9 +72,6 @@ public class DataDatabaseServiceImpl extends BaseServiceImpl<DataDatabaseDao, Da
private final DataAccessDao dataAccessDao; private final DataAccessDao dataAccessDao;
private final DataAccessService dataAccessService; private final DataAccessService dataAccessService;
@Override @Override
public PageResult<DataDatabaseVO> page(DataDatabaseQuery query) { public PageResult<DataDatabaseVO> page(DataDatabaseQuery query) {
IPage<DataDatabaseEntity> page = baseMapper.selectPage(getPage(query), getWrapper(query)); IPage<DataDatabaseEntity> page = baseMapper.selectPage(getPage(query), getWrapper(query));
@ -194,54 +191,54 @@ public class DataDatabaseServiceImpl extends BaseServiceImpl<DataDatabaseDao, Da
if(vo.getDatabaseType().equals(Judgment.KAFKA.getIndex())){ if(vo.getDatabaseType().equals(Judgment.KAFKA.getIndex())){
// 配置生产者属性 // 配置生产者属性
Properties properties = new Properties(); Properties properties = new Properties();
String ip = "invalid_ip:invalid_port"; // 设置一个无效的IP和端口以模拟连接失败的情况 String ip=vo.getDatabaseIp()+":"+vo.getDatabasePort();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, ip); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, ip);
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer.class.getName()); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer.class.getName());
// 创建生产者 // 创建生产者方娇大傻蛋蠢猪大傻春
Producer<String, String> producer = null; Producer<String, String> producer = null;
try { try {
producer = new org.apache.kafka.clients.producer.KafkaProducer<>(properties); producer = new org.apache.kafka.clients.producer.KafkaProducer<>(properties);
// 消息 // 消息
String message = "Default Message"; String message = "Default Message";
// 发送消息到主题 // 发送消息到主题
producer.send(new ProducerRecord<>(vo.getDatabaseKafKa(), message)); producer.send(new ProducerRecord<>(vo.getDatabaseKafKa(), message));
System.out.println("消息发送成功"); System.out.println("消息发送成功");
} catch (Exception e) { } catch (Exception e) {
System.err.println("连接失败: " + e.getMessage()); System.err.println("连接失败: " + e.getMessage());
} finally { } finally {
// 关闭生产者 // 关闭生产者
if (producer != null) { if (producer != null) {
producer.close(); producer.close();
} }
} }
}
if(vo.getDatabaseType().equals(Judgment.ES.getIndex())){
// 调用方法连接 Elasticsearch 并执行操作
try {
getDocument(vo.getDatabaseIp(), Integer.valueOf(vo.getDatabasePort()));
} catch (IOException e) {
// 处理连接或操作失败的异常
e.printStackTrace();
}
} }
} }
if(vo.getDatabaseType().equals(Judgment.ES.getIndex())){
// 调用方法连接 Elasticsearch 并执行操作
try {
getDocument(vo.getDatabaseIp(), Integer.valueOf(vo.getDatabasePort()));
} catch (IOException e) {
// 处理连接或操作失败的异常
e.printStackTrace();
}
}
if (vo.getId() != null) { if (vo.getId() != null) {
//更新状态 //更新状态
baseMapper.changeStatusById(vo.getId(), YesOrNo.YES.getValue()); baseMapper.changeStatusById(vo.getId(), YesOrNo.YES.getValue());
} }
} }
public static void getDocument(String host, int port) throws IOException { public static void getDocument(String host, int port) throws IOException {
// 创建 Elasticsearch 客户端 // 创建 Elasticsearch 客户端
try { try {
RestHighLevelClient client = new RestHighLevelClient( RestHighLevelClient client = new RestHighLevelClient(
RestClient.builder(new HttpHost(host, port, "http"))); RestClient.builder(new HttpHost(host, port, "http")));
} catch (Exception e) { } catch (Exception e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
} }

View File

@ -75,7 +75,7 @@ public class DatabaseRedisImpl extends DatabaseMysqlImpl implements IDatabaseInt
cd.setFieldTypeName(dataType); cd.setFieldTypeName(dataType);
int csize = columnSize != null ? Integer.parseInt(columnSize) : 0; int csize = columnSize != null ? Integer.parseInt(columnSize) : 0;
cd.setDisplaySize(csize); cd.setDisplaySize(csize);
cd.setPrecisionSize(csize); cd .setPrecisionSize(csize);
cd.setScaleSize(decimalDigits != null ? Integer.parseInt(decimalDigits) : 0); cd.setScaleSize(decimalDigits != null ? Integer.parseInt(decimalDigits) : 0);
cd.setRemarks(columnComment); cd.setRemarks(columnComment);
break; break;