From b8a0bb1eec0b6e56f2c86a8190a980f1e608ef5c Mon Sep 17 00:00:00 2001 From: liuyunhu <3286117488@qq.com> Date: Fri, 19 Apr 2024 10:26:58 +0800 Subject: [PATCH] +++++++++++++ --- .../aliyun/service/AliYunEcsService.java | 1 - .../com/lyh/common/utils/mqtt/MqttUtil.java | 5 +- .../lyh/gateway/cache/GatewayNodeIdCache.java | 53 +++ .../gateway/cache/GatewayNodeInfoCache.java | 4 +- ...deIpCache.java => GatewayNodeIpCache.java} | 4 +- src/main/java/com/lyh/handle/HandleCache.java | 343 ++++++++++++++++++ src/main/java/com/lyh/job/Timer.java | 139 ++++--- .../service/impl/LoadCenterServiceImpl.java | 290 +-------------- src/test/java/Test.java | 8 +- 9 files changed, 508 insertions(+), 339 deletions(-) create mode 100644 src/main/java/com/lyh/gateway/cache/GatewayNodeIdCache.java rename src/main/java/com/lyh/gateway/cache/{GatewayLoadNodeIpCache.java => GatewayNodeIpCache.java} (89%) create mode 100644 src/main/java/com/lyh/handle/HandleCache.java diff --git a/src/main/java/com/lyh/common/aliyun/service/AliYunEcsService.java b/src/main/java/com/lyh/common/aliyun/service/AliYunEcsService.java index 1428b5e..d3a4d13 100644 --- a/src/main/java/com/lyh/common/aliyun/service/AliYunEcsService.java +++ b/src/main/java/com/lyh/common/aliyun/service/AliYunEcsService.java @@ -60,7 +60,6 @@ public class AliYunEcsService { **/ public List getIDList() throws Exception { - java.util.List regionIds = com.aliyun.darabonbastring.Client.split(aliConfig.getRegionId(), ",", 50); String regionId = regionIds.get(0); diff --git a/src/main/java/com/lyh/common/utils/mqtt/MqttUtil.java b/src/main/java/com/lyh/common/utils/mqtt/MqttUtil.java index d111ee4..27d9125 100644 --- a/src/main/java/com/lyh/common/utils/mqtt/MqttUtil.java +++ b/src/main/java/com/lyh/common/utils/mqtt/MqttUtil.java @@ -2,6 +2,7 @@ package com.lyh.common.utils.mqtt; import com.alibaba.fastjson2.JSONArray; import com.alibaba.fastjson2.JSONObject; +import com.lyh.gateway.mode.IpAndLoadCount; import lombok.extern.slf4j.Slf4j; import okhttp3.OkHttpClient; import okhttp3.Request; @@ -28,7 +29,7 @@ public class MqttUtil { * @Param: [IP] * @Return: int **/ - public int getFetchLoad(String ip) { + public IpAndLoadCount getFetchLoad(String ip) { int result = 0; @@ -61,6 +62,6 @@ public class MqttUtil { log.error(e.getMessage()); } - return result; + return new IpAndLoadCount(ip, result); } } diff --git a/src/main/java/com/lyh/gateway/cache/GatewayNodeIdCache.java b/src/main/java/com/lyh/gateway/cache/GatewayNodeIdCache.java new file mode 100644 index 0000000..7e6f397 --- /dev/null +++ b/src/main/java/com/lyh/gateway/cache/GatewayNodeIdCache.java @@ -0,0 +1,53 @@ +package com.lyh.gateway.cache; + +import com.lyh.gateway.cache.abs.GatewayCacheAbs; +import org.springframework.stereotype.Component; + +import java.util.List; + +/** + * @ProjectName: LoadCenter + * @Author: LiuYunHu + * @CreateTime: 2024/4/19 + * @Description: 节点id缓存 + */ +@Component +public class GatewayNodeIdCache extends GatewayCacheAbs { + private final static String gatewayLoadNodeIdKey = "NodeId"; + + @Override + public String getPre() { + return "gateway:load:"; + } + + /* + * @Description: 增加缓存数据 + * @Date: 2024/4/18 21:37 + * @Param: [gatewayNodeIps] + * @Return: void + **/ + public void put(List gatewayNodeIds) { + this.remove(); + redisService.setCacheList(encode(gatewayLoadNodeIdKey), gatewayNodeIds); + } + + /* + * @Description: 获取缓存数据 + * @Date: 2024/4/18 21:37 + * @Param: [] + * @Return: java.util.List + **/ + public List get() { + return redisService.getCacheList(encode(gatewayLoadNodeIdKey)); + } + + /* + * @Description: 删除缓存数据 + * @Date: 2024/4/18 21:37 + * @Param: [] + * @Return: void + **/ + public void remove() { + redisService.deleteObject(encode(gatewayLoadNodeIdKey)); + } +} diff --git a/src/main/java/com/lyh/gateway/cache/GatewayNodeInfoCache.java b/src/main/java/com/lyh/gateway/cache/GatewayNodeInfoCache.java index a720c8d..18f305d 100644 --- a/src/main/java/com/lyh/gateway/cache/GatewayNodeInfoCache.java +++ b/src/main/java/com/lyh/gateway/cache/GatewayNodeInfoCache.java @@ -17,11 +17,11 @@ import java.util.List; @Component public class GatewayNodeInfoCache extends GatewayCacheAbs { //redis Key - private final static String gatewayLoadInfoKey = "info"; + private final static String gatewayLoadInfoKey = "NodeInfo"; @Override public String getPre() { - return "gateway:node:"; + return "gateway:load:"; } /* diff --git a/src/main/java/com/lyh/gateway/cache/GatewayLoadNodeIpCache.java b/src/main/java/com/lyh/gateway/cache/GatewayNodeIpCache.java similarity index 89% rename from src/main/java/com/lyh/gateway/cache/GatewayLoadNodeIpCache.java rename to src/main/java/com/lyh/gateway/cache/GatewayNodeIpCache.java index 4b0a3c3..ff90080 100644 --- a/src/main/java/com/lyh/gateway/cache/GatewayLoadNodeIpCache.java +++ b/src/main/java/com/lyh/gateway/cache/GatewayNodeIpCache.java @@ -12,8 +12,8 @@ import java.util.List; * @Description: 负载节点IP缓存 */ @Component -public class GatewayLoadNodeIpCache extends GatewayCacheAbs { - private final static String gatewayLoadNodeIpKey = "ip"; +public class GatewayNodeIpCache extends GatewayCacheAbs { + private final static String gatewayLoadNodeIpKey = "NodeIp"; @Override public String getPre() { diff --git a/src/main/java/com/lyh/handle/HandleCache.java b/src/main/java/com/lyh/handle/HandleCache.java new file mode 100644 index 0000000..bcf556d --- /dev/null +++ b/src/main/java/com/lyh/handle/HandleCache.java @@ -0,0 +1,343 @@ +package com.lyh.handle; + +import com.aliyun.ecs20140526.models.DescribeInstancesResponseBody; +import com.lyh.common.aliyun.service.AliYunEcsService; +import com.lyh.common.utils.mqtt.MqttUtil; +import com.lyh.common.utils.user.UserUtil; +import com.lyh.gateway.cache.*; +import com.lyh.gateway.mode.GatewayNodeInfo; +import com.lyh.gateway.mode.IpAndLoadCount; +import com.lyh.gateway.mode.IpAndWeight; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; +import java.util.ArrayList; +import java.util.List; + +/** + * @ProjectName: LoadCenter + * @Author: LiuYunHu + * @CreateTime: 2024/4/19 + * @Description: 操作缓存类 + */ +@Component +@Slf4j +public class HandleCache { + @Autowired + private MqttUtil mqttUtil; + + @Autowired + private AliYunEcsService aliYunEcsService; + + /* + * 操作网关节点缓存 + * */ + @Autowired + private GatewayNodeInfoCache gatewayNodeInfoCache; + + /* + * 操作负载节点IP缓存 + * */ + @Autowired + private GatewayNodeIpCache gatewayNodeIpCache; + + /* + * 操作负载节点ID缓存 + * */ + @Autowired + private GatewayNodeIdCache gatewayNodeIdCache; + + /* + * 操作实例IP和负载量缓存 + * */ + @Autowired + private GatewayIpAndLoadCountCache gatewayIpAndLoadCountCache; + + /* + * 操作实例IP和权重缓存 + * */ + @Autowired + private GatewayIpAndLoadWeightCache gatewayIpAndLoadWeightCache; + + /* + * 操作实例IP序列缓存 + * */ + @Autowired + private GatewayNodeOrderCache gatewayNodeOrderCache; + + //刷新所有的缓存 + @Scheduled(cron = "0/2 * * * * ?") + public void refreshAllCache() { + this.getNodeIDList(); + this.getNodeInfos(); + this.getNodeIPList(); + this.getIpAndLoadCounts(); + this.getIpAndWeights(); + this.getLoadNodeOrderListByIpAndWeights(); +} + + + /* + * @Description: 获取节点id列表 + * @Date: 2024/4/19 9:09 + * @Param: [] + * @Return: void + **/ + @PostConstruct + public void getNodeIDList() { + //获取上海区的实例ID列表 + try { + List ecsIDList = aliYunEcsService.getIDList(); + if (ecsIDList == null || ecsIDList.isEmpty()) { + log.error("实例ID为空,开始创建第一个节点"); + aliYunEcsService.createAndRunInstance(); + } + + gatewayNodeIdCache.put(ecsIDList); + } catch (Exception e) { + log.error("获取节点ID失败:{}", e.getMessage()); + } + } + + /* + * @Description: 获取实例信息列表 + * @Date: 2024/4/18 21:18 + * @Param: [] + * @Return: void + **/ + + public void getNodeInfos() { + int count = 0; + + //新建List,用于redis存储实例信息 + ArrayList gatewayNodeInfos = new ArrayList<>(); + + try { + //获取ID列表 + List ecsIDList = gatewayNodeIdCache.get(); + if (ecsIDList == null || ecsIDList.isEmpty()) { + log.error("实例ID为空"); + return; + } + + //将ID进行拼接,用逗号分隔 + String ids = ""; + for (String id : ecsIDList) { + ids += id + ","; + } + ids = ids.substring(0, ids.length() - 1); + + //查询所有ID实例的详细信息 + List response = aliYunEcsService.queryInstancesInformation(ids); + + + for (DescribeInstancesResponseBody.DescribeInstancesResponseBodyInstancesInstance item : response) { + log.info("查询第{" + count + "}个实例的ID:" + item.getInstanceId()); + log.info("名称:" + item.getInstanceName()); + log.info("地域ID:" + item.getRegionId()); + log.info("状态:" + item.getStatus()); + log.info("类型:" + item.getInstanceType()); + log.info("CPU核心数:" + item.getCpu()); + log.info("内存大小:" + item.getMemory() + "MB"); + log.info("磁盘大小:" + item.getLocalStorageCapacity() + "G"); + log.info("操作系统:" + item.getOSName()); + log.info("网络类型:" + item.getInstanceNetworkType()); + log.info("公网出带宽值:" + item.getInternetMaxBandwidthOut() + "Mbit/s"); + log.info("公网入带宽值:" + item.getInternetMaxBandwidthIn() + "Mbit/s"); + log.info("公网IP:" + UserUtil.removeBrackets(item.getPublicIpAddress().getIpAddress().toString())); + log.info("私网IP:" + UserUtil.removeBrackets(item.getVpcAttributes().getPrivateIpAddress().ipAddress.toString())); + log.info("专有网络VPCID:" + item.getVpcAttributes().getVpcId()); + log.info("安全组ID:" + UserUtil.removeBrackets(item.getSecurityGroupIds().getSecurityGroupId().toString())); + log.info("创建时间:" + item.getCreationTime()); + log.info("到期时间:" + item.getExpiredTime()); + log.info("是否可以回收:" + (item.getRecyclable() ? "是" : "否") + "\n\n"); + + //存入集合 + gatewayNodeInfos.add( + new GatewayNodeInfo( + item.getInstanceId(), + item.getInstanceName(), + item.getStatus(), + UserUtil.removeBrackets(item.getPublicIpAddress().getIpAddress().toString()), + UserUtil.removeBrackets(item.getVpcAttributes().getPrivateIpAddress().ipAddress.toString()), + item.getCreationTime(), + item.getExpiredTime(), + item.getRecyclable() + ) + ); + + count++; + } + + + } catch (Exception e) { + throw new RuntimeException("获取实例列表失败:" + e.getMessage()); + } + log.info("实例信息列表:{}", gatewayNodeInfos); + + //存入缓存 + gatewayNodeInfoCache.put(gatewayNodeInfos); + + } + + + /* + * @Author: LiuYunHu + * @Date: 2024/4/17 17:14 + * @Description: 获取所有实例公网的IP列表 + * @Param: [] + * @Return: List + **/ + public void getNodeIPList() { + //存IP的List + ArrayList nodeIPList = new ArrayList<>(); + + //从缓存中拿到实例信息列表 + List gatewayNodeInfoList = gatewayNodeInfoCache.get(); + + if (gatewayNodeInfoList.isEmpty()) { + log.error("实例信息列表为空!"); + return; + } + + gatewayNodeInfoList.forEach(item -> { + //获取IP + String ip = item.getPublicIpAddress(); + //存入集合 + nodeIPList.add(ip); + }); + + + log.info("实例公网IP列表:{}", nodeIPList); + + //将IP列表存入redis + gatewayNodeIpCache.put(nodeIPList); + + } + + + /* + * @Author: LiuYunHu + * @Date: 2024/4/17 19:41 + * @Description: 通过IP获取各个IP的负载量 + * @Param: ecsIPList + * @Return: List + **/ + public void getIpAndLoadCounts() { + //从缓存中获取实例公网IP列表 + List nodeIPList = gatewayNodeIpCache.get(); + if (nodeIPList.isEmpty()) { + log.error("实例公网IP列表为空!"); + return; + } + + //存各个 服务器的负载量 + ArrayList ipAndLoadCounts = new ArrayList<>(); + + //拿到IP后,获取各个IP的负载量 + nodeIPList.forEach(ip -> { + ipAndLoadCounts.add(mqttUtil.getFetchLoad(ip)); + }); + + log.info("各个IP的负载量:{}", ipAndLoadCounts); + + gatewayIpAndLoadCountCache.put(ipAndLoadCounts); + } + + + /* + * @Author: LiuYunHu + * @Date: 2024/4/17 19:49 + * @Description: 通过IP和对应的负载量,计算出IP对应的权重 + * @Param: ipAndLoadCountList + * @Return: List + **/ + public void getIpAndWeights() { + //从缓存中获取公网IP和负载量列表 + List ipAndLoadCounts = gatewayIpAndLoadCountCache.get(); + if (ipAndLoadCounts.isEmpty()) { + log.error("负载量列表为空!"); + return; + } + + + //求出空负载的总量 + int emptyLoadCount = 0; + for (IpAndLoadCount ipAndLoadCount : ipAndLoadCounts) { + //假设使用2/8原则 一个节点最多能有100个连接 + emptyLoadCount += (80 - ipAndLoadCount.getLoadCount()); + } + + //存储IP和对应的权重 + ArrayList ipAndWeights = new ArrayList<>(); + for (IpAndLoadCount ipAndLoadCount : ipAndLoadCounts) { + + IpAndWeight ipAndWeight = new IpAndWeight( + ipAndLoadCount.getIp(), + (80 - ipAndLoadCount.getLoadCount()) * 100 / emptyLoadCount + ); + ipAndWeights.add(ipAndWeight); + } + + log.info("实例IP和对应的权重:{}", ipAndWeights);//[IpAndWeight(nodeIp=47.102.158.233, weight=55), IpAndWeight(nodeIp=47.102.123.209, weight=44)] + + gatewayIpAndLoadWeightCache.put(ipAndWeights); + } + + /* + * @Author: LiuYunHu + * @Date: 2024/4/17 20:02 + * @Description: 通过IP和权重,计算节点IP序列 + * @Param: [] + * @Return: + **/ + public void getLoadNodeOrderListByIpAndWeights() { + //从缓存中获取公网IP和权重列表 + List ipAndWeights = gatewayIpAndLoadWeightCache.get(); + if (ipAndWeights.isEmpty()) { + log.error("负载节点IP和权重列表为空!"); + return; + } + + ArrayList loadNodeList = new ArrayList<>(); + + int sum = ipAndWeights.stream() + .mapToInt(IpAndWeight::getWeight) + .sum(); + if (sum < 100) { + List list = ipAndWeights.stream().sorted(((o1, o2) -> o2.getWeight() - o1.getWeight())).toList(); + + //给权重高的节点 权重再加一个 + int countWeight = 0; + for (int i = sum; i < 100; i++) { + IpAndWeight ipAndWeight = list.get(countWeight++ % list.size()); + ipAndWeight.setWeight(ipAndWeight.getWeight() + 1); + } + } + + whFor: + while (true) { + for (IpAndWeight ipAndWeight : ipAndWeights) { + Integer weight = ipAndWeight.getWeight(); + if (weight > 0) { + loadNodeList.add(ipAndWeight.getNodeIp()); + } + ipAndWeight.setWeight(weight - 1); + } + + int sum1 = ipAndWeights.stream() + .mapToInt(IpAndWeight::getWeight) + .sum(); + if (sum1 <= 0) { + break whFor; + } + } + + log.info("负载节点的IP序列列表:{}", loadNodeList); + //节点IP序列存入缓存 + gatewayNodeOrderCache.put(loadNodeList); + } +} diff --git a/src/main/java/com/lyh/job/Timer.java b/src/main/java/com/lyh/job/Timer.java index 2e9ce6b..88bf63a 100644 --- a/src/main/java/com/lyh/job/Timer.java +++ b/src/main/java/com/lyh/job/Timer.java @@ -1,15 +1,16 @@ package com.lyh.job; -import com.alibaba.fastjson2.JSONArray; -import com.alibaba.fastjson2.JSONObject; import com.lyh.common.aliyun.service.AliYunEcsService; +import com.lyh.gateway.cache.GatewayIpAndLoadCountCache; +import com.lyh.gateway.cache.GatewayNodeIdCache; +import com.lyh.gateway.mode.IpAndLoadCount; import lombok.extern.slf4j.Slf4j; -import okhttp3.OkHttpClient; -import okhttp3.Request; -import okhttp3.Response; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; +import java.util.List; + /** * @ProjectName: LoadCenter * @Author: LiuYunHu @@ -26,60 +27,106 @@ public class Timer { @Autowired private AliYunEcsService aliYunEcsService; + /* + * 操作缓存 + * */ + @Autowired + private GatewayIpAndLoadCountCache gatewayIpAndLoadCountCache; - // @Scheduled(cron = "0/10 * * * * ?") - public void test() { - String ip = "47.102.123.209"; - //请求路径 - String URL = "http://" + ip + ":8080/public/cluster"; - - OkHttpClient client = new OkHttpClient(); - - Request req = new Request.Builder() - .url(URL) - .get() - .addHeader("User-Agent", "Apifox/1.0.0 (https://apifox.com)") - .addHeader("Accesstoken", "") - .build(); - - try { - Response response = client.newCall(req).execute(); - - log.info(String.valueOf(response)); + @Autowired + private GatewayNodeIdCache gatewayNodeIdCache; - JSONArray jsonArray = null; - if (null != response.body()) { - jsonArray = JSONArray.parseArray(response.body().string()); + @Scheduled(cron = "0/5 * * * * ?") + /* + * @Description: 动态扩容 + * @Date: 2024/4/19 9:44 + * @Param: [] + * @Return: void + **/ + public void dynamicExpansion() { + //先获取所有的负载列表 + List ipAndLoadCounts = gatewayIpAndLoadCountCache.get(); - JSONObject jsonObject = jsonArray.getJSONObject(0); - //获取mqttInfo对象的值 - JSONObject mqttInfo = jsonObject.getJSONObject("mqttInfo"); - //获取连接数 - int connectSize = mqttInfo.getIntValue("connectSize"); + //计算所有节点的负载 + int connectSize = ipAndLoadCounts.stream().mapToInt(IpAndLoadCount::getLoadCount).sum(); - log.info(ip + " 的fluxmq连接数为:" + connectSize); + //求出平均值 + int avg = connectSize / ipAndLoadCounts.size(); + if (avg >= 80) { + //执行节点扩容 - if (connectSize >= 80) { - //执行节点扩容 - - //返回实例的ID - String instanceId = aliYunEcsService.createAndRunInstance(); - - if (!instanceId.isEmpty()) { - log.info("扩容 成功!"); - log.info("扩容的节点ip为:" + instanceId); - } - } + //返回实例的ID + String instanceId = null; + try { + instanceId = aliYunEcsService.createAndRunInstance(); + } catch (Exception e) { + throw new RuntimeException("节点扩容失败!" + e.getMessage()); } - } catch (Exception e) { - log.error(e.getMessage()); + if (!instanceId.isEmpty()) { + log.info("扩容 成功!"); + log.info("扩容的节点ip为:" + instanceId); + } + } else { + log.info("暂时不需要扩容"); + } + } + + +// @Scheduled(cron = "0/5 * * * * ?") + /* TODO 缩容有问题 + * @Description: 动态缩容 + * @Date: 2024/4/19 9:44 + * @Param: [] + * @Return: void + **/ + public void dynamicReduction() { + //求出所有的节点ID + List nodeIds = gatewayNodeIdCache.get(); + + if (nodeIds.size() <= 1) { + log.error("暂无节点可删除!"); + return; } + //先获取所有的负载列表 + List ipAndLoadCounts = gatewayIpAndLoadCountCache.get(); + if (ipAndLoadCounts.size() <= 1) { + log.error("负载列表为空!"); + return; + } + + //计算所有节点的负载 + int connectSize = ipAndLoadCounts.stream().mapToInt(IpAndLoadCount::getLoadCount).sum(); + + //求出平均值 + int avg = connectSize / ipAndLoadCounts.size(); + + if (avg <= 30) { + String request = ""; + + for (String nodeId : nodeIds) { + request = nodeId + ","; + } + + request = request.substring(0, request.length() - 1); + + //执行节点缩容 + try { + aliYunEcsService.releaseInstances(request); + } catch (Exception e) { + throw new RuntimeException("节点缩容失败!" + e.getMessage()); + } + } else { + log.info("暂时不需要缩容"); + } } } + + + diff --git a/src/main/java/com/lyh/service/impl/LoadCenterServiceImpl.java b/src/main/java/com/lyh/service/impl/LoadCenterServiceImpl.java index 6391b9d..18662be 100644 --- a/src/main/java/com/lyh/service/impl/LoadCenterServiceImpl.java +++ b/src/main/java/com/lyh/service/impl/LoadCenterServiceImpl.java @@ -1,23 +1,13 @@ package com.lyh.service.impl; -import com.aliyun.ecs20140526.models.DescribeInstancesResponseBody; -import com.lyh.common.aliyun.service.AliYunEcsService; import com.lyh.common.domain.resp.Result; -import com.lyh.common.redis.service.RedisService; -import com.lyh.common.utils.mqtt.MqttUtil; -import com.lyh.common.utils.user.UserUtil; -import com.lyh.gateway.cache.*; -import com.lyh.gateway.mode.GatewayNodeInfo; -import com.lyh.gateway.mode.IpAndLoadCount; -import com.lyh.gateway.mode.IpAndWeight; +import com.lyh.gateway.cache.GatewayNodeOrderCache; +import com.lyh.handle.HandleCache; import com.lyh.service.LoadCenterService; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; -import javax.annotation.PostConstruct; -import java.util.ArrayList; import java.util.List; /** @@ -29,38 +19,12 @@ import java.util.List; @Service @Slf4j public class LoadCenterServiceImpl implements LoadCenterService { - @Autowired - private MqttUtil mqttUtil; - - @Autowired - private RedisService redis; - - @Autowired - private AliYunEcsService aliYunEcsService; - /* - * 操作网关节点缓存 + * 操作缓存 * */ @Autowired - private GatewayNodeInfoCache gatewayNodeInfoCache; + private HandleCache handleCache; - /* - * 操作负载节点IP缓存 - * */ - @Autowired - private GatewayLoadNodeIpCache gatewayLoadNodeIpCache; - - /* - * 操作实例IP和负载量缓存 - * */ - @Autowired - private GatewayIpAndLoadCountCache gatewayIpAndLoadCountCache; - - /* - * 操作实例IP和权重缓存 - * */ - @Autowired - private GatewayIpAndLoadWeightCache gatewayIpAndLoadWeightCache; /* * 操作实例IP序列缓存 @@ -77,18 +41,8 @@ public class LoadCenterServiceImpl implements LoadCenterService { **/ @Override public Result getAssignedServer() { - //获取所有实例公网的IP列表 - this.getNodeIPList(); - - - //通过IP列表 获取各个IP对应的负载量 - this.getIpAndLoadCounts(); - - //通过IP和对应的负载量,计算出IP对应的权重 - this.getIpAndWeights(); - - //通过IP和权重,计算负载节点的IP序列列表 - this.getLoadNodeOrderListByIpAndWeights(); + //得到负载节点的IP序列列表 + handleCache.getLoadNodeOrderListByIpAndWeights(); //获取序列缓存里最后一个IP进行返回 @@ -100,238 +54,6 @@ public class LoadCenterServiceImpl implements LoadCenterService { } - /* - * @Description: 获取实例信息列表 - * @Date: 2024/4/18 21:18 - * @Param: [] - * @Return: void - **/ - @PostConstruct - @Scheduled(cron = "0/5 * * * * ?") - public void getNodeInfos() { - int count = 0; - - //新建List,用于redis存储实例信息 - ArrayList gatewayNodeInfos = new ArrayList<>(); - - try { - //获取上海区的实例ID列表 - List ecsIDList = aliYunEcsService.getIDList(); - - //将ID进行拼接,用逗号分隔 - String ids = ""; - for (String id : ecsIDList) { - ids += id + ","; - } - ids = ids.substring(0, ids.length() - 1); - - //查询所有ID实例的详细信息 - List response = aliYunEcsService.queryInstancesInformation(ids); - - - for (DescribeInstancesResponseBody.DescribeInstancesResponseBodyInstancesInstance item : response) { - log.info("查询第{" + count + "}个实例的ID:" + item.getInstanceId()); - log.info("名称:" + item.getInstanceName()); - log.info("地域ID:" + item.getRegionId()); - log.info("状态:" + item.getStatus()); - log.info("类型:" + item.getInstanceType()); - log.info("CPU核心数:" + item.getCpu()); - log.info("内存大小:" + item.getMemory() + "MB"); - log.info("磁盘大小:" + item.getLocalStorageCapacity() + "G"); - log.info("操作系统:" + item.getOSName()); - log.info("网络类型:" + item.getInstanceNetworkType()); - log.info("公网出带宽值:" + item.getInternetMaxBandwidthOut() + "Mbit/s"); - log.info("公网入带宽值:" + item.getInternetMaxBandwidthIn() + "Mbit/s"); - log.info("公网IP:" + UserUtil.removeBrackets(item.getPublicIpAddress().getIpAddress().toString())); - log.info("私网IP:" + UserUtil.removeBrackets(item.getVpcAttributes().getPrivateIpAddress().ipAddress.toString())); - log.info("专有网络VPCID:" + item.getVpcAttributes().getVpcId()); - log.info("安全组ID:" + UserUtil.removeBrackets(item.getSecurityGroupIds().getSecurityGroupId().toString())); - log.info("创建时间:" + item.getCreationTime()); - log.info("到期时间:" + item.getExpiredTime()); - log.info("是否可以回收:" + (item.getRecyclable() ? "是" : "否") + "\n\n"); - - //存入集合 - gatewayNodeInfos.add( - new GatewayNodeInfo( - item.getInstanceId(), - item.getInstanceName(), - item.getStatus(), - UserUtil.removeBrackets(item.getPublicIpAddress().getIpAddress().toString()), - UserUtil.removeBrackets(item.getVpcAttributes().getPrivateIpAddress().ipAddress.toString()), - item.getCreationTime(), - item.getExpiredTime(), - item.getRecyclable() - ) - ); - - count++; - } - - - } catch (Exception e) { - throw new RuntimeException("获取实例列表失败:" + e.getMessage()); - } - log.info("实例信息列表:{}", gatewayNodeInfos); - - //存入缓存 - gatewayNodeInfoCache.put(gatewayNodeInfos); - - } - - - /* - * @Author: LiuYunHu - * @Date: 2024/4/17 17:14 - * @Description: 获取所有实例公网的IP列表 - * @Param: [] - * @Return: List - **/ - public void getNodeIPList() { - //存IP的List - ArrayList nodeIPList = new ArrayList<>(); - - //从缓存中拿到实例信息列表 - List gatewayNodeInfoList = gatewayNodeInfoCache.get(); - - if (gatewayNodeInfoList.isEmpty()) { - throw new RuntimeException("实例信息列表为空!"); - } - - gatewayNodeInfoList.forEach(item -> { - //获取IP - String ip = item.getPublicIpAddress(); - //存入集合 - nodeIPList.add(ip); - }); - - - log.info("实例公网IP列表:{}", nodeIPList); - - //将IP列表存入redis - gatewayLoadNodeIpCache.put(nodeIPList); - - } - - - /* - * @Author: LiuYunHu - * @Date: 2024/4/17 19:41 - * @Description: 通过IP获取各个IP的负载量 - * @Param: ecsIPList - * @Return: List - **/ - public void getIpAndLoadCounts() { - //从缓存中获取实例公网IP列表 - List nodeIPList = gatewayLoadNodeIpCache.get(); - if (nodeIPList.isEmpty()) { - throw new RuntimeException("实例公网IP列表为空!"); - } - - //存各个 服务器的负载量 - ArrayList ipAndLoadCounts = new ArrayList<>(); - - //拿到IP后,获取各个IP的负载量 - nodeIPList.forEach(ip -> { - int fetchLoad = mqttUtil.getFetchLoad(ip); - ipAndLoadCounts.add(new IpAndLoadCount(ip, fetchLoad)); - }); - - log.info("各个IP的负载量:{}", ipAndLoadCounts); - - gatewayIpAndLoadCountCache.put(ipAndLoadCounts); - } - - - /* - * @Author: LiuYunHu - * @Date: 2024/4/17 19:49 - * @Description: 通过IP和对应的负载量,计算出IP对应的权重 - * @Param: ipAndLoadCountList - * @Return: List - **/ - public void getIpAndWeights() { - //从缓存中获取公网IP和负载量列表 - List ipAndLoadCounts = gatewayIpAndLoadCountCache.get(); - if (ipAndLoadCounts.isEmpty()) { - throw new RuntimeException("负载量列表为空!"); - } - - - //求出空负载的总量 - int emptyLoadCount = 0; - for (IpAndLoadCount ipAndLoadCount : ipAndLoadCounts) { - //假设使用2/8原则 一个节点最多能有100个连接 - emptyLoadCount += (80 - ipAndLoadCount.getLoadCount()); - } - - //存储IP和对应的权重 - ArrayList ipAndWeights = new ArrayList<>(); - for (IpAndLoadCount ipAndLoadCount : ipAndLoadCounts) { - - IpAndWeight ipAndWeight = new IpAndWeight( - ipAndLoadCount.getIp(), - (80 - ipAndLoadCount.getLoadCount()) * 100 / emptyLoadCount - ); - ipAndWeights.add(ipAndWeight); - } - - log.info("实例IP和对应的权重:{}", ipAndWeights);//[IpAndWeight(nodeIp=47.102.158.233, weight=55), IpAndWeight(nodeIp=47.102.123.209, weight=44)] - - gatewayIpAndLoadWeightCache.put(ipAndWeights); - } - - /* - * @Author: LiuYunHu - * @Date: 2024/4/17 20:02 - * @Description: 通过IP和权重,计算节点IP序列 - * @Param: [] - * @Return: - **/ - public void getLoadNodeOrderListByIpAndWeights() { - //从缓存中获取公网IP和权重列表 - List ipAndWeights = gatewayIpAndLoadWeightCache.get(); - if (ipAndWeights.isEmpty()) { - throw new RuntimeException("负载节点IP和权重列表为空!"); - } - - ArrayList loadNodeList = new ArrayList<>(); - - int sum = ipAndWeights.stream() - .mapToInt(IpAndWeight::getWeight) - .sum(); - if (sum < 100) { - List list = ipAndWeights.stream().sorted(((o1, o2) -> o2.getWeight() - o1.getWeight())).toList(); - - //给权重高的节点 权重再加一个 - int countWeight = 0; - for (int i = sum; i < 100; i++) { - IpAndWeight ipAndWeight = list.get(countWeight++ % list.size()); - ipAndWeight.setWeight(ipAndWeight.getWeight() + 1); - } - } - - whFor: - while (true) { - for (IpAndWeight ipAndWeight : ipAndWeights) { - Integer weight = ipAndWeight.getWeight(); - if (weight > 0) { - loadNodeList.add(ipAndWeight.getNodeIp()); - } - ipAndWeight.setWeight(weight - 1); - } - - int sum1 = ipAndWeights.stream() - .mapToInt(IpAndWeight::getWeight) - .sum(); - if (sum1 <= 0) { - break whFor; - } - } - - log.info("负载节点的IP序列列表:{}", loadNodeList); - //节点IP序列存入缓存 - gatewayNodeOrderCache.put(loadNodeList); - } } diff --git a/src/test/java/Test.java b/src/test/java/Test.java index 48846e3..b042459 100644 --- a/src/test/java/Test.java +++ b/src/test/java/Test.java @@ -38,7 +38,7 @@ public class Test { // // Thread.sleep(2000); - String instanceId = "i-uf6if4mw6iu6rjffrs2c,i-uf6a4lwh3qdqwa5t5237"; + String instanceId = "i-uf6cizeumna7tpv1b31c,i-uf60oj9rvtykjdobsjqh"; //通过实例ID获取实例的详细属性 多个实例用英文逗号隔开 List describeInstancesResponseBodyInstancesInstances = aliYunEcsService.queryInstancesInformation(instanceId); @@ -72,7 +72,7 @@ public class Test { **/ @org.junit.jupiter.api.Test public void releaseInstances() throws Exception { - aliYunEcsService.releaseInstances("i-uf662hanv9a05kyvooo5"); + aliYunEcsService.releaseInstances("i-uf62wv013unz2ee2tsl6"); } /* @@ -96,6 +96,10 @@ public class Test { List ecsIDList = null; try { ecsIDList = aliYunEcsService.getIDList(); + if (ecsIDList.isEmpty()) { + log.info("没有找到实例"); + return; + } } catch (Exception e) { throw new RuntimeException(e); }