fix 修复ip+topic

master
rouchen 2024-06-19 22:26:44 +08:00
parent d9d97513fc
commit 49982ce49e
2 changed files with 26 additions and 14 deletions

View File

@ -113,7 +113,16 @@ public class InitConnectWeight implements ApplicationRunner {
} }
log.info("ipList:{}", ipList); log.info("ipList:{}", ipList);
rabbitTemplate.convertAndSend("ip",ipList);
String topic = null;
List<String> topicList = new ArrayList<>();
for (int i = 0; i < ipList.size(); i++) {
topic = ipList.get(i)+"-"+"test"+i;
topicList.add(topic);
}
log.error("topicList:{}", topicList);
rabbitTemplate.convertAndSend("ip",topicList);
//网关收集节点 //网关收集节点
int gatewayNum = ipList.size(); int gatewayNum = ipList.size();
//数据解析结点数量 //数据解析结点数量
@ -249,7 +258,7 @@ public class InitConnectWeight implements ApplicationRunner {
.overallLoad(overallLoad) .overallLoad(overallLoad)
.build(); .build();
connectService.LoadCenterAdd(build); connectService.LoadCenterAdd(build);
ArrayList<String> weightIpList = new ArrayList<>(); List<String> weightIpList = new ArrayList<>();
redisTemplate.delete("subscript"); redisTemplate.delete("subscript");
for (int i = 0; i <= max; i++) { for (int i = 0; i <= max; i++) {
@ -263,10 +272,11 @@ public class InitConnectWeight implements ApplicationRunner {
} }
} }
} }
createTopic(); createTopic(topicList);
// 存入redis // 存入redis
redisTemplate.delete("ips"); redisTemplate.delete("ips");
for (String ip : weightIpList) { for (String ip : weightIpList) {
redisTemplate.opsForList().rightPush("ips",ip); redisTemplate.opsForList().rightPush("ips",ip);
} }
@ -283,7 +293,7 @@ public class InitConnectWeight implements ApplicationRunner {
// .collect(Collectors.toList()); // .collect(Collectors.toList());
} }
public void createTopic() throws ExecutionException, InterruptedException { public void createTopic(List<String> topicList) throws ExecutionException, InterruptedException {
Properties properties = System.getProperties(); Properties properties = System.getProperties();
properties.put("bootstrap.servers", "localhost:9092"); // 替换为您的Kafka集群地址 properties.put("bootstrap.servers", "localhost:9092"); // 替换为您的Kafka集群地址
@ -292,32 +302,33 @@ public class InitConnectWeight implements ApplicationRunner {
AdminClient adminClient = AdminClient.create(properties); AdminClient adminClient = AdminClient.create(properties);
// 定义要创建的主题名称和分区数 // 定义要创建的主题名称和分区数
ArrayList<String> topics = new ArrayList<>();
topics.add("test1");
topics.add("test2");
int numPartitions = 8; int numPartitions = 8;
// 检查主题是否存在并创建新主题 // 检查主题是否存在并创建新主题
for (String topic : topics) {
for (String topic : topicList) {
String[] parts = topic.split("-");
String suffix = parts[1];
ListTopicsResult listTopicsResult = adminClient.listTopics(); ListTopicsResult listTopicsResult = adminClient.listTopics();
if (!listTopicsResult.names().get().contains(topic)) { if (!listTopicsResult.names().get().contains(suffix)) {
// 主题不存在,创建新主题 // 主题不存在,创建新主题
NewTopic newTopic = new NewTopic(topic, numPartitions, (short) 1); NewTopic newTopic = new NewTopic(suffix, numPartitions, (short) 1);
CreateTopicsResult createTopicsResult = adminClient.createTopics(Collections.singletonList(newTopic)); CreateTopicsResult createTopicsResult = adminClient.createTopics(Collections.singletonList(newTopic));
// 等待主题创建完成 // 等待主题创建完成
try { try {
createTopicsResult.all().get(); createTopicsResult.all().get();
System.out.println("主题创建成功:" + topic); System.out.println("主题创建成功:" + suffix);
} catch (InterruptedException | ExecutionException e) { } catch (InterruptedException | ExecutionException e) {
e.printStackTrace(); e.printStackTrace();
System.err.println("主题创建失败:" + topic); System.err.println("主题创建失败:" + suffix);
} }
} else { } else {
// 主题已存在,可以选择跳过或更新配置 // 主题已存在,可以选择跳过或更新配置
System.out.println("主题已存在:" + topic); System.out.println("主题已存在:" + suffix);
} }
createConsumers(topic); createConsumers(suffix);
} }
} }

View File

@ -75,6 +75,7 @@ public class ConnectServiceImpl implements ConnectService {
lock.unlock(); // 释放锁 lock.unlock(); // 释放锁
} }
} }
} }