master
肖凡 2024-04-20 09:43:20 +08:00
parent 4bb3f2761d
commit 79e1240de4
4 changed files with 16 additions and 243 deletions

View File

@ -1,135 +0,0 @@
package com.xiaofan.loadcenter.controller;
import com.xiaofan.loadcenter.gateway.cache.GatewayLoadNodeCache;
import com.xiaofan.loadcenter.gateway.cache.GatewayNodeCache;
import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import com.xiaofan.loadcenter.common.aliyun.AliYunEcsService;
import com.xiaofan.loadcenter.common.domain.EcsInstanceInfo;
import com.xiaofan.loadcenter.gateway.cache.GatewayNodeScoreCache;
import lombok.extern.log4j.Log4j2;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import com.xiaofan.loadcenter.common.redis.service.RedisService;
import java.util.ArrayList;
import java.util.Map;
import java.util.Set;
@Component
@Log4j2
public class LoadCenterController {
//计数器,用于判断是否需要进行扩容
int aa=0;
//计数器,用于记录循环次数
int bb=0;
//注入Redis操作模版
@Autowired
StringRedisTemplate redisTemplate;
//注入Redis服务
@Autowired
private RedisService redisService;
//注入阿里云ECS服务
@Autowired
private AliYunEcsService aliYunEcsService;
@Autowired
private GatewayNodeCache gatewayNodeCache;
@Autowired
private GatewayLoadNodeCache gatewayLoadNodeCache;
@Autowired
private GatewayNodeScoreCache gatewayNodeScoreCache;
/*
* 30
*/
@Scheduled(cron = "0/10 * * * * ?")
public void scheduleECS() throws Exception {
// 创建 OkHttpClient 客户端
OkHttpClient client = new OkHttpClient();
ArrayList<String> ipCacheSet = new ArrayList<>();
// 获取缓存中的 ECS 服务器集合
Map<Object, Double> map = gatewayNodeScoreCache.get();
for (Map.Entry<Object, Double> entry : map.entrySet()) {
ipCacheSet.add(entry.getKey().toString());
}
log.info("共有"+ipCacheSet.size()+"个服务器");
// 遍历每个 ECS 服务器
for (String ip : ipCacheSet) {
// 构建请求 URL
String URL = "http://"+ip+":8080/public/cluster";
Request request = new Request.Builder()
.url(URL)
.get()
.addHeader("User-Agent", "Apifox/1.0.0 (https://apifox.com)")
.addHeader("Accesstoken", "")
.build();
try {
// 发起 HTTP 请求获取服务器负载信息
Response response = client.newCall(request).execute();
JSONArray jsonArray = JSONArray.parseArray(response.body().string());
JSONObject jsonObject = jsonArray.getJSONObject(0);
JSONObject mqttInfo = jsonObject.getJSONObject("mqttInfo");
int connectSize = mqttInfo.getIntValue("connectSize");
// 打印服务器负载信息
log.info("服务器:"+ip+"-车辆连接数:"+connectSize);
//更新服务器负载信息到Redis
// redisService.setCacheZSet("ECS", ip,connectSize);
gatewayNodeScoreCache.put(ip,connectSize);
// 判断是否需要进行扩容
if (connectSize>=5){
aa++;
log.info("服务器:"+ip+"-----"+aa+"=="+ipCacheSet.size()+"就可以扩容");
// 如果所有服务器都需要扩容
if (aa==ipCacheSet.size()){
log.info("服务器个数:"+ipCacheSet.size()+",-----循环第"+aa+"次,相等了,需要扩容");
log.info("执行扩容机制");
// 节点扩容
String instanceId = aliYunEcsService.runInstances();
log.info("扩容的节点ip为" + instanceId);
// log.info("扩容中休眠5秒再返回确保先创建确保查询得到结果-------");
Thread.sleep(5000);
// 查询节点信息
EcsInstanceInfo ecsInstanceInfo = aliYunEcsService.selectList(instanceId);
// redisService.setCacheZSet("ECS",ecsInstanceInfo.getPublicIpAddress(),0);
gatewayNodeScoreCache.put(ecsInstanceInfo.getPublicIpAddress(),0);
log.info("公网ip存入redis");
aa=0;
}
} else {
aa=0;
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
}

View File

@ -1,104 +0,0 @@
package com.xiaofan.loadcenter.controller;
import com.xiaofan.loadcenter.gateway.cache.GatewayLoadNodeCache;
import com.xiaofan.loadcenter.gateway.cache.GatewayNodeCache;
import com.xiaofan.loadcenter.common.domain.Result;
import com.xiaofan.loadcenter.common.domain.WorkGatewayNode;
import com.xiaofan.loadcenter.gateway.cache.GatewayNodeScoreCache;
import com.xiaofan.loadcenter.common.redis.service.RedisService;
import lombok.extern.log4j.Log4j2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.*;
@Log4j2
@RestController
@RequestMapping("car")
public class WorkGatewayNodeController {
@Autowired
private RedisService redisService;
@Autowired
private GatewayNodeCache gatewayNodeCache;
@Autowired
private GatewayLoadNodeCache gatewayLoadNodeCache;
@Autowired
private GatewayNodeScoreCache gatewayNodeScoreCache;
@PostMapping("/workGatewayNode")
public Result getWorkGatewayNode() throws Exception {
// 从缓存中获取节点与权重的映射关系
// Map<Object, Double> map = redisService.getCacheZSetScore("ECS");
Map<Object, Double> map = gatewayNodeScoreCache.get();
// 将节点与权重映射关系转换为节点列表
ArrayList<WorkGatewayNode> nodeIdList = new ArrayList<>();
// 初始化负载节点列表
List<String> loadNodeList = new ArrayList<>();
for (Map.Entry<Object, Double> entry : map.entrySet()) {
WorkGatewayNode workGatewayNode1 = new WorkGatewayNode();
log.info(entry.getKey().toString()+"--"+entry.getValue());
workGatewayNode1.setNodeId(entry.getKey().toString());
workGatewayNode1.setWeight(entry.getValue().intValue());
nodeIdList.add(workGatewayNode1);
}
log.info("-----分割线-----");
System.out.println(nodeIdList);
log.info("-----分割线-----");
// 计算节点权重总和
long count = nodeIdList.stream().mapToInt(WorkGatewayNode::getWeight).sum();
// 如果权重总和小于100则对节点进行负载均衡
if (count < 100) {
// 按权重降序排序节点列表
List<WorkGatewayNode> list = nodeIdList.stream()
.sorted((o1, o2) -> o2.getWeight() - o1.getWeight())
.toList();
// 将权重不足100的节点进行均衡负载
int countWeight = 0;
for (long i = count; i < 100; i++) {
WorkGatewayNode workGatewayNode = list.get(countWeight++ % list.size());
workGatewayNode.setWeight(workGatewayNode.getWeight() + 1);
}
}
// 根据节点权重,生成负载节点列表
while (nodeIdList.stream().anyMatch(node -> node.getWeight() > 0)) {
for (WorkGatewayNode workGatewayNode : nodeIdList) {
int weight = workGatewayNode.getWeight();
if (weight > 0) {
loadNodeList.add(workGatewayNode.getNodeId());
workGatewayNode.setWeight(weight - 1);
}
}
}
log.info("----------------------------分割线--------------------------");
//统计每个对象出现的次数
Map<Object, Integer> countMap = new HashMap<>();
for (Object obj : loadNodeList) {
countMap.put(obj, countMap.getOrDefault(obj, 0) + 1);
}
// 找出出现次数最少的对象
Object nodeId = null;
int minCount = Integer.MAX_VALUE;
for (Map.Entry<Object, Integer> entry : countMap.entrySet()) {
if (entry.getValue() < minCount) {
minCount = entry.getValue();
nodeId = entry.getKey();
}
}
// 如果有最小数量的对象,则返回最小数量的对象;否则随机选择一个对象
if (nodeId != null) {
return Result.success(nodeId);
} else {
Random random = new Random();
return Result.success(random.nextInt(loadNodeList.size()));
}
}
}

View File

@ -1 +1 @@
package com.xiaofan.loadcenter.gateway.model; import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor; /** * @ProjectName: LoadCenter * @PackageName: com.muyu.loadCenter.domain * @Description 网关节点信息 * @Author XiaoFan * @Date 2024/4/18 14:25 * @Version 1.0 */ @Data @Builder @NoArgsConstructor @AllArgsConstructor public class GatewayNodeInfo { /** * 节点ID */ private String nodeId; /** * 公网IP */ private String publicIdAddress; }
package com.xiaofan.loadcenter.gateway.model; import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor; /** * @ProjectName: LoadCenter * @PackageName: com.muyu.loadCenter.domain * @Description 网关节点信息 * @Author XiaoFan * @Date 2024/4/18 14:25 * @Version 1.0 */ @Data @Builder @NoArgsConstructor @AllArgsConstructor public class GatewayNodeInfo { /** * 节点ID */ private String nodeId; /** * 公网IP */ private String publicIdAddress; /** * 内网ip */ private String privateIdAddress; }

View File

@ -74,7 +74,7 @@ public class GatewayLoadServiceImpl implements GatewayLoadService {
//计算节点权重总和
long count=nodeIdList.stream().mapToInt(WorkGatewayNode::getWeight).sum();
//如果权重总和小于100则对节点进行负载均衡
if (count<100){
if (count < 100){
//按权重降序排序节点列表
List<WorkGatewayNode> list=nodeIdList.stream()
.sorted((o1, o2) -> o2.getWeight()-o1.getWeight())
@ -127,6 +127,7 @@ public class GatewayLoadServiceImpl implements GatewayLoadService {
String loadNodeId = gatewayLoadNodeCache.getBydIndex(seriesLoadIndex);
// 使用获取到的节点ID从缓存中检索具体的网关节点信息
GatewayNodeInfo gatewayNodeInfo = gatewayNodeCache.get(loadNodeId);
// 返回网关节点的公有ID地址
return gatewayNodeInfo.getPublicIdAddress();
}
@ -156,7 +157,7 @@ public class GatewayLoadServiceImpl implements GatewayLoadService {
try {
Response response=client.newCall(request).execute();
//解析响应数据
JSONArray jsonArray = JSONArray.parseArray(response.body().toString());
JSONArray jsonArray = JSONArray.parseArray(response.body().string());
JSONObject jsonObject = jsonArray.getJSONObject(0);
JSONObject mqttInfo = jsonObject.getJSONObject("mqttInfo");
int connectSize=mqttInfo.getIntValue("connectSize");
@ -178,8 +179,19 @@ public class GatewayLoadServiceImpl implements GatewayLoadService {
//获取新实例信息并将其放入Redis
EcsInstanceInfo ecsInstanceInfo = aliYunEcsService.selectList(instanceId);
}
GatewayNodeInfo gatewayNodeInfo = new GatewayNodeInfo();
gatewayNodeInfo.setNodeId(ecsInstanceInfo.getInstanceId());
gatewayNodeInfo.setPublicIdAddress(ecsInstanceInfo.getPublicIpAddress());
gatewayNodeInfo.setPrivateIdAddress(ecsInstanceInfo.getPrivateIpAddress());
gatewayNodeCache.put(ecsInstanceInfo.getPublicIpAddress(),gatewayNodeInfo);
//)修改服务器与在线车辆数据
gatewayNodeScoreCache.put(ecsInstanceInfo.getPublicIpAddress(),0);
log.info("实例id和公网ip存入Redis");
aa=0;
}
}else{
aa=0;
}
}catch (Exception e){
e.printStackTrace();