diff --git a/pom.xml b/pom.xml index 02abb5b..a334dde 100644 --- a/pom.xml +++ b/pom.xml @@ -156,7 +156,16 @@ commons-lang3 3.12.0 - + + org.apache.kafka + kafka-clients + 2.8.0 + + + org.springframework.kafka + spring-kafka + 2.8.11 + org.apache.httpcomponents httpclient diff --git a/src/main/java/com/car/config/InitConnectWeight.java b/src/main/java/com/car/config/InitConnectWeight.java index f34468d..f5054e0 100644 --- a/src/main/java/com/car/config/InitConnectWeight.java +++ b/src/main/java/com/car/config/InitConnectWeight.java @@ -13,6 +13,13 @@ import com.car.demos.car.LoadEnterNumber; import com.car.demos.car.Node; import com.car.service.impl.ConnectServiceImpl; import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.CreateTopicsResult; +import org.apache.kafka.clients.admin.ListTopicsResult; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.serialization.StringDeserializer; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.ApplicationArguments; @@ -25,6 +32,7 @@ import org.springframework.web.client.RestTemplate; import java.math.BigDecimal; import java.math.RoundingMode; import java.util.*; +import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; /** @@ -50,7 +58,7 @@ public class InitConnectWeight implements ApplicationRunner { @Override - public void run(ApplicationArguments args) { + public void run(ApplicationArguments args) throws ExecutionException, InterruptedException { connectService.loadCenterDel(); connectService.delNode(); @@ -255,7 +263,7 @@ public class InitConnectWeight implements ApplicationRunner { } } } - + createTopic(); // 存入redis redisTemplate.delete("ips"); @@ -274,4 +282,61 @@ public class InitConnectWeight implements ApplicationRunner { // .map(entry -> new ConnectWeight()) // .collect(Collectors.toList()); } + + public void createTopic() throws ExecutionException, InterruptedException { + + Properties properties = System.getProperties(); + properties.put("bootstrap.servers", "localhost:9092"); // 替换为您的Kafka集群地址 + + // 创建AdminClient实例 + AdminClient adminClient = AdminClient.create(properties); + + // 定义要创建的主题名称和分区数 + ArrayList topics = new ArrayList<>(); + topics.add("test1"); + topics.add("test2"); + int numPartitions = 8; + + + // 检查主题是否存在并创建新主题 + for (String topic : topics) { + ListTopicsResult listTopicsResult = adminClient.listTopics(); + if (!listTopicsResult.names().get().contains(topic)) { + // 主题不存在,创建新主题 + NewTopic newTopic = new NewTopic(topic, numPartitions, (short) 1); + CreateTopicsResult createTopicsResult = adminClient.createTopics(Collections.singletonList(newTopic)); + // 等待主题创建完成 + try { + createTopicsResult.all().get(); + System.out.println("主题创建成功:" + topic); + } catch (InterruptedException | ExecutionException e) { + e.printStackTrace(); + System.err.println("主题创建失败:" + topic); + } + } else { + // 主题已存在,可以选择跳过或更新配置 + System.out.println("主题已存在:" + topic); + } + createConsumers(topic); + } + + } + public void createConsumers(String topic) { + // 定义消费者组ID + String groupId = topic; + + // 配置消费者属性 + Properties properties = System.getProperties(); + properties.put("bootstrap.servers", "localhost:9092"); // 替换为您的Kafka集群地址 + properties.put("key.deserializer", StringDeserializer.class.getName()); + properties.put("value.deserializer", StringDeserializer.class.getName()); + properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); + + // 创建8个消费者实例 + for (int i = 0; i < 8; i++) { + KafkaConsumer consumer = new KafkaConsumer<>(properties); + // 订阅主题 + consumer.subscribe(Collections.singletonList(topic)); + } + } } diff --git a/src/main/java/com/car/service/impl/ConnectServiceImpl.java b/src/main/java/com/car/service/impl/ConnectServiceImpl.java index 80a3119..d10dadc 100644 --- a/src/main/java/com/car/service/impl/ConnectServiceImpl.java +++ b/src/main/java/com/car/service/impl/ConnectServiceImpl.java @@ -12,11 +12,17 @@ import com.car.demos.req.VehicleConnectionReq; import com.car.mapper.ConnerMapper; import com.car.service.ConnectService; import lombok.extern.log4j.Log4j2; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.CreateTopicsResult; +import org.apache.kafka.clients.admin.NewTopic; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Service; +import java.util.Collections; import java.util.List; +import java.util.Properties; +import java.util.concurrent.ExecutionException; import java.util.concurrent.locks.ReentrantLock; /** @@ -33,6 +39,7 @@ public class ConnectServiceImpl implements ConnectService { @Autowired private StringRedisTemplate redisTemplate; + @Override public Result getConnect(VehicleConnectionReq vehicleConnectionReq) { // 创建一个ReentrantLock对象 @@ -48,7 +55,10 @@ public class ConnectServiceImpl implements ConnectService { redisTemplate.opsForValue().set("subscript", String.valueOf(count + 1)); } String ip = redisTemplate.opsForList().index("ips", count); - return Result.success(new MqttServerModel("tcp://" + ip + ":1883", "test1")); + if (ip.equals("47.100.60.64")){ + return Result.success(new MqttServerModel("tcp://" + ip + ":1883", "test1")); + } + return Result.success(new MqttServerModel("tcp://" + ip + ":1883", "test2")); } finally { lock.unlock(); // 释放锁 } @@ -57,13 +67,17 @@ public class ConnectServiceImpl implements ConnectService { try { redisTemplate.opsForValue().set("subscript", String.valueOf(1)); String ip = redisTemplate.opsForList().index("ips", 0); - return Result.success(new MqttServerModel("tcp://" + ip + ":1883", "test1")); + if (ip.equals("47.100.60.64")){ + return Result.success(new MqttServerModel("tcp://" + ip + ":1883", "test1")); + } + return Result.success(new MqttServerModel("tcp://" + ip + ":1883", "test2")); } finally { lock.unlock(); // 释放锁 } } - } + + /** * 添加负载中心 * @param loadEnterNumber