diff --git a/src/main/java/com/car/config/InitConnectWeight.java b/src/main/java/com/car/config/InitConnectWeight.java index f5054e0..16a8aa2 100644 --- a/src/main/java/com/car/config/InitConnectWeight.java +++ b/src/main/java/com/car/config/InitConnectWeight.java @@ -113,7 +113,16 @@ public class InitConnectWeight implements ApplicationRunner { } log.info("ipList:{}", ipList); - rabbitTemplate.convertAndSend("ip",ipList); + + String topic = null; + + List topicList = new ArrayList<>(); + for (int i = 0; i < ipList.size(); i++) { + topic = ipList.get(i)+"-"+"test"+i; + topicList.add(topic); + } + log.error("topicList:{}", topicList); + rabbitTemplate.convertAndSend("ip",topicList); //网关收集节点 int gatewayNum = ipList.size(); //数据解析结点数量 @@ -249,7 +258,7 @@ public class InitConnectWeight implements ApplicationRunner { .overallLoad(overallLoad) .build(); connectService.LoadCenterAdd(build); - ArrayList weightIpList = new ArrayList<>(); + List weightIpList = new ArrayList<>(); redisTemplate.delete("subscript"); for (int i = 0; i <= max; i++) { @@ -263,10 +272,11 @@ public class InitConnectWeight implements ApplicationRunner { } } } - createTopic(); + createTopic(topicList); // 存入redis redisTemplate.delete("ips"); + for (String ip : weightIpList) { redisTemplate.opsForList().rightPush("ips",ip); } @@ -283,7 +293,7 @@ public class InitConnectWeight implements ApplicationRunner { // .collect(Collectors.toList()); } - public void createTopic() throws ExecutionException, InterruptedException { + public void createTopic(List topicList) throws ExecutionException, InterruptedException { Properties properties = System.getProperties(); properties.put("bootstrap.servers", "localhost:9092"); // 替换为您的Kafka集群地址 @@ -292,32 +302,33 @@ public class InitConnectWeight implements ApplicationRunner { AdminClient adminClient = AdminClient.create(properties); // 定义要创建的主题名称和分区数 - ArrayList topics = new ArrayList<>(); - topics.add("test1"); - topics.add("test2"); + int numPartitions = 8; // 检查主题是否存在并创建新主题 - for (String topic : topics) { + + for (String topic : topicList) { + String[] parts = topic.split("-"); + String suffix = parts[1]; ListTopicsResult listTopicsResult = adminClient.listTopics(); - if (!listTopicsResult.names().get().contains(topic)) { + if (!listTopicsResult.names().get().contains(suffix)) { // 主题不存在,创建新主题 - NewTopic newTopic = new NewTopic(topic, numPartitions, (short) 1); + NewTopic newTopic = new NewTopic(suffix, numPartitions, (short) 1); CreateTopicsResult createTopicsResult = adminClient.createTopics(Collections.singletonList(newTopic)); // 等待主题创建完成 try { createTopicsResult.all().get(); - System.out.println("主题创建成功:" + topic); + System.out.println("主题创建成功:" + suffix); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); - System.err.println("主题创建失败:" + topic); + System.err.println("主题创建失败:" + suffix); } } else { // 主题已存在,可以选择跳过或更新配置 - System.out.println("主题已存在:" + topic); + System.out.println("主题已存在:" + suffix); } - createConsumers(topic); + createConsumers(suffix); } } diff --git a/src/main/java/com/car/service/impl/ConnectServiceImpl.java b/src/main/java/com/car/service/impl/ConnectServiceImpl.java index d10dadc..4be7a3d 100644 --- a/src/main/java/com/car/service/impl/ConnectServiceImpl.java +++ b/src/main/java/com/car/service/impl/ConnectServiceImpl.java @@ -75,6 +75,7 @@ public class ConnectServiceImpl implements ConnectService { lock.unlock(); // 释放锁 } } + }