diff --git a/pom.xml b/pom.xml index 96e6623..eaaa777 100644 --- a/pom.xml +++ b/pom.xml @@ -7,7 +7,13 @@ com.muyu LoadCenter 1.0-SNAPSHOT - + + + aliyundeveloper + Aliyun SDK + aliyunsdk@aliyun.com + + 17 17 diff --git a/src/main/java/com/guo/aly/ALYunEcsService.java b/src/main/java/com/guo/aly/ALYunEcsService.java index dce4510..f9831da 100644 --- a/src/main/java/com/guo/aly/ALYunEcsService.java +++ b/src/main/java/com/guo/aly/ALYunEcsService.java @@ -4,19 +4,19 @@ import com.alibaba.fastjson2.JSON; import com.aliyun.ecs20140526.Client; import com.aliyun.ecs20140526.models.*; import com.aliyun.tea.TeaException; +import com.aliyun.teaopenapi.models.Config; import com.aliyun.teautil.Common; import com.aliyun.teautil.models.RuntimeOptions; import com.guo.aly.config.AliConfig; import com.guo.aly.model.EcsSelectModel; import com.guo.aly.model.EscRemoveModel; import com.guo.aly.model.InstanceInfo; +import com.guo.gateway.model.NodeInfo; import lombok.extern.log4j.Log4j2; import org.springframework.stereotype.Service; import java.text.SimpleDateFormat; -import java.util.ArrayList; -import java.util.Date; -import java.util.List; +import java.util.*; import java.util.stream.Collectors; /** @@ -38,33 +38,32 @@ public class ALYunEcsService { } /** - * 根据实例ID和实例名称查询实例信息 + * 根据实例名称/ID查询实例 * @param ecsSelectModel - * @return 返回实例集合信息 + * @return + * @throws Exception */ public List selectECS(EcsSelectModel ecsSelectModel) throws Exception { + + //判断是查询根据名称查询还是ID + if (ecsSelectModel.getInstanceIdList() != null){ + InstanceInfo instanceInfo = selectInstanceId(ecsSelectModel); + return Arrays.asList(instanceInfo); + } + List instanceNameList = ecsSelectModel.getInstanceNameList(); + String nameListString = instanceNameList.toString(); + String substring = nameListString.substring(1, nameListString.length() - 1); + DescribeInstancesRequest describeInstancesRequest = new DescribeInstancesRequest() .setRegionId(aliConfig.getRegionId()) + .setInstanceName(substring) .setPageSize(10); - // 判断是否指定了实例ID或实例名称 - if (ecsSelectModel.getInstanceIdList() != null && !ecsSelectModel.getInstanceIdList().isEmpty()) { - // 如果指定了实例ID,则设置实例ID查询条件 - describeInstancesRequest.setInstanceIds(String.join(",", ecsSelectModel.getInstanceIdList())); - } else if (ecsSelectModel.getInstanceNameList() != null && !ecsSelectModel.getInstanceNameList().isEmpty()) { - // 如果指定了实例名称,则设置实例名称查询条件 - describeInstancesRequest.setInstanceName(String.join(",", ecsSelectModel.getInstanceNameList())); - } else { - // 如果既没有指定实例ID也没有指定实例名称,则抛出异常或者返回空列表,视情况而定 - throw new IllegalArgumentException("Please provide at least one instance ID or instance name."); - // 或者直接返回空列表 - // return new ArrayList<>(); - } + com.aliyun.teautil.models.RuntimeOptions runtime = new com.aliyun.teautil.models.RuntimeOptions(); List instanceInfos = new ArrayList<>(); // 用于存储查询到的实例信息 try { - com.aliyun.teautil.models.RuntimeOptions runtime = new com.aliyun.teautil.models.RuntimeOptions(); DescribeInstancesResponse describeInstancesResponse = client.describeInstancesWithOptions(describeInstancesRequest, runtime); DescribeInstancesResponseBody body = describeInstancesResponse.getBody(); DescribeInstancesResponseBody.DescribeInstancesResponseBodyInstances instances = body.getInstances(); @@ -79,7 +78,7 @@ public class ALYunEcsService { // 去掉方括号 publicIpAddress = publicIpAddress.substring(1, publicIpAddress.length() - 1); instanceInfo.setPublicIpAddress(publicIpAddress); - String privateIpAddress = item.getVpcAttributes().getPrivateIpAddress().getIpAddress().toString(); + String privateIpAddress = item.getVpcAttributes().getPrivateIpAddress().ipAddress.toString(); // 去掉方括号 privateIpAddress = privateIpAddress.substring(1, privateIpAddress.length() - 1); instanceInfo.setPrivateIpAddress(privateIpAddress); @@ -87,22 +86,123 @@ public class ALYunEcsService { } } catch (TeaException error) { error.printStackTrace(); - // 可以打印错误信息以便排查问题 - System.out.println("TeaException occurred: " + error.getMessage()); // 异常处理 } catch (Exception _error) { _error.printStackTrace(); - // 可以打印错误信息以便排查问题 - System.out.println("Exception occurred: " + _error.getMessage()); // 异常处理 } return instanceInfos; } + /** + * 实例ID查询实例 + * @param ecsSelectModel + * @return + */ + public InstanceInfo selectInstanceId(EcsSelectModel ecsSelectModel) { + + List instanceIdList = ecsSelectModel.getInstanceIdList(); + String instanceIdListString = instanceIdList.toString(); + String substring = instanceIdListString.substring(1, instanceIdListString.length() - 1); + + DescribeInstancesRequest describeInstancesRequest = new DescribeInstancesRequest() + .setRegionId(aliConfig.getRegionId()) + .setInstanceName("*") + .setInstanceIds(com.aliyun.teautil.Common.toJSONString(com.aliyun.darabonbastring + .Client.split(substring, ",", 50))) + .setPageSize(10); + + // 初始化返回值 + List instanceList = null; + + try { + // 复制代码运行请自行打印 API 的返回值 + DescribeInstancesResponse describeInstancesResponse = client.describeInstancesWithOptions(describeInstancesRequest, new RuntimeOptions()); + DescribeInstancesResponseBody body = describeInstancesResponse.getBody(); + DescribeInstancesResponseBody.DescribeInstancesResponseBodyInstances instances = body.getInstances(); + + instanceList = instances.getInstance(); + + if (instanceList == null || instanceList.isEmpty()) { + return new InstanceInfo(); + } + + InstanceInfo ecsInstanceInfo = new InstanceInfo(); + instanceList.forEach(item -> { + + ecsInstanceInfo.setInstanceName(item.getInstanceName()); + + ecsInstanceInfo.setInstanceId(item.getInstanceId()); + + ecsInstanceInfo.setPublicIpAddress(item.getPublicIpAddress().getIpAddress().toString()); + + ecsInstanceInfo.setPrivateIpAddress(item.getVpcAttributes().getPrivateIpAddress().ipAddress.toString()); + }); + + return ecsInstanceInfo; + + } catch (TeaException error) { + log.error("code:[{}], message: [{}],data: [{}]",error.getCode(),error.getMessage(),error.getData()); + // 此处仅做打印展示,请谨慎对待异常处理,在工程项目中切勿直接忽略异常。 + // 错误 message + System.out.println(error.getMessage()); + // 诊断地址 + System.out.println(error.getData().get("Recommend")); + com.aliyun.teautil.Common.assertAsString(error.message); + } catch (Exception _error) { + TeaException error = new TeaException(_error.getMessage(), _error); + + log.error("message: [{}]",_error.getMessage(),_error); + // 此处仅做打印展示,请谨慎对待异常处理,在工程项目中切勿直接忽略异常。 + // 错误 message + System.out.println(error.getMessage()); + // 诊断地址 + System.out.println(error.getData().get("Recommend")); + com.aliyun.teautil.Common.assertAsString(error.message); + } + + return new InstanceInfo(); + } + public List selectInstance(EcsSelectModel ecsSelectModel) throws Exception { + List instan = new ArrayList<>(); + // 1. 初始化配置 + Config config = new Config(); + // 您的AccessKey ID + config.accessKeyId = aliConfig.getAccessKeyId(); + // 您的AccessKey Secret + config.accessKeySecret = aliConfig.getAccessKeySecret(); + //设置请求地址 + config.endpoint = aliConfig.getEndpoint(); + // 设置连接超时为5000毫秒 + config.connectTimeout = 5000; + // 设置读超时为5000毫秒 + config.readTimeout = 5000; + // 2. 初始化客户端 + Client client = new Client(config); + java.util.List regionIds = com.aliyun.darabonbastring.Client.split(ecsSelectModel.getInstanceIdList().toString(), ",", 50); + for (String regionId : regionIds) { + DescribeInstancesRequest describeInstancesRequest = new DescribeInstancesRequest() + .setPageSize(100) + .setRegionId(regionId); + DescribeInstancesResponse resp = client.describeInstances(describeInstancesRequest); + java.util.List instances = resp.body.instances.instance; + com.aliyun.teaconsole.Client.log("" + regionId + " 下 ECS 实例列表:"); + for (DescribeInstancesResponseBody.DescribeInstancesResponseBodyInstancesInstance instance : instances) { + InstanceInfo instanceInfo = new InstanceInfo(); + instanceInfo.setInstanceId(instance.getInstanceId()); + instanceInfo.setInstanceName(instance.getInstanceName()); + instanceInfo.setPublicIpAddress(instance.getPublicIpAddress().toString()); + instanceInfo.setPrivateIpAddress(instance.getVpcAttributes().getPrivateIpAddress().toString()); + instan.add(instanceInfo); + com.aliyun.teaconsole.Client.log(" " + instance.hostName + " 实例ID " + instance.instanceId + " CPU:" + instance.cpu + " 内存:" + instance.memory + " MB 规格:" + instance.instanceType + " 系统:" + instance.OSType + "(" + instance.OSName + ") 状态:" + instance.status + ""); + } + } + return instan; + } /** * 创建实例方法 * @throws Exception diff --git a/src/main/java/com/guo/aly/config/AliConfig.java b/src/main/java/com/guo/aly/config/AliConfig.java index 9945fb1..0cc8f4c 100644 --- a/src/main/java/com/guo/aly/config/AliConfig.java +++ b/src/main/java/com/guo/aly/config/AliConfig.java @@ -71,6 +71,11 @@ public class AliConfig { */ private String instanceChargeType; + /** + * 请求地址 + */ + private String endpoint; + @Bean public Client createClient(AliConfig aliConfig) throws Exception { // 工程代码泄露可能会导致 AccessKey 泄露,并威胁账号下所有资源的安全性。以下代码示例仅供参考。 diff --git a/src/main/java/com/guo/aly/model/EcsSelectModel.java b/src/main/java/com/guo/aly/model/EcsSelectModel.java index 54d6aaf..284f9d7 100644 --- a/src/main/java/com/guo/aly/model/EcsSelectModel.java +++ b/src/main/java/com/guo/aly/model/EcsSelectModel.java @@ -16,7 +16,7 @@ import java.util.List; @Builder @AllArgsConstructor @NoArgsConstructor -public class EcsSelectModel { + public class EcsSelectModel { /** * 实例ID diff --git a/src/main/java/com/guo/common/constant/Constans.java b/src/main/java/com/guo/common/constant/Constans.java new file mode 100644 index 0000000..9c9b13f --- /dev/null +++ b/src/main/java/com/guo/common/constant/Constans.java @@ -0,0 +1,62 @@ +package com.guo.common.constant; + +/** + * @author gxb + * @description 请求前缀 通用 + * @date 2024-04-20 15:11 + */ +public class Constans { + + /** + * http请求头部前缀 + */ + public final static String HTTP_REQUEST_HEAD = "http://"; + + /** + * 请求端口及路径 + */ + public final static String HTTP_REQUEST_ADDRESS = ":8080/public/cluster"; + + /** + * HTTP请求头的一个字段 + */ + public final static String HTTP_REQUEST_ADDHEADER_AGENT = "User-Agent"; + + /** + * 请求是从Apifox 1.0.0版本的客户端发起 + */ + public final static String HTTP_REQUEST_ADDHEADER_APIFOX = "Apifox/1.0.0 (https://apifox.com)"; + + /** + * HTTP请求头部字段,用于传递访问令牌 + */ + public final static String HTTP_REQUEST_ADDHEADER_ACCESSTOKEN = "Accesstoken"; + + /** + * 空字符串 + */ + public final static String HTTP_REQUEST_NULL_STRING = ""; + + /** + * 数值 0 + */ + public final static Integer NUMERICAL_VALUE_ZERO = 0; + + /** + * 别名mqttInfo获取消息队列 + */ + public final static String HTTP_REQUEST_MQTTINFO = "mqttInfo"; + + /** + * 获取了 "mqttInfo" 对象中名为 "connectSize" 的整数值 + */ + public final static String HTTP_REQUEST_CONNECTSIZE = "connectSize"; + + /** + * 通用默认数值 0L Long + */ + public final static Long NUMERICAL_VALUE_LONG_ZERO = 0L; + + + +} diff --git a/src/main/java/com/guo/controller/GateWayController.java b/src/main/java/com/guo/controller/GateWayController.java index b99de70..c9a0d7b 100644 --- a/src/main/java/com/guo/controller/GateWayController.java +++ b/src/main/java/com/guo/controller/GateWayController.java @@ -3,10 +3,7 @@ package com.guo.controller; import com.guo.common.domain.Result; import com.guo.service.impl.GateWayLoadService; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.web.bind.annotation.GetMapping; -import org.springframework.web.bind.annotation.PostMapping; -import org.springframework.web.bind.annotation.RequestMapping; -import org.springframework.web.bind.annotation.RestController; +import org.springframework.web.bind.annotation.*; /** * @author gxb @@ -24,9 +21,10 @@ public class GateWayController { * 获取负载节点 * @return 返回公网IP */ - @PostMapping("/load/node") - public Result loadNode(){ - return Result.success(gateWayLoadService.loadNode()); + @PostMapping("/load/node/{VehicleVIN}") + public Result loadNode(@PathVariable("VehicleVIN") String VehicleVIN){ + + return Result.success(gateWayLoadService.loadNode(VehicleVIN)); } } diff --git a/src/main/java/com/guo/gateway/cache/LoadSeriesCache.java b/src/main/java/com/guo/gateway/cache/LoadSeriesCache.java index 4b5d873..7a51c62 100644 --- a/src/main/java/com/guo/gateway/cache/LoadSeriesCache.java +++ b/src/main/java/com/guo/gateway/cache/LoadSeriesCache.java @@ -40,7 +40,7 @@ public class LoadSeriesCache extends CacheAbs { * @return 自增后的值 */ public Long incrementAndGet(){ - return redisService.increment(encode(CacheConstants.GATEWAY_LOAD_SERIES_KEY),1L); + return redisService.increment(encode(CacheConstants.GATEWAY_LOAD_SERIES_KEY),0L); } /** diff --git a/src/main/java/com/guo/gateway/cache/NodeScoreCache.java b/src/main/java/com/guo/gateway/cache/NodeScoreCache.java index 749e638..6921bc4 100644 --- a/src/main/java/com/guo/gateway/cache/NodeScoreCache.java +++ b/src/main/java/com/guo/gateway/cache/NodeScoreCache.java @@ -69,7 +69,10 @@ public class NodeScoreCache extends CacheAbs { } - + /** + * 获取目前连接数 + * @return + */ public Long getNodeNowNum(){ List workGatewayNodes = getNodeScore(); // 直接将 double 类型结果转换为 Long 类型 @@ -92,16 +95,12 @@ public class NodeScoreCache extends CacheAbs { redisService.redisTemplate.opsForZSet().add(encode(CacheConstants.GATEWAY_NODE_SCORE_CACHE), Collections.singleton(tuple)); } - - /** - * 获取连接数信息 - * @return + * 删除缓存 */ - public List get(){ - return redisService.getCacheObject(encode(CacheConstants.GATEWAY_NODE_SCORE_CACHE)); + public void delete(){ + redisService.deleteObject(encode(CacheConstants.GATEWAY_NODE_SCORE_CACHE)); } - } diff --git a/src/main/java/com/guo/service/impl/GateWayLoadService.java b/src/main/java/com/guo/service/impl/GateWayLoadService.java index 97b1b55..4b32d2b 100644 --- a/src/main/java/com/guo/service/impl/GateWayLoadService.java +++ b/src/main/java/com/guo/service/impl/GateWayLoadService.java @@ -11,7 +11,7 @@ public interface GateWayLoadService { * 获取负载节点 * @return 返回公网IP */ - String loadNode(); + String loadNode(String VehicleVIN); /** * 刷新负载 diff --git a/src/main/java/com/guo/service/impl/GateWayLoadServicelmpl.java b/src/main/java/com/guo/service/impl/GateWayLoadServicelmpl.java index 90da7a1..d6d2ea0 100644 --- a/src/main/java/com/guo/service/impl/GateWayLoadServicelmpl.java +++ b/src/main/java/com/guo/service/impl/GateWayLoadServicelmpl.java @@ -1,29 +1,17 @@ package com.guo.service.impl; -import com.alibaba.fastjson2.JSONObject; import com.guo.common.constant.LoadConstants; import com.guo.gateway.cache.*; import com.guo.gateway.model.NodeInfo; -import com.guo.gateway.model.NodeJoin; +import com.guo.gateway.model.NodeVehicle; import com.guo.gateway.model.WorkGatewayNode; import lombok.AllArgsConstructor; -import lombok.Data; -import lombok.NoArgsConstructor; -import lombok.ToString; import lombok.extern.log4j.Log4j2; -import org.redisson.api.RLock; -import org.redisson.api.RedissonClient; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.math.BigDecimal; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; /** * @author gxb @@ -77,17 +65,22 @@ public class GateWayLoadServicelmpl implements GateWayLoadService { * @return 返回公网IP */ @Override - public String loadNode() { + public String loadNode(String VehicleVIN) { //去刷新负载 refreshLoad(); //获取自增序列值 Long seriesLoad = loadSeriesCache.incrementAndGet(); //获取自增序列值 Long seriesLoadIndex = seriesLoad % LoadConstants.NODE_LENGTH; - //获取负载下标 + //获取负载下标通过获取节点ID String loadNodeId = loadNodeCache.getFindByIndex(seriesLoadIndex); - //通过获取节点ID + //存储 key:实例ID value:VIN + NodeVehicle nodeVehicle = new NodeVehicle(); + nodeVehicle.setNodeId(loadNodeId); + nodeVehicle.setVehicleVin(VehicleVIN); + vehicleLineNodeCache.save(nodeVehicle); + //获取缓存内节点的公网/内网信息 NodeInfo nodeInfo = nodeCache.get(loadNodeId); - //获取缓存内节点的公网/内网信息 返回公网IP + // 返回公网IP return nodeInfo.getPublicIdAddress(); } @@ -98,109 +91,114 @@ public class GateWayLoadServicelmpl implements GateWayLoadService { public void refreshLoad() { //分布式锁 - // RLock refreshLoadLock = redissonClient.getLock("refreshLoadLock"); - // try { - // 尝试获取锁,最多等待10秒,持有锁60秒后自动释放 - // if (refreshLoadLock.tryLock(10, 60, TimeUnit.SECONDS)) { - // 在锁内执行刷新负载的逻辑 + // RLock refreshLoadLock = redissonClient.getLock("refreshLoadLock"); + // try { + // 尝试获取锁,最多等待10秒,持有锁60秒后自动释放 + // if (refreshLoadLock.tryLock(10, 60, TimeUnit.SECONDS)) { + // 在锁内执行刷新负载的逻辑 - List workGatewayNodes = nodeScoreCache.getNodeScore(); - - log.info("实例数量 :" + workGatewayNodes.size()); - - //车辆上线总数量 - long vehicleMaxOnlineNUm = getNodeMaxOnlineNum(); - - //目前连接数 - Long veicleOnlineNowNum = nodeScoreCache.getNodeNowNum(); - - //空余连接数 - long vehicleOnlineNum = vehicleMaxOnlineNUm - veicleOnlineNowNum; - - //转换 - List workGatewayNodeWeight = workGatewayNodes.stream() - .map(workGatewayNode -> WorkGatewayNode.builder() - .nodeId(workGatewayNode.getNodeId()) - .weight(Integer.parseInt(String.valueOf(vehicleOnlineNum / (80L - workGatewayNode.getWeight())))) - .build()) - .toList(); - - List loadNodeList = new ArrayList<>(); - - int count = workGatewayNodeWeight.stream().mapToInt(WorkGatewayNode::getWeight).sum(); - - if (count < 100) { - List list = workGatewayNodeWeight.stream().sorted((o1, o2) -> o2.getWeight() - o1.getWeight()).toList(); - - int countWeight = 0; - for (long i = count; i < 100; i++) { - WorkGatewayNode workGatewayNode = list.get(countWeight++ % list.size()); - workGatewayNode.setWeight(workGatewayNode.getWeight() + 1); - } - } - - whFor: - while (true) { - for (WorkGatewayNode workGatewayNode : workGatewayNodeWeight) { - int weight = workGatewayNode.getWeight(); - if (weight > 0) { - loadNodeList.add( - workGatewayNode.getNodeId() - ); - workGatewayNode.setWeight(weight - 1); - - } - - } - int sum = workGatewayNodeWeight.stream(). - mapToInt(WorkGatewayNode::getWeight).sum(); - if (sum <= 0) { - break whFor; - } - } - //重置 - loadSeriesCache.reset(); - //存入负载集合 - loadNodeCache.put(loadNodeList); - } - //} catch (InterruptedException e) { - // Thread.currentThread().interrupt(); - // 处理中断异常 - //} finally { - // 释放锁 - // if (refreshLoadLock.isHeldByCurrentThread()) { - // refreshLoadLock.unlock(); -//} - - /** - * 动态ECS - */ - public void dynamicECS(){ + //获取 所有手机节点ID和对应的连接数 + List workGatewayNodes = nodeScoreCache.getNodeScore(); //车辆上线总数量 long vehicleMaxOnlineNUm = getNodeMaxOnlineNum(); //目前连接数 - Long nodeNowNum = nodeScoreCache.getNodeNowNum(); + Long veicleOnlineNowNum = nodeScoreCache.getNodeNowNum(); - //负载率 - BigDecimal loadRate = new BigDecimal(vehicleMaxOnlineNUm).divide(new BigDecimal(nodeNowNum), 0, BigDecimal.ROUND_HALF_UP); + //空余连接数 + long vehicleOnlineNum = vehicleMaxOnlineNUm - veicleOnlineNowNum; - if (loadRate.longValue() > 80){ - //扩容 - }else if (loadRate.longValue() <20 ){ - //缩容 + //计算权重 + List workGatewayNodeWeight = workGatewayNodes.stream() + .map(workGatewayNode -> WorkGatewayNode.builder() + .nodeId(workGatewayNode.getNodeId()) + .weight((int) (((double) LoadConstants.MAXIMUM - workGatewayNode.getWeight()) / vehicleOnlineNum * LoadConstants.BE_COMMON)) + .build()) + .toList(); + + + System.out.println(workGatewayNodes); + + System.out.println(workGatewayNodeWeight); + + List loadNodeList = new ArrayList<>(); + + int count = workGatewayNodeWeight.stream().mapToInt(WorkGatewayNode::getWeight).sum(); + + if (count < 100) { + List list = workGatewayNodeWeight.stream().sorted((o1, o2) -> o2.getWeight() - o1.getWeight()).toList(); + + int countWeight = 0; + for (long i = count; i < 100; i++) { + WorkGatewayNode workGatewayNode = list.get(countWeight++ % list.size()); + workGatewayNode.setWeight(workGatewayNode.getWeight() + 1); + } } + + whFor: + while (true) { + for (WorkGatewayNode workGatewayNode : workGatewayNodeWeight) { + int weight = workGatewayNode.getWeight(); + if (weight > 0) { + loadNodeList.add( + workGatewayNode.getNodeId() + ); + workGatewayNode.setWeight(weight - 1); + + } + + } + int sum = workGatewayNodeWeight.stream(). + mapToInt(WorkGatewayNode::getWeight).sum(); + if (sum <= 0) { + break whFor; + } + } + //重置 + loadSeriesCache.reset(); + //存入负载集合 + loadNodeCache.put(loadNodeList); } + //} catch (InterruptedException e) { + // Thread.currentThread().interrupt(); + // 处理中断异常 + //} finally { + // 释放锁 + // if (refreshLoadLock.isHeldByCurrentThread()) { + // refreshLoadLock.unlock(); +//} + + /** + * 动态ECS + */ +// public void dynamicLoad() { +// +// //车辆上线总数量 +// long vehicleMaxOnlineNUm = getNodeMaxOnlineNum(); +// +// //目前连接数 +// Long nodeNowNum = nodeScoreCache.getNodeNowNum(); +// +// //负载率 +// BigDecimal loadRate = new BigDecimal(vehicleMaxOnlineNUm).divide(new BigDecimal(nodeNowNum), 0, BigDecimal.ROUND_HALF_UP); +// +// if (loadRate.longValue() > 80) { +// //扩容 +// } else if (loadRate.longValue() < 20) { +// //缩容 +// } +// } /** * 获取最大连接数 + * * @return */ - public Long getNodeMaxOnlineNum(){ + public Long getNodeMaxOnlineNum() { List workGatewayNodes = nodeScoreCache.getNodeScore(); - return workGatewayNodes.size() * 80L; + return workGatewayNodes.size() * LoadConstants.MAXIMUM; } } diff --git a/src/main/java/com/guo/task/Collection.java b/src/main/java/com/guo/task/Collection.java index 1025d3f..b4dc7b5 100644 --- a/src/main/java/com/guo/task/Collection.java +++ b/src/main/java/com/guo/task/Collection.java @@ -5,6 +5,7 @@ import com.alibaba.fastjson2.JSONObject; import com.guo.aly.ALYunEcsService; import com.guo.aly.model.EcsSelectModel; import com.guo.aly.model.InstanceInfo; +import com.guo.common.constant.Constans; import com.guo.common.model.TotalNumber; import com.guo.gateway.cache.*; import com.guo.gateway.model.NodeInfo; @@ -21,7 +22,6 @@ import org.springframework.stereotype.Component; import java.io.IOException; import java.util.ArrayList; -import java.util.Arrays; import java.util.List; /** @@ -34,6 +34,17 @@ import java.util.List; @AllArgsConstructor public class Collection { + /** + * 最少节点数量 + */ + private final static Integer mainNumber = 1; + + /** + * 判断连接阈值 + */ + private final static Integer maxNumber = 21; + private final static Integer beforemaxNumber = 20; + /** * 网关负载节点缓存 */ @@ -75,9 +86,13 @@ public class Collection { @Autowired private ContractionVolume contractionVolume; + /** + * 阿里云方法 + */ @Autowired private ALYunEcsService alYunEcsService; + /** * 定时扫描节点信息 */ @@ -85,21 +100,24 @@ public class Collection { public void scheduledEcsCompanding() throws Exception { //查询阿里云是否存在实例 - EcsSelectModel ecsSelectModel = new EcsSelectModel(); - List addArryList = new ArrayList<>(); - addArryList.add("Myname"); - ecsSelectModel.setInstanceNameList(addArryList); - //实例集合 - List instanceLists = alYunEcsService.selectECS(ecsSelectModel); + EcsSelectModel ecsSelectModel = new EcsSelectModel(); + List addArryList = new ArrayList<>(); + addArryList.add("Myname"); + ecsSelectModel.setInstanceNameList(addArryList); + //实例集合 + List instanceLists = alYunEcsService.selectECS(ecsSelectModel); //节点计数 - Long nodeNumber = 0L; + Long nodeNumber = Constans.NUMERICAL_VALUE_LONG_ZERO; //所有节点连接数总数 - long connectionTotal = 0L; + long connectionTotal = Constans.NUMERICAL_VALUE_LONG_ZERO; - //判断实例集合是否为空 - if (!instanceLists.isEmpty()){ + //删除连接数缓存 + nodeScoreCache.delete(); + + // 判断实例集合是否为空 + if (!instanceLists.isEmpty()) { //将实例存入缓存 for (InstanceInfo instance : instanceLists) { @@ -116,30 +134,14 @@ public class Collection { // 将新的 NodeInfo 对象放入缓存 nodeCache.put(nodeInfo); - //获取每个FluxMQ运行信息 - String URL = "http://" + instance.getPublicIpAddress()+":8080/public/cluster"; - OkHttpClient client = new OkHttpClient(); - - Request request = new Request.Builder() - .url(URL) - .get() - .addHeader("User-Agent", "Apifox/1.0.0 (https://apifox.com)") - .addHeader("Accesstoken", "") - .build(); - - try { - Response response = client.newCall(request).execute(); - - JSONArray jsonArray = JSONArray.parseArray(response.body().string()); - JSONObject jsonObject = jsonArray.getJSONObject(0); - JSONObject mqttInfo = jsonObject.getJSONObject("mqttInfo"); - int connectSize = mqttInfo.getIntValue("connectSize"); + //查询收集节点连接数 + int connectSize = querymqttConnections(instance.getPublicIpAddress()); log.info("当前:" + instance.getInstanceId() + ",的连接数:" + connectSize); //计数 nodeNumber++; - connectionTotal+=connectSize; + connectionTotal += connectSize; //将连接数存入缓存 // key:网关负载业务 value:网关节点ID + 连接数 @@ -149,33 +151,51 @@ public class Collection { nodeScoreCache.save(nodeJoin); //加层判断,把不满足缩容条件的缓存删除 连接数 > 最低阈值 - if (connectSize > 20 && nodeReduced.isWhether(instance.getInstanceId())){ + if (connectSize > beforemaxNumber && nodeReduced.isWhether(instance.getInstanceId())) { nodeReduced.remove(instance.getInstanceId()); } //判断是否达到缩容条件 节点数量 > 1 或 连接数 < 21 必须满足节点数量在两个及以上,且连接数低于21 - if (instanceLists.size() > 1 && connectSize < 21){ + if (instanceLists.size() > mainNumber && connectSize < maxNumber) { //调用缩容方法 记录 contractionVolume.reduction(instance.getInstanceId()); } - - - - } catch (IOException e) { - throw new RuntimeException(e); - } - } - } - //封装节点数量和节点连接总数 - TotalNumber totalNumber = new TotalNumber(); - totalNumber.setConnectionTotal(connectionTotal); - totalNumber.setNodeNumber(nodeNumber); - //调用扩容方法去判断是否需要扩容 - contractionVolume.contractionVolume(totalNumber); - + //封装节点数量和节点连接总数 + TotalNumber totalNumber = new TotalNumber(); + totalNumber.setConnectionTotal(connectionTotal); + totalNumber.setNodeNumber(nodeNumber); + //调用扩容方法去判断是否需要扩容 + contractionVolume.contractionVolume(totalNumber); } + + + public int querymqttConnections(String publicIPaddress){ + + //获取每个FluxMQ运行信息 + String URL = Constans.HTTP_REQUEST_HEAD + + publicIPaddress + + Constans.HTTP_REQUEST_ADDRESS; + OkHttpClient client = new OkHttpClient(); + + Request request = new Request.Builder() + .url(URL) + .get() + .addHeader(Constans.HTTP_REQUEST_ADDHEADER_AGENT, Constans.HTTP_REQUEST_ADDHEADER_APIFOX) + .addHeader(Constans.HTTP_REQUEST_ADDHEADER_ACCESSTOKEN, Constans.HTTP_REQUEST_NULL_STRING) + .build(); + + try { + Response response = client.newCall(request).execute(); + JSONArray jsonArray = JSONArray.parseArray(response.body().string()); + JSONObject jsonObject = jsonArray.getJSONObject(Constans.NUMERICAL_VALUE_ZERO); + JSONObject mqttInfo = jsonObject.getJSONObject(Constans.HTTP_REQUEST_MQTTINFO); + return mqttInfo.getIntValue(Constans.HTTP_REQUEST_CONNECTSIZE); + } catch (IOException e) { + throw new RuntimeException(e); + } + } } diff --git a/src/main/java/com/guo/task/Contraction/ContractionVolume.java b/src/main/java/com/guo/task/Contraction/ContractionVolume.java index eeda114..fc3b3e2 100644 --- a/src/main/java/com/guo/task/Contraction/ContractionVolume.java +++ b/src/main/java/com/guo/task/Contraction/ContractionVolume.java @@ -4,11 +4,13 @@ import com.guo.aly.ALYunEcsService; import com.guo.common.constant.LoadConstants; import com.guo.common.model.TotalNumber; import com.guo.gateway.cache.NodeReduced; +import com.guo.gateway.cache.VehicleLineNodeCache; import lombok.AllArgsConstructor; import lombok.extern.log4j.Log4j2; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; +import java.util.Set; import java.util.concurrent.TimeUnit; /** @@ -21,6 +23,16 @@ import java.util.concurrent.TimeUnit; @Log4j2 public class ContractionVolume { + /** + * 网关车辆对应网关节点 key:实例ID value:VIN + */ + private final VehicleLineNodeCache vehicleLineNodeCache; + + /** + * 数值 5 + */ + private final static Integer TimeExpired = 5; + @Autowired private ALYunEcsService alYunEcsService; @@ -41,13 +53,21 @@ public class ContractionVolume { //查询剩余过期时间 秒 Long expire = nodeReduced.remainingTime(nodeId); //获取 5分钟的秒值 - long fiveMinutesSeconds = TimeUnit.MINUTES.toSeconds(5); + long fiveMinutesSeconds = TimeUnit.MINUTES.toSeconds(TimeExpired); //打印 log.info("空闲节点 :" + nodeId + "的剩余时间:" + expire + "/秒"); //比较 剩余时间 小于5分钟 if (expire < fiveMinutesSeconds){ - //数据迁移 释放节点 + //根据节点ID获取缓存内的车辆VIN值 + Set vehicleVinSetList = vehicleLineNodeCache.get(nodeId); + //判断是否存在车辆信息 + if (!vehicleVinSetList.isEmpty()){ + //执行下线 + } + + + //删除节点信息 } } diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 8f228f5..d4323c3 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -20,3 +20,4 @@ aliyun: size: 20 category: cloud_essd instance-charge-type: PostPaid + endpoint: ecs.aliyuncs.com