数据源

pull/2/head
chenbingxuan 2023-12-11 09:28:33 +08:00
parent 2c8a023d83
commit 8e05fd0d30
1 changed files with 18 additions and 13 deletions

View File

@ -161,7 +161,7 @@ public class DataDatabaseServiceImpl extends BaseServiceImpl<DataDatabaseDao, Da
); );
}else { }else {
//判断是什么类型进行连接 //判断是什么类型进行连接
if(vo.getDatabaseType()== Judgment.REDIS.getIndex()){ if(vo.getDatabaseType().equals(Judgment.REDIS.getIndex())){
//连接本地的 Redis 服务 //连接本地的 Redis 服务
// 创建 Jedis 对象 // 创建 Jedis 对象
Jedis jedis = new Jedis(vo.getDatabaseIp(), Integer.valueOf(vo.getDatabasePort())); Jedis jedis = new Jedis(vo.getDatabaseIp(), Integer.valueOf(vo.getDatabasePort()));
@ -184,28 +184,33 @@ public class DataDatabaseServiceImpl extends BaseServiceImpl<DataDatabaseDao, Da
} }
} }
if(vo.getDatabaseType()==Judgment.KAFKA.getIndex()){ if(vo.getDatabaseType().equals(Judgment.KAFKA.getIndex())){
// 配置生产者属性 // 配置生产者属性
Properties properties = new Properties(); Properties properties = new Properties();
String ip=vo.getDatabaseIp()+":"+vo.getDatabasePort(); String ip = "invalid_ip:invalid_port"; // 设置一个无效的IP和端口以模拟连接失败的情况
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, ip); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, ip);
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer.class.getName()); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer.class.getName());
// 创建生产者 // 创建生产者
Producer<String, String> producer = new org.apache.kafka.clients.producer.KafkaProducer<>(properties); Producer<String, String> producer = null;
try {
producer = new org.apache.kafka.clients.producer.KafkaProducer<>(properties);
// 消息 // 消息
String message = "Default Message"; String message = "Default Message";
// 发送消息到主题 // 发送消息到主题
try { producer.send(new ProducerRecord<>(vo.getDatabaseKafKa(), message));
producer.send(new ProducerRecord<String, String>(vo.getDatabaseKafKa(), message)); System.out.println("消息发送成功");
System.out.println("消息发送成功"); } catch (Exception e) {
} finally { System.err.println("连接失败: " + e.getMessage());
} finally {
// 关闭生产者 // 关闭生产者
producer.close(); if (producer != null) {
} producer.close();
}
}
} }
} }
if (vo.getId() != null) { if (vo.getId() != null) {