fix 重构负载中心 创建kafka的8个分区 以及 主题下的8个消费者

master
rouchen 2024-06-18 21:33:05 +08:00
parent 69fb2a92b1
commit d9d97513fc
3 changed files with 94 additions and 6 deletions

11
pom.xml
View File

@ -156,7 +156,16 @@
<artifactId>commons-lang3</artifactId> <artifactId>commons-lang3</artifactId>
<version>3.12.0</version> <version>3.12.0</version>
</dependency> </dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.0</version>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.8.11</version>
</dependency>
<dependency> <dependency>
<groupId>org.apache.httpcomponents</groupId> <groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId> <artifactId>httpclient</artifactId>

View File

@ -13,6 +13,13 @@ import com.car.demos.car.LoadEnterNumber;
import com.car.demos.car.Node; import com.car.demos.car.Node;
import com.car.service.impl.ConnectServiceImpl; import com.car.service.impl.ConnectServiceImpl;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationArguments;
@ -25,6 +32,7 @@ import org.springframework.web.client.RestTemplate;
import java.math.BigDecimal; import java.math.BigDecimal;
import java.math.RoundingMode; import java.math.RoundingMode;
import java.util.*; import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors; import java.util.stream.Collectors;
/** /**
@ -50,7 +58,7 @@ public class InitConnectWeight implements ApplicationRunner {
@Override @Override
public void run(ApplicationArguments args) { public void run(ApplicationArguments args) throws ExecutionException, InterruptedException {
connectService.loadCenterDel(); connectService.loadCenterDel();
connectService.delNode(); connectService.delNode();
@ -255,7 +263,7 @@ public class InitConnectWeight implements ApplicationRunner {
} }
} }
} }
createTopic();
// 存入redis // 存入redis
redisTemplate.delete("ips"); redisTemplate.delete("ips");
@ -274,4 +282,61 @@ public class InitConnectWeight implements ApplicationRunner {
// .map(entry -> new ConnectWeight()) // .map(entry -> new ConnectWeight())
// .collect(Collectors.toList()); // .collect(Collectors.toList());
} }
public void createTopic() throws ExecutionException, InterruptedException {
Properties properties = System.getProperties();
properties.put("bootstrap.servers", "localhost:9092"); // 替换为您的Kafka集群地址
// 创建AdminClient实例
AdminClient adminClient = AdminClient.create(properties);
// 定义要创建的主题名称和分区数
ArrayList<String> topics = new ArrayList<>();
topics.add("test1");
topics.add("test2");
int numPartitions = 8;
// 检查主题是否存在并创建新主题
for (String topic : topics) {
ListTopicsResult listTopicsResult = adminClient.listTopics();
if (!listTopicsResult.names().get().contains(topic)) {
// 主题不存在,创建新主题
NewTopic newTopic = new NewTopic(topic, numPartitions, (short) 1);
CreateTopicsResult createTopicsResult = adminClient.createTopics(Collections.singletonList(newTopic));
// 等待主题创建完成
try {
createTopicsResult.all().get();
System.out.println("主题创建成功:" + topic);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
System.err.println("主题创建失败:" + topic);
}
} else {
// 主题已存在,可以选择跳过或更新配置
System.out.println("主题已存在:" + topic);
}
createConsumers(topic);
}
}
public void createConsumers(String topic) {
// 定义消费者组ID
String groupId = topic;
// 配置消费者属性
Properties properties = System.getProperties();
properties.put("bootstrap.servers", "localhost:9092"); // 替换为您的Kafka集群地址
properties.put("key.deserializer", StringDeserializer.class.getName());
properties.put("value.deserializer", StringDeserializer.class.getName());
properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
// 创建8个消费者实例
for (int i = 0; i < 8; i++) {
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
// 订阅主题
consumer.subscribe(Collections.singletonList(topic));
}
}
} }

View File

@ -12,11 +12,17 @@ import com.car.demos.req.VehicleConnectionReq;
import com.car.mapper.ConnerMapper; import com.car.mapper.ConnerMapper;
import com.car.service.ConnectService; import com.car.service.ConnectService;
import lombok.extern.log4j.Log4j2; import lombok.extern.log4j.Log4j2;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
/** /**
@ -33,6 +39,7 @@ public class ConnectServiceImpl implements ConnectService {
@Autowired @Autowired
private StringRedisTemplate redisTemplate; private StringRedisTemplate redisTemplate;
@Override @Override
public Result<MqttServerModel> getConnect(VehicleConnectionReq vehicleConnectionReq) { public Result<MqttServerModel> getConnect(VehicleConnectionReq vehicleConnectionReq) {
// 创建一个ReentrantLock对象 // 创建一个ReentrantLock对象
@ -48,7 +55,10 @@ public class ConnectServiceImpl implements ConnectService {
redisTemplate.opsForValue().set("subscript", String.valueOf(count + 1)); redisTemplate.opsForValue().set("subscript", String.valueOf(count + 1));
} }
String ip = redisTemplate.opsForList().index("ips", count); String ip = redisTemplate.opsForList().index("ips", count);
if (ip.equals("47.100.60.64")){
return Result.success(new MqttServerModel("tcp://" + ip + ":1883", "test1")); return Result.success(new MqttServerModel("tcp://" + ip + ":1883", "test1"));
}
return Result.success(new MqttServerModel("tcp://" + ip + ":1883", "test2"));
} finally { } finally {
lock.unlock(); // 释放锁 lock.unlock(); // 释放锁
} }
@ -57,13 +67,17 @@ public class ConnectServiceImpl implements ConnectService {
try { try {
redisTemplate.opsForValue().set("subscript", String.valueOf(1)); redisTemplate.opsForValue().set("subscript", String.valueOf(1));
String ip = redisTemplate.opsForList().index("ips", 0); String ip = redisTemplate.opsForList().index("ips", 0);
if (ip.equals("47.100.60.64")){
return Result.success(new MqttServerModel("tcp://" + ip + ":1883", "test1")); return Result.success(new MqttServerModel("tcp://" + ip + ":1883", "test1"));
}
return Result.success(new MqttServerModel("tcp://" + ip + ":1883", "test2"));
} finally { } finally {
lock.unlock(); // 释放锁 lock.unlock(); // 释放锁
} }
} }
} }
/** /**
* *
* @param loadEnterNumber * @param loadEnterNumber