diff --git a/pom.xml b/pom.xml index f4e3afb..96e6623 100644 --- a/pom.xml +++ b/pom.xml @@ -87,6 +87,14 @@ fastjson2 2.0.47 + + + org.redisson + redisson + 3.15.5 + + + org.projectlombok lombok diff --git a/src/main/java/com/guo/aly/ALYunEcsService.java b/src/main/java/com/guo/aly/ALYunEcsService.java index cfe86da..4957373 100644 --- a/src/main/java/com/guo/aly/ALYunEcsService.java +++ b/src/main/java/com/guo/aly/ALYunEcsService.java @@ -17,6 +17,7 @@ import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Date; import java.util.List; +import java.util.stream.Collectors; /** * @author gxb @@ -42,19 +43,27 @@ public class ALYunEcsService { * @return 返回实例集合信息 */ public List selectEscList(EcsSelectModel ecsSelectModel){ - DescribeInstancesRequest describeInstancesRequest = new DescribeInstancesRequest() + log.info("当前对象的参数值:" + ecsSelectModel.toString()); + DescribeInstancesRequest describeInstancesRequest = new DescribeInstancesRequest() .setRegionId(aliConfig.getRegionId()); - if (ecsSelectModel.getInstanceNameList() == null || (ecsSelectModel.getInstanceIdList() != null && ecsSelectModel.getInstanceIdList().isEmpty())) { + + //判断实例名称 == null 或 实例名称 == 空 + if (ecsSelectModel.getInstanceNameList().toString() != null) { + //不为空 + describeInstancesRequest.setInstanceName(Common.toJSONString(ecsSelectModel.getInstanceNameList())); + + } else { + // 为空 加 * 号 describeInstancesRequest.setInstanceName("*"); - } else { - describeInstancesRequest.setInstanceName(Common.toJSONString(ecsSelectModel.getInstanceNameList())); - } - if (ecsSelectModel.getInstanceIdList() != null && !ecsSelectModel.getInstanceIdList().isEmpty()) { - describeInstancesRequest.setInstanceIds(Common.toJSONString(ecsSelectModel.getInstanceIdList()).toString()); - } else { - describeInstancesRequest.setInstanceName(Common.toJSONString(ecsSelectModel.getInstanceNameList())); } + + if (ecsSelectModel.getInstanceIdList() != null) { + describeInstancesRequest.setInstanceIds(Common.toJSONString(ecsSelectModel.getInstanceIdList()).toString()); + } + //else { +// describeInstancesRequest.setInstanceName(Common.toJSONString(ecsSelectModel.getInstanceNameList())); +// } describeInstancesRequest.setPageSize(10); RuntimeOptions runtime = new RuntimeOptions(); @@ -92,14 +101,18 @@ public class ALYunEcsService { } return new ArrayList<>(); + } + + + /** * 创建实例方法 * @throws Exception */ - public String createAnServers() throws Exception { + public String createAnServer(String isNull) throws Exception { // 地域Id //String regionId = "cn-shanghai"; String regionId = aliConfig.getRegionId(); @@ -135,13 +148,18 @@ public class ALYunEcsService { // PostPaid:按量付费 //String instanceChargeType = "PostPaid"; String instanceChargeType = aliConfig.getInstanceChargeType(); + // 创建 【1台】 实例 + if (isNull == null){ + String instances = RunInstance(client, regionId, imageId, instanceType, securityGroupId, vSwitchId, internetMaxBandwidthOut, internetChargeType, size, category, instanceChargeType); + return instances; //返回实例ID + } // 批量创建实例 String instances = RunInstances(client, regionId, imageId, instanceType, securityGroupId, vSwitchId, internetMaxBandwidthOut, internetChargeType, size, category, instanceChargeType); - //返回实例ID + //返回实例ID return instances; - } + /** * 批量创建【2】台服务器 * RunInstances 通过备选实例规格创建ECS实例最佳实践 @@ -184,52 +202,6 @@ public class ALYunEcsService { return JSON.toJSONString(responces.body.instanceIdSets.instanceIdSet); } - /** - * 创建实例方法 【1台】 - * @param - * @throws Exception - */ - public String createAnServer() throws Exception { - // 地域Id - //String regionId = "cn-shanghai"; - String regionId = aliConfig.getRegionId(); - // 镜像 ID,启动实例时选择的镜像资源。 - // String imageId = "m-uf6elrscl3c9wk6o762l"; - String imageId = aliConfig.getImageId(); - // 实例规格 - //String instanceType = "ecs.u1-c1m1.large"; - String instanceType = aliConfig.getInstanceType(); - // 新创建实例所属于的安全组 ID。 - //String securityGroupId = "sg-uf6bj6vxp8ruhvffdsau"; - String securityGroupId = aliConfig.getSecurityGroupId(); - // 虚拟交换机 ID。 - //String vSwitchId = "vsw-uf66jtgij0ptqxf1ix6l7 "; - String vSwitchId = aliConfig.getVSwitchId(); - // 公网出带宽最大值,单位为 Mbit/s。取值范围:0~100。 默认值:0。 - //Integer internetMaxBandwidthOut = Integer.parseInt("2"); - Integer internetMaxBandwidthOut = Integer.parseInt(aliConfig.getInternetMaxBandwidthOut()); - // 网络计费类型。取值范围: - // PayByBandwidth: 按固定带宽计费。 - // PayByTraffic: 按使用流量计费。 - // 默认值:PayByTraffic。 - //String internetChargeType = "PayByTraffic"; - String internetChargeType = aliConfig.getInternetChargeType(); - // 系统盘大小 - //String size = "20"; - String size = aliConfig.getSize(); - // 系统盘的云盘种类 - //String category = "cloud_essd"; - String category = aliConfig.getCategory(); - // ECS实例的计费方式 - // PrePaid:包年包月 - // PostPaid:按量付费 - //String instanceChargeType = "PostPaid"; - String instanceChargeType = aliConfig.getInstanceChargeType(); - // 批量创建实例 - String instances = RunInstance(client, regionId, imageId, instanceType, securityGroupId, vSwitchId, internetMaxBandwidthOut, internetChargeType, size, category, instanceChargeType); - return instances; - - } /** * 批量创建【1】台服务器 @@ -273,6 +245,7 @@ public class ALYunEcsService { return JSON.toJSONString(responces.body.instanceIdSets.instanceIdSet); } + public DescribeInstancesResponse DescribeInstances(Client client, String regionId, String instanceIds, String instanceName) throws Exception { DescribeInstancesRequest req = new DescribeInstancesRequest() .setRegionId(regionId) diff --git a/src/main/java/com/guo/common/constant/CacheConstants.java b/src/main/java/com/guo/common/constant/CacheConstants.java new file mode 100644 index 0000000..8d54cbf --- /dev/null +++ b/src/main/java/com/guo/common/constant/CacheConstants.java @@ -0,0 +1,49 @@ +package com.guo.common.constant; + +/** + * @author gxb + * @description 缓存信息常量 + * @date 2024-04-19 14:20 + */ +public class CacheConstants { + + /** + * 缓存有效时间 50分钟 + */ + public final static int EXPIRATIOTIME = 50; + + /** + * 通用节点缓存前缀 encode + */ + public final static String GATEWAY_COMMON = "gateway:load:"; + + /** + * 网关负载节点缓存 + */ + public final static String GATEWAY_LOAD_NODE_KEY = "node"; + + /** + * 网关负载序列 + */ + public final static String GATEWAY_LOAD_SERIES_KEY = "series"; + + /** + * 网关节点缓存前缀 + */ + public final static String GATE_WAY_NODE_INFO= "gateway:node:info:"; + + /** + * 网关节点连接数前缀 + */ + public final static String GATEWAY_NODE_SCORE_CACHE = "score"; + + /** + * 网关节点存储VIN信息 + */ + public final static String GATEWAY_VEHICLE= "gateway:vehicle:"; + + /** + * 网关车辆对应网关节点ID + */ + public final static String GATEWAY_VEHICLE_LINE= "gateway:vehicleLine:"; +} diff --git a/src/main/java/com/guo/common/constant/LoadConstants.java b/src/main/java/com/guo/common/constant/LoadConstants.java new file mode 100644 index 0000000..66a667e --- /dev/null +++ b/src/main/java/com/guo/common/constant/LoadConstants.java @@ -0,0 +1,40 @@ +package com.guo.common.constant; + +/** + * @author gxb + * @description 负载通用常量 + * @date 2024-04-19 14:23 + */ +public class LoadConstants { + + /** + * 负载的长度 + */ + public final static Long NODE_LENGTH = 100L; + + /** + * 每个节点最大连接数 + */ + public final static Long MAX_NUMBER = 100L; + + /** + * 创建节点 判断 + */ + public final static String IS_NULL = "isNotNull"; + + /** + * 节点 扩容百分比 60% 一台 + */ + public final static Long INTERMEDIATE = 60L; + + /** + * 节点 扩容百分比 80% 两台 + */ + public final static Long MAXIMUM = 80L; + + /** + * 通用数值 100 + */ + public final static int BE_COMMON = 100; + +} diff --git a/src/main/java/com/guo/gateway/cache/LoadNodeCache.java b/src/main/java/com/guo/gateway/cache/LoadNodeCache.java index a9aaf47..353c517 100644 --- a/src/main/java/com/guo/gateway/cache/LoadNodeCache.java +++ b/src/main/java/com/guo/gateway/cache/LoadNodeCache.java @@ -1,5 +1,6 @@ package com.guo.gateway.cache; +import com.guo.common.constant.CacheConstants; import com.guo.gateway.cache.abs.CacheAbs; import org.springframework.stereotype.Component; @@ -13,11 +14,10 @@ import java.util.List; @Component public class LoadNodeCache extends CacheAbs { - private final static String gatewayLoadNodeKey = "node"; @Override public String getPre() { - return "gateway:load:"; + return CacheConstants.GATEWAY_COMMON; } /** @@ -26,9 +26,9 @@ public class LoadNodeCache extends CacheAbs { */ public void put(List nodeList){ //删除key - redisService.deleteObject(encode(gatewayLoadNodeKey)); + redisService.deleteObject(encode(CacheConstants.GATEWAY_LOAD_NODE_KEY)); //存入节点权重集合 - redisService.setCacheList(encode(gatewayLoadNodeKey),nodeList); + redisService.setCacheList(encode(CacheConstants.GATEWAY_LOAD_NODE_KEY),nodeList); } /** @@ -36,7 +36,7 @@ public class LoadNodeCache extends CacheAbs { * @return 负载节点集合 */ public List get(){ - return redisService.getCacheList(encode(gatewayLoadNodeKey)); + return redisService.getCacheList(encode(CacheConstants.GATEWAY_LOAD_NODE_KEY)); } /** @@ -48,6 +48,6 @@ public class LoadNodeCache extends CacheAbs { if (index == null || index > 100){ throw new RuntimeException("下标违法:【0 - 100】"); } - return redisService.getCacheListValue(encode(gatewayLoadNodeKey),index); + return redisService.getCacheListValue(encode(CacheConstants.GATEWAY_LOAD_NODE_KEY),index); } } diff --git a/src/main/java/com/guo/gateway/cache/LoadSeriesCache.java b/src/main/java/com/guo/gateway/cache/LoadSeriesCache.java index 2d05335..4b5d873 100644 --- a/src/main/java/com/guo/gateway/cache/LoadSeriesCache.java +++ b/src/main/java/com/guo/gateway/cache/LoadSeriesCache.java @@ -1,5 +1,6 @@ package com.guo.gateway.cache; +import com.guo.common.constant.CacheConstants; import com.guo.gateway.cache.abs.CacheAbs; import org.springframework.stereotype.Component; @@ -13,11 +14,9 @@ import javax.annotation.PostConstruct; @Component public class LoadSeriesCache extends CacheAbs { - private final static String gatewayLoadSeriesKey = "series"; - @Override public String getPre() { - return "gateway:load:"; + return CacheConstants.GATEWAY_COMMON; } /** @@ -25,7 +24,7 @@ public class LoadSeriesCache extends CacheAbs { */ @PostConstruct public void init(){ - redisService.setCacheObject(encode(gatewayLoadSeriesKey),0); + redisService.setCacheObject(encode(CacheConstants.GATEWAY_LOAD_SERIES_KEY),0); } /** @@ -33,7 +32,7 @@ public class LoadSeriesCache extends CacheAbs { * @return 序列值 */ public Long get(){ - return redisService.getCacheObject(encode(gatewayLoadSeriesKey)); + return redisService.getCacheObject(encode(CacheConstants.GATEWAY_LOAD_SERIES_KEY)); } /** @@ -41,7 +40,7 @@ public class LoadSeriesCache extends CacheAbs { * @return 自增后的值 */ public Long incrementAndGet(){ - return redisService.increment(encode(gatewayLoadSeriesKey),1L); + return redisService.increment(encode(CacheConstants.GATEWAY_LOAD_SERIES_KEY),1L); } /** diff --git a/src/main/java/com/guo/gateway/cache/NodeCache.java b/src/main/java/com/guo/gateway/cache/NodeCache.java index cead84c..e8473bb 100644 --- a/src/main/java/com/guo/gateway/cache/NodeCache.java +++ b/src/main/java/com/guo/gateway/cache/NodeCache.java @@ -1,5 +1,6 @@ package com.guo.gateway.cache; +import com.guo.common.constant.CacheConstants; import com.guo.gateway.cache.abs.CacheAbs; import com.guo.gateway.model.NodeInfo; import org.springframework.stereotype.Component; @@ -14,7 +15,7 @@ public class NodeCache extends CacheAbs { @Override public String getPre() { - return "gateway:node:info:"; + return CacheConstants.GATE_WAY_NODE_INFO; } /** diff --git a/src/main/java/com/guo/gateway/cache/NodeReduced.java b/src/main/java/com/guo/gateway/cache/NodeReduced.java index 9cd0cdb..e7a162b 100644 --- a/src/main/java/com/guo/gateway/cache/NodeReduced.java +++ b/src/main/java/com/guo/gateway/cache/NodeReduced.java @@ -1,5 +1,6 @@ package com.guo.gateway.cache; +import com.guo.common.constant.CacheConstants; import com.guo.gateway.cache.abs.CacheAbs; import org.springframework.stereotype.Component; @@ -16,7 +17,7 @@ import java.util.concurrent.TimeUnit; @Component public class NodeReduced extends CacheAbs { - private final static int Expiratiotime = 50; + @Override public String getPre() { @@ -30,7 +31,7 @@ public class NodeReduced extends CacheAbs { public void put(String nodeId){ Set thresholeSet = new HashSet<>(); thresholeSet.add(nodeId); - redisService.setCacheSetEndTime(encode(nodeId),thresholeSet, TimeUnit.MINUTES,Expiratiotime); + redisService.setCacheSetEndTime(encode(nodeId),thresholeSet, TimeUnit.MINUTES, CacheConstants.EXPIRATIOTIME); } /** diff --git a/src/main/java/com/guo/gateway/cache/NodeScoreCache.java b/src/main/java/com/guo/gateway/cache/NodeScoreCache.java index c92716d..866d04d 100644 --- a/src/main/java/com/guo/gateway/cache/NodeScoreCache.java +++ b/src/main/java/com/guo/gateway/cache/NodeScoreCache.java @@ -1,10 +1,16 @@ package com.guo.gateway.cache; +import com.guo.common.constant.CacheConstants; import com.guo.gateway.cache.abs.CacheAbs; import com.guo.gateway.model.NodeJoin; +import com.guo.gateway.model.WorkGatewayNode; +import org.springframework.data.redis.core.ZSetOperations; import org.springframework.stereotype.Component; import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; /** * @author gxb @@ -14,18 +20,27 @@ import java.util.List; @Component public class NodeScoreCache extends CacheAbs { - private final static String gatewayNodeScoreCache = "score"; @Override public String getPre() { - return "gateway:join:"; + return CacheConstants.GATEWAY_COMMON; } + public List getNodeScore(){ + Set> range = redisService.redisTemplate.opsForZSet().rangeWithScores(encode(CacheConstants.GATEWAY_NODE_SCORE_CACHE), 0, -1); + return range.stream() + .map(ZSet -> WorkGatewayNode.builder().nodeId(ZSet.getValue()).weight(Integer.valueOf(String.valueOf(ZSet.getScore()))).build()) + .toList(); + + } + + + /** * 存入节点ID及连接数 * @param nodeJoin */ public void save(NodeJoin nodeJoin){ - redisService.setCacheSets(encode(gatewayNodeScoreCache),nodeJoin); + redisService.setCacheSets(encode(CacheConstants.GATEWAY_NODE_SCORE_CACHE),nodeJoin); } /** @@ -33,7 +48,7 @@ public class NodeScoreCache extends CacheAbs { * @return */ public List get(){ - return redisService.getCacheObject(encode(gatewayNodeScoreCache)); + return redisService.getCacheObject(encode(CacheConstants.GATEWAY_NODE_SCORE_CACHE)); } diff --git a/src/main/java/com/guo/gateway/cache/NodeSetVinCache.java b/src/main/java/com/guo/gateway/cache/NodeSetVinCache.java index 1b2a769..fd71f4c 100644 --- a/src/main/java/com/guo/gateway/cache/NodeSetVinCache.java +++ b/src/main/java/com/guo/gateway/cache/NodeSetVinCache.java @@ -1,5 +1,6 @@ package com.guo.gateway.cache; +import com.guo.common.constant.CacheConstants; import com.guo.gateway.cache.abs.CacheAbs; import com.guo.gateway.model.NodeVehicle; import org.springframework.stereotype.Component; @@ -14,7 +15,7 @@ public class NodeSetVinCache extends CacheAbs { @Override public String getPre() { - return "gateway:vehicle:"; + return CacheConstants.GATEWAY_VEHICLE; } /** diff --git a/src/main/java/com/guo/gateway/cache/VehicleLineNodeCache.java b/src/main/java/com/guo/gateway/cache/VehicleLineNodeCache.java index 7e01e3b..0525521 100644 --- a/src/main/java/com/guo/gateway/cache/VehicleLineNodeCache.java +++ b/src/main/java/com/guo/gateway/cache/VehicleLineNodeCache.java @@ -1,6 +1,7 @@ package com.guo.gateway.cache; import com.alibaba.fastjson2.JSON; +import com.guo.common.constant.CacheConstants; import com.guo.gateway.cache.abs.CacheAbs; import com.guo.gateway.model.NodeVehicle; import org.springframework.stereotype.Component; @@ -21,7 +22,7 @@ public class VehicleLineNodeCache extends CacheAbs { @Override public String getPre() { - return "gateway:vehicleLine:"; + return CacheConstants.GATEWAY_VEHICLE_LINE; } /** diff --git a/src/main/java/com/guo/gateway/model/WorkGatewayNode.java b/src/main/java/com/guo/gateway/model/WorkGatewayNode.java new file mode 100644 index 0000000..d24633a --- /dev/null +++ b/src/main/java/com/guo/gateway/model/WorkGatewayNode.java @@ -0,0 +1,26 @@ +package com.guo.gateway.model; + +import lombok.*; + +/** + * @author gxb + * @description 节点分配权重 + * @date 2024-04-19 14:25 + */ +@Data +@AllArgsConstructor +@NoArgsConstructor +@ToString +@Builder +public class WorkGatewayNode { + + /** + * 节点ID + */ + private String nodeId; + + /** + * 权重值 + */ + private int weight; +} diff --git a/src/main/java/com/guo/service/impl/GateWayLoadService.java b/src/main/java/com/guo/service/impl/GateWayLoadService.java index 76ae778..97b1b55 100644 --- a/src/main/java/com/guo/service/impl/GateWayLoadService.java +++ b/src/main/java/com/guo/service/impl/GateWayLoadService.java @@ -13,4 +13,9 @@ public interface GateWayLoadService { */ String loadNode(); + /** + * 刷新负载 + */ + void refreshLoad(); + } diff --git a/src/main/java/com/guo/service/impl/GateWayLoadServicelmpl.java b/src/main/java/com/guo/service/impl/GateWayLoadServicelmpl.java index 194bd50..5965259 100644 --- a/src/main/java/com/guo/service/impl/GateWayLoadServicelmpl.java +++ b/src/main/java/com/guo/service/impl/GateWayLoadServicelmpl.java @@ -1,14 +1,19 @@ package com.guo.service.impl; import com.alibaba.fastjson2.JSONObject; +import com.guo.common.constant.LoadConstants; import com.guo.gateway.cache.*; import com.guo.gateway.model.NodeInfo; import com.guo.gateway.model.NodeJoin; +import com.guo.gateway.model.WorkGatewayNode; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; import lombok.ToString; import lombok.extern.log4j.Log4j2; +import org.redisson.api.RLock; +import org.redisson.api.RedissonClient; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.util.ArrayList; @@ -16,6 +21,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; /** @@ -28,11 +34,6 @@ import java.util.concurrent.atomic.AtomicInteger; @Log4j2 public class GateWayLoadServicelmpl implements GateWayLoadService { - /** - * 负载的长度 - */ - private final Long nodeLength = 100L; - /** * 网关负载节点缓存 */ @@ -63,6 +64,11 @@ public class GateWayLoadServicelmpl implements GateWayLoadService { */ private final VehicleLineNodeCache vehicleLineNodeCache; + /** + * 分布式锁 + */ + //private final RedissonClient redissonClient; + /** * 获取负载节点 @@ -71,109 +77,121 @@ public class GateWayLoadServicelmpl implements GateWayLoadService { */ @Override public String loadNode() { - //初始化序列 - loadSeriesCache.reset(); - - //new一个WorkGatewayNode类的集合 - List nodeIdList = new ArrayList<>(); - - //获取缓存内节点信息及连接数 - List LinkingValue = nodeScoreCache.get(); - - //遍历 - if (!LinkingValue.isEmpty()) { - for (String nodejoin : LinkingValue) { - //转型 - NodeJoin nodeJoin = JSONObject.parseObject(nodejoin, NodeJoin.class); - - nodeIdList.add(new WorkGatewayNode(nodeJoin.getNodeId(), nodeJoin.getLinkingNumber().intValue())); - } - } - - List loadNodeList = new ArrayList<>(); - - int count = nodeIdList.stream().mapToInt(WorkGatewayNode::getWeight).sum(); - - if (count < 100) { - List list = nodeIdList.stream().sorted((o1, o2) -> o2.getWeight() - o1.getWeight()).toList(); - - int countWeight = 0; - for (long i = count; i < 100; i++) { - WorkGatewayNode workGatewayNode = list.get(countWeight++ % list.size()); - workGatewayNode.setWeight(workGatewayNode.getWeight() + 1); - } - } - - whFor: - while (true) { - for (WorkGatewayNode workGatewayNode : nodeIdList) { - int weight = workGatewayNode.getWeight(); - if (weight > 0) { - loadNodeList.add( - workGatewayNode.getNodeId() - ); - workGatewayNode.setWeight(weight - 1); - - } - - } - int sum = nodeIdList.stream(). - mapToInt(WorkGatewayNode::getWeight).sum(); - if (sum <= 0) { - break whFor; - } - } - //打印负载节点信息 - log.info(loadNodeList); - //存负载集合 - loadNodeCache.put(loadNodeList); - + //去刷新负载 + refreshLoad(); //获取自增序列值 Long seriesLoad = loadSeriesCache.incrementAndGet(); //获取自增序列值 - Long seriesLoadIndex = seriesLoad % nodeLength; + Long seriesLoadIndex = seriesLoad % LoadConstants.NODE_LENGTH; //获取负载下标 String loadNodeId = loadNodeCache.getFindByIndex(seriesLoadIndex); //通过获取节点ID NodeInfo nodeInfo = nodeCache.get(loadNodeId); - //获取缓存内节点的公网/内网信息 + //获取缓存内节点的公网/内网信息 返回公网IP return nodeInfo.getPublicIdAddress(); - //返回公网IP } + + /** + * 刷新负载 实现动态负载 + */ + @Override + public void refreshLoad() { + + //分布式锁 + // RLock refreshLoadLock = redissonClient.getLock("refreshLoadLock"); + // try { + // 尝试获取锁,最多等待10秒,持有锁60秒后自动释放 + // if (refreshLoadLock.tryLock(10, 60, TimeUnit.SECONDS)) { + // 在锁内执行刷新负载的逻辑 + List workGatewayNodes = nodeScoreCache.getNodeScore(); + + //车辆上线总数量 + long vehicleMaxOnlineNUm = workGatewayNodes.size() * 80; + + //目前连接数 + long veicleOnlineNowNum = Long.valueOf(String.valueOf(workGatewayNodes.stream().mapToDouble(WorkGatewayNode::getWeight).sum())); + + //空余连接数 + long vehicleOnlineNum = vehicleMaxOnlineNUm - veicleOnlineNowNum; + + //转换 + List workGatewayNodeWeight = workGatewayNodes.stream() + .map(workGatewayNode -> WorkGatewayNode.builder() + .nodeId(workGatewayNode.getNodeId()) + .weight(Integer.parseInt(String.valueOf(vehicleOnlineNum / (80L - workGatewayNode.getWeight())))) + .build()) + .toList(); + + List loadNodeList = new ArrayList<>(); + + int count = workGatewayNodeWeight.stream().mapToInt(WorkGatewayNode::getWeight).sum(); + + if (count < 100) { + List list = workGatewayNodeWeight.stream().sorted((o1, o2) -> o2.getWeight() - o1.getWeight()).toList(); + + int countWeight = 0; + for (long i = count; i < 100; i++) { + WorkGatewayNode workGatewayNode = list.get(countWeight++ % list.size()); + workGatewayNode.setWeight(workGatewayNode.getWeight() + 1); + } + } + + whFor: + while (true) { + for (WorkGatewayNode workGatewayNode : workGatewayNodeWeight) { + int weight = workGatewayNode.getWeight(); + if (weight > 0) { + loadNodeList.add( + workGatewayNode.getNodeId() + ); + workGatewayNode.setWeight(weight - 1); + + } + + } + int sum = workGatewayNodeWeight.stream(). + mapToInt(WorkGatewayNode::getWeight).sum(); + if (sum <= 0) { + break whFor; + } + } + //重置 + loadSeriesCache.reset(); + //存入负载集合 + loadNodeCache.put(loadNodeList); + } + //} catch (InterruptedException e) { + // Thread.currentThread().interrupt(); + // 处理中断异常 + //} finally { + // 释放锁 + // if (refreshLoadLock.isHeldByCurrentThread()) { + // refreshLoadLock.unlock(); +//} + + } /** * 统计 */ -class StiNode { - // 使用 ConcurrentHashMap 保证线程安全 - private static final Map stiNodeMap = new ConcurrentHashMap<>(); - - public static void sti(String nodeId) { - // 使用 computeIfAbsent 方法确保原子性操作 - stiNodeMap.computeIfAbsent(nodeId, key -> new AtomicInteger()).incrementAndGet(); - } - - public static Map show() { - Map resultMap = new HashMap<>(); - stiNodeMap.forEach((key, val) -> { - resultMap.put(key, (long) val.get()); - System.out.println(key + "▷◁▷◁▷◁▷◁▷◁▷◁▷▷◁▷◁▷◁☛☛" + val.get()); - }); - return resultMap; - } -} +//class StiNode { +// // 使用 ConcurrentHashMap 保证线程安全 +// private static final Map stiNodeMap = new ConcurrentHashMap<>(); +// +// public static void sti(String nodeId) { +// // 使用 computeIfAbsent 方法确保原子性操作 +// stiNodeMap.computeIfAbsent(nodeId, key -> new AtomicInteger()).incrementAndGet(); +// } +// +// public static Map show() { +// Map resultMap = new HashMap<>(); +// stiNodeMap.forEach((key, val) -> { +// resultMap.put(key, (long) val.get()); +// System.out.println(key + "--------------" + val.get()); +// }); +// return resultMap; +// } +//} -/** - * 节点ID 、 权重 - */ -@Data -@AllArgsConstructor -@NoArgsConstructor -@ToString -class WorkGatewayNode { - private String nodeId; - private int weight; - -} diff --git a/src/main/java/com/guo/task/Collection.java b/src/main/java/com/guo/task/Collection.java index e82e328..9f7ef8b 100644 --- a/src/main/java/com/guo/task/Collection.java +++ b/src/main/java/com/guo/task/Collection.java @@ -34,11 +34,6 @@ import java.util.List; @AllArgsConstructor public class Collection { - /** - * 负载的长度 - */ - private final Long nodeLength = 100L; - /** * 网关负载节点缓存 */ @@ -83,6 +78,9 @@ public class Collection { @Autowired private ALYunEcsService alYunEcsService; + /** + * 定时扫描节点信息 + */ @Scheduled(cron = "0/10 * * * * ?") public void scheduledEcsCompanding() { @@ -177,7 +175,5 @@ public class Collection { //调用扩容方法去判断是否需要扩缩容 contractionVolume.contractionVolume(totalNumber); - - } } diff --git a/src/main/java/com/guo/task/Contraction/ContractionVolume.java b/src/main/java/com/guo/task/Contraction/ContractionVolume.java index 29fd7a5..3fc05be 100644 --- a/src/main/java/com/guo/task/Contraction/ContractionVolume.java +++ b/src/main/java/com/guo/task/Contraction/ContractionVolume.java @@ -1,6 +1,7 @@ package com.guo.task.Contraction; import com.guo.aly.ALYunEcsService; +import com.guo.common.constant.LoadConstants; import com.guo.common.model.TotalNumber; import com.guo.gateway.cache.NodeReduced; import lombok.AllArgsConstructor; @@ -20,17 +21,6 @@ import java.util.concurrent.TimeUnit; @Log4j2 public class ContractionVolume { - /** - * 每个节点最大连接数 - */ - private final static Long Maxnumber = 100L; - - /** - * 60%与80%阈值 - */ - private final static Long Intermediate = 60L; - private final static Long Maximum = 80L; - @Autowired private ALYunEcsService alYunEcsService; @@ -58,6 +48,7 @@ public class ContractionVolume { //比较 剩余时间 小于5分钟 if (expire < fiveMinutesSeconds){ //数据迁移 释放节点 + } } //记录 @@ -73,11 +64,11 @@ public class ContractionVolume { public void contractionVolume(TotalNumber totalNumber){ //特殊情况 无节点 if (totalNumber.getNodeNumber() == 0L){ - //无则创建两台实力 + // 无 则创建两台实力 log.error("当前未存在节点信息"); try { //创建实例方法 【2台】 - alYunEcsService.createAnServers(); + // alYunEcsService.createAnServer(LoadConstants.IS_NULL); } catch (Exception e) { log.error("扩容失败!!!!!"); e.printStackTrace(); @@ -88,24 +79,24 @@ public class ContractionVolume { //调用计算 Long value = this.percentage(totalNumber); //判断达到60% - if (value >= Intermediate && value < Maximum){ + if (value >= LoadConstants.INTERMEDIATE && value < LoadConstants.MAXIMUM){ //当节点负载达到 60%时,调用扩容一台方法 log.info("Node 节点负载达到 :" + value + "%,达到扩容一台的条件☑"); try { //创建实例方法 【1台】 - alYunEcsService.createAnServer(); + alYunEcsService.createAnServer(null); } catch (Exception e) { log.error("扩容失败!!!!!"); e.printStackTrace(); } } //判断达到80% - if (value >= Maximum){ + if (value >= LoadConstants.MAXIMUM){ //当节点负载达到 80%时,调用扩容一台方法 - log.info("Node 节点负载达到 :" + value + "%,达到扩容一台的条件☑"); + log.info("Node 节点负载达到 :" + value + "%,达到扩容两台的条件☑"); try { //创建实例方法 【2台】 - alYunEcsService.createAnServers(); + alYunEcsService.createAnServer(LoadConstants.IS_NULL); } catch (Exception e) { log.error("扩容失败!!!!!"); e.printStackTrace(); @@ -121,13 +112,15 @@ public class ContractionVolume { //获取节点数量 Long nodeNumber = totalNumber.getNodeNumber(); //根据nodeNumber去获取最大节点数 默认 100 - Long sumNodeNumber = nodeNumber * Maxnumber; + Long sumNodeNumber = nodeNumber * LoadConstants.MAX_NUMBER; //获取节点连接总数 Long connectionTotal = totalNumber.getConnectionTotal(); + //计算空余连接数 + Long vacantNumber = sumNodeNumber - connectionTotal; //计算当前负载情况 - double loadPercentage = (double)connectionTotal / sumNodeNumber; + double loadPercentage = (double)vacantNumber / sumNodeNumber; //进行四舍五入取整 - long roundLoadPercentage = Math.round(loadPercentage) * 100; + long roundLoadPercentage = Math.round(loadPercentage) * LoadConstants.BE_COMMON; //返回百分比 return roundLoadPercentage; }