From 7ee883c388f42b1c39c97e4d27a6192cc627d402 Mon Sep 17 00:00:00 2001 From: lijiayao <13831655+xiao-yao-charge-forward@user.noreply.gitee.com> Date: Sat, 20 Apr 2024 10:26:32 +0800 Subject: [PATCH] 12312 --- .../com/yao/common/aliy/AliYunEcsService.java | 71 ++++++++++++------- .../java/com/yao/common/config/Constants.java | 13 ++++ ...WayNodeInfo.java => GateWayNodeScore.java} | 2 +- .../yao/common/domain/WorkGateWayNode.java | 2 +- .../common/domain/aliy/InstanceRequest.java | 23 ------ .../yao/common/mqtt/MqttConnectService.java | 25 ++++++- .../common/redis/service/RedisService.java | 10 +++ .../gateway/cache/GatewayLoadSeriesCache.java | 2 +- .../gateway/cache/GatewayNodeScoreCache.java | 56 +++++++++------ .../gateway/cache/GatewayNodeSetVinCache.java | 10 +-- .../cache/GatewayVehicleLineNodeCache.java | 2 - .../server/controller/GatewayController.java | 12 +++- .../server/service/GatewayLoadService.java | 5 ++ .../service/impl/GatewayLoadServiceImpl.java | 10 ++- .../server/service/impl/LoadServiceImpl.java | 64 +++++++++-------- src/main/java/com/yao/server/timer/Timer.java | 59 +++++++++------ 16 files changed, 229 insertions(+), 137 deletions(-) rename src/main/java/com/yao/common/domain/{GateWayNodeInfo.java => GateWayNodeScore.java} (92%) delete mode 100644 src/main/java/com/yao/common/domain/aliy/InstanceRequest.java diff --git a/src/main/java/com/yao/common/aliy/AliYunEcsService.java b/src/main/java/com/yao/common/aliy/AliYunEcsService.java index c9927d2..9ee9add 100644 --- a/src/main/java/com/yao/common/aliy/AliYunEcsService.java +++ b/src/main/java/com/yao/common/aliy/AliYunEcsService.java @@ -8,7 +8,10 @@ import com.aliyun.teautil.models.RuntimeOptions; import com.yao.common.aliy.model.EcsSelectModel; import com.yao.common.config.AlyConfigProperties; import com.yao.common.domain.aliy.InstanceInfo; -import com.yao.gateway.cache.GatewayNodeSetVinCache; +import com.yao.gateway.cache.GateWayNodeInfo; +import com.yao.gateway.cache.GatewayLoadSeriesCache; +import com.yao.gateway.cache.GatewayNodeCache; +import com.yao.gateway.cache.GatewayNodeScoreCache; import lombok.extern.log4j.Log4j2; import org.springframework.stereotype.Component; @@ -26,12 +29,19 @@ public class AliYunEcsService { private final AlyConfigProperties alyConfigProperties; private final Client client; + //网关节点分数 + public final GatewayNodeScoreCache gatewayNodeScoreCache; + //网关节点缓存 + private final GatewayNodeCache gatewayNodeCache; + //网关负载序列 + private final GatewayLoadSeriesCache gatewayLoadSeriesCache; - private final GatewayNodeSetVinCache gatewayNodeSetVinCache; - public AliYunEcsService(AlyConfigProperties alyConfigProperties, Client client, GatewayNodeSetVinCache gatewayNodeSetVinCache) { + public AliYunEcsService(AlyConfigProperties alyConfigProperties, Client client, GatewayNodeScoreCache gatewayNodeScoreCache, GatewayNodeCache gatewayNodeCache, GatewayLoadSeriesCache gatewayLoadSeriesCache) { this.alyConfigProperties = alyConfigProperties; this.client = client; - this.gatewayNodeSetVinCache = gatewayNodeSetVinCache; + this.gatewayNodeScoreCache = gatewayNodeScoreCache; + this.gatewayNodeCache = gatewayNodeCache; + this.gatewayLoadSeriesCache = gatewayLoadSeriesCache; } //todo----------------------------------------------------以下是查询代码-------------------------------------------- @@ -42,7 +52,7 @@ public class AliYunEcsService { * @throws Exception * @Description: 根据id和name查询内容 */ - public List selectList(EcsSelectModel ecsSelectModel) throws Exception { + public List selectList(EcsSelectModel ecsSelectModel) { DescribeInstancesRequest describeInstancesRequest = new DescribeInstancesRequest() .setRegionId(alyConfigProperties.getRegionId()); if (ecsSelectModel.getInstanceNameList() == null || ecsSelectModel.getInstanceNameList().isEmpty()) { @@ -105,28 +115,39 @@ public class AliYunEcsService { /** * initialization 初始化公共请求参数 */ - public List startCreate() throws Exception { + public List startCreate() { // 公网出带宽最大值,单位为 Mbit/s。取值范围:0~100。 默认值:0。 - Integer internetMaxBandwidthOut = com.aliyun.darabonbanumber.Client.parseInt("5"); + Integer internetMaxBandwidthOut = null; + try { + internetMaxBandwidthOut = com.aliyun.darabonbanumber.Client.parseInt("5"); + List s = RunInstances( + client, alyConfigProperties.getRegionId(), alyConfigProperties.getImageId(), alyConfigProperties.getInstanceType(), + alyConfigProperties.getSecurityGroupId(), alyConfigProperties.getVSwitchId(), internetMaxBandwidthOut, + alyConfigProperties.getInternetChargeType(), alyConfigProperties.getSize(), alyConfigProperties.getCategory(), + alyConfigProperties.getInstanceChargeType()); + EcsSelectModel ecsSelectModel = new EcsSelectModel(); + ecsSelectModel.setInstanceIdList(s); + List list = selectList(ecsSelectModel); + double num = 0; + //新增节点时数据往string类型新增一条数据,然后把序列数据重置为0,接着重新排序 + list.forEach( + item -> { + String publicIP = item.getPublicIpAddress().substring(1, item.getPublicIpAddress().length() - 1); + String privateIP = item.getPrivateIpAddress().substring(1, item.getPrivateIpAddress().length() - 1); + item.setPublicIpAddress(publicIP); + item.setPrivateIpAddress(privateIP); + //存入数据 + gatewayNodeScoreCache.newCount(item.getInstanceId(),num); + gatewayNodeCache.add(new GateWayNodeInfo(item.getInstanceId(),publicIP,privateIP)); + gatewayLoadSeriesCache.init(); + } + ); + return s; + } catch (Exception e) { + throw new RuntimeException(e); + } // 批量创建实例 - List s = RunInstances( - client, alyConfigProperties.getRegionId(), alyConfigProperties.getImageId(), alyConfigProperties.getInstanceType(), - alyConfigProperties.getSecurityGroupId(), alyConfigProperties.getVSwitchId(), internetMaxBandwidthOut, - alyConfigProperties.getInternetChargeType(), alyConfigProperties.getSize(), alyConfigProperties.getCategory(), - alyConfigProperties.getInstanceChargeType()); - EcsSelectModel ecsSelectModel = new EcsSelectModel(); - ecsSelectModel.setInstanceIdList(s); - List list = selectList(ecsSelectModel); - list.forEach( - item -> { - String publicIP = item.getPublicIpAddress().substring(1, item.getPublicIpAddress().length() - 1); - item.setPublicIpAddress(publicIP); - //存入数据 - gatewayNodeSetVinCache.newInstance(item); - log.info("公网IP:" + item.getPublicIpAddress()); - } - ); - return s; + } /** diff --git a/src/main/java/com/yao/common/config/Constants.java b/src/main/java/com/yao/common/config/Constants.java index f365129..9870713 100644 --- a/src/main/java/com/yao/common/config/Constants.java +++ b/src/main/java/com/yao/common/config/Constants.java @@ -58,6 +58,19 @@ public class Constants { // 次数 public static final Integer SUM = 100; + /** + * 扩容 + */ + public static final Long EXPAND_CAPACITY = 80L; + + /** + * 缩容 + */ + public static final Long REDUCE_CAPACITY = 20L; + /** + * 最大连接数 + */ + public static final Long nodeMaxNum = 100L; /** * 登录成功状态 */ diff --git a/src/main/java/com/yao/common/domain/GateWayNodeInfo.java b/src/main/java/com/yao/common/domain/GateWayNodeScore.java similarity index 92% rename from src/main/java/com/yao/common/domain/GateWayNodeInfo.java rename to src/main/java/com/yao/common/domain/GateWayNodeScore.java index c593ffa..556c9b7 100644 --- a/src/main/java/com/yao/common/domain/GateWayNodeInfo.java +++ b/src/main/java/com/yao/common/domain/GateWayNodeScore.java @@ -14,7 +14,7 @@ import lombok.NoArgsConstructor; @AllArgsConstructor @NoArgsConstructor @Builder -public class GateWayNodeInfo { +public class GateWayNodeScore { /** * 节点id diff --git a/src/main/java/com/yao/common/domain/WorkGateWayNode.java b/src/main/java/com/yao/common/domain/WorkGateWayNode.java index e398fe9..751cd7e 100644 --- a/src/main/java/com/yao/common/domain/WorkGateWayNode.java +++ b/src/main/java/com/yao/common/domain/WorkGateWayNode.java @@ -17,6 +17,6 @@ import lombok.NoArgsConstructor; public class WorkGateWayNode { private String nodeId; - private Integer weight; + private Long weight; } diff --git a/src/main/java/com/yao/common/domain/aliy/InstanceRequest.java b/src/main/java/com/yao/common/domain/aliy/InstanceRequest.java deleted file mode 100644 index 980d07a..0000000 --- a/src/main/java/com/yao/common/domain/aliy/InstanceRequest.java +++ /dev/null @@ -1,23 +0,0 @@ -package com.yao.common.domain.aliy; - -import lombok.AllArgsConstructor; -import lombok.Builder; -import lombok.Data; -import lombok.NoArgsConstructor; - -import java.io.Serializable; - -/** - * @Author: LiJiaYao - * @Date: 2024/4/16 - * @Description: - */ -@Data -@AllArgsConstructor -@NoArgsConstructor -@Builder -public class InstanceRequest implements Serializable { - - private String publicIpAddress; - private String instanceId; -} diff --git a/src/main/java/com/yao/common/mqtt/MqttConnectService.java b/src/main/java/com/yao/common/mqtt/MqttConnectService.java index 8e1a1ca..b4ad3a9 100644 --- a/src/main/java/com/yao/common/mqtt/MqttConnectService.java +++ b/src/main/java/com/yao/common/mqtt/MqttConnectService.java @@ -2,6 +2,8 @@ package com.yao.common.mqtt; import com.alibaba.fastjson2.JSONArray; import com.alibaba.fastjson2.JSONObject; +import com.yao.common.domain.aliy.InstanceInfo; +import com.yao.gateway.cache.GatewayNodeScoreCache; import com.yao.gateway.cache.GatewayVehicleLineNodeCache; import lombok.extern.log4j.Log4j2; import okhttp3.OkHttpClient; @@ -10,6 +12,7 @@ import okhttp3.Response; import org.springframework.stereotype.Component; import java.io.IOException; +import java.util.List; /** * @Author: LiJiaYao @@ -20,10 +23,13 @@ import java.io.IOException; @Log4j2 public class MqttConnectService { - private GatewayVehicleLineNodeCache gatewayVehicleLineNodeCache; - - public MqttConnectService(GatewayVehicleLineNodeCache gatewayVehicleLineNodeCache) { + //网关连接车俩 + private final GatewayVehicleLineNodeCache gatewayVehicleLineNodeCache; + // 网关节点分数 + private final GatewayNodeScoreCache gatewayNodeScoreCache; + private MqttConnectService(GatewayVehicleLineNodeCache gatewayVehicleLineNodeCache, GatewayNodeScoreCache gatewayNodeScoreCache) { this.gatewayVehicleLineNodeCache = gatewayVehicleLineNodeCache; + this.gatewayNodeScoreCache = gatewayNodeScoreCache; } //todo-----------------------连接mqtt方法------------------- @@ -51,4 +57,17 @@ public class MqttConnectService { throw new RuntimeException(e); } } + + /** + * 把传入的数据放入list集合后然后进行连接mqtt + * @param list 实例集合数据 + */ + public void cycle(List list) { + //单个的ip给他mqtt连接 + for (InstanceInfo instanceInfo : list) { + Integer connectSize = connectMqtt(instanceInfo.getPublicIpAddress()); + gatewayNodeScoreCache.newCount(instanceInfo.getInstanceId(),connectSize); + } + + } } diff --git a/src/main/java/com/yao/common/redis/service/RedisService.java b/src/main/java/com/yao/common/redis/service/RedisService.java index 5b5dcc0..f8d1bed 100644 --- a/src/main/java/com/yao/common/redis/service/RedisService.java +++ b/src/main/java/com/yao/common/redis/service/RedisService.java @@ -265,6 +265,16 @@ public class RedisService { return redisTemplate.opsForZSet().range(key,0,-1); } + /** + * 修改score的值 + * @param key 键 + * @param value 值 + * @param score 分数 + */ + public void getZSetRem(final String key, T value, final double score){ + redisTemplate.opsForZSet().add(key,value,score); + } + /** * 获取集合中是分数数据 * @param zSetKey diff --git a/src/main/java/com/yao/gateway/cache/GatewayLoadSeriesCache.java b/src/main/java/com/yao/gateway/cache/GatewayLoadSeriesCache.java index cbe9a59..c9df42d 100644 --- a/src/main/java/com/yao/gateway/cache/GatewayLoadSeriesCache.java +++ b/src/main/java/com/yao/gateway/cache/GatewayLoadSeriesCache.java @@ -21,7 +21,7 @@ public class GatewayLoadSeriesCache extends GatewayNodeAbstract { */ @PostConstruct public void init(){ - redisService.setCacheObject(gatewayLoadSeriesKey,0L); + redisService.setCacheObject(gatewayLoadSeriesKey,0); } /** diff --git a/src/main/java/com/yao/gateway/cache/GatewayNodeScoreCache.java b/src/main/java/com/yao/gateway/cache/GatewayNodeScoreCache.java index 9c3f89b..0c00e76 100644 --- a/src/main/java/com/yao/gateway/cache/GatewayNodeScoreCache.java +++ b/src/main/java/com/yao/gateway/cache/GatewayNodeScoreCache.java @@ -1,6 +1,8 @@ package com.yao.gateway.cache; -import com.yao.common.domain.GateWayNodeInfo; + +import com.aliyun.teautil.Common; +import com.yao.common.domain.GateWayNodeScore; import com.yao.common.domain.WorkGateWayNode; import com.yao.common.domain.aliy.InstanceInfo; import com.yao.gateway.cache.abs.GatewayNodeAbstract; @@ -27,26 +29,16 @@ public class GatewayNodeScoreCache extends GatewayNodeAbstract { * @param info 新增数据的信息 * @param count 网关连接的个数 */ - public void newCount(InstanceInfo info, Integer count) { - redisService.setCacheZSet(zSetKey, info, count); - } - - /** - * 取出数据 - * - * @return 取出的内容 - */ - public Set get() { - Set cacheZSet = redisService.getCacheZSet(zSetKey); - - return cacheZSet; + public void newCount(String nodeId, double count) { + redisService.setCacheZSet(zSetKey, nodeId, count); } /** * 获取他的内容 + * * @return */ - public List getCacheZSet() { + public List getCacheZSet() { Set> set = redisService.getZSet(zSetKey); // Map nodeMap = set.stream() // .collect(Collectors.toMap( @@ -54,16 +46,27 @@ public class GatewayNodeScoreCache extends GatewayNodeAbstract { // ZSetOperations.TypedTuple::getScore // )); return set.stream() - .map(zset-> - GateWayNodeInfo.builder() - .nodeId(zset.getValue()) - .score(zset.getScore()) - .build() + .map(zset -> + GateWayNodeScore.builder() + .nodeId(zset.getValue()) + .score(zset.getScore()) + .build() ).toList(); - //算法:有没有超过百分比 -// double sum = nodeMap.values().stream().mapToDouble(value -> value).sum(); } + /** + * 这个节点对应连接的节点数 + * @return 节点数 + */ + public Long getGatewayNodeNum() { + List instanceInfos = getCacheZSet(); + //有没有超过百分比 + double sum = instanceInfos.stream().mapToDouble(GateWayNodeScore::getScore).sum(); + return (long) sum; + } + + + /** * 删除数据 */ @@ -71,4 +74,13 @@ public class GatewayNodeScoreCache extends GatewayNodeAbstract { redisService.deleteObject(zSetKey); } + + /** + * 删除这个key其中的一个数据 + * @param gateWayNodeScore 对应的值 + */ + public void remote(GateWayNodeScore gateWayNodeScore){ + redisService.deleteCacheMapValue(zSetKey, Common.toJSONString(gateWayNodeScore)); + } + } diff --git a/src/main/java/com/yao/gateway/cache/GatewayNodeSetVinCache.java b/src/main/java/com/yao/gateway/cache/GatewayNodeSetVinCache.java index b3ee7ee..3274568 100644 --- a/src/main/java/com/yao/gateway/cache/GatewayNodeSetVinCache.java +++ b/src/main/java/com/yao/gateway/cache/GatewayNodeSetVinCache.java @@ -17,7 +17,7 @@ public class GatewayNodeSetVinCache extends GatewayNodeAbstract { /** * 阿里云键 */ - private static final String realKey ="new:real:column"; + private static final String realKey ="new:real:column:"; /** * 添加阿里云实列 @@ -29,11 +29,11 @@ public class GatewayNodeSetVinCache extends GatewayNodeAbstract { /** * 取出实例数据 - * @param realKey 键名称 - * @return 相关实例数据值 + * @param ip 键名称 + * @return 车辆的VIN */ - public Set get(){ - return redisService.getCacheSet(realKey); + public Set get(String ip){ + return redisService.getCacheSet(realKey+ip); } /** diff --git a/src/main/java/com/yao/gateway/cache/GatewayVehicleLineNodeCache.java b/src/main/java/com/yao/gateway/cache/GatewayVehicleLineNodeCache.java index e375845..cbdf99b 100644 --- a/src/main/java/com/yao/gateway/cache/GatewayVehicleLineNodeCache.java +++ b/src/main/java/com/yao/gateway/cache/GatewayVehicleLineNodeCache.java @@ -39,6 +39,4 @@ public class GatewayVehicleLineNodeCache extends GatewayNodeAbstract { } - - } diff --git a/src/main/java/com/yao/server/controller/GatewayController.java b/src/main/java/com/yao/server/controller/GatewayController.java index 8259b60..ca021ee 100644 --- a/src/main/java/com/yao/server/controller/GatewayController.java +++ b/src/main/java/com/yao/server/controller/GatewayController.java @@ -4,6 +4,8 @@ import com.yao.common.config.Result; import com.yao.server.service.GatewayLoadService; import lombok.extern.log4j.Log4j2; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.annotation.EnableScheduling; +import org.springframework.scheduling.annotation.Scheduled; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; @@ -14,6 +16,7 @@ import org.springframework.web.bind.annotation.RestController; */ @RestController("/gateway") @Log4j2 +@EnableScheduling public class GatewayController { @Autowired @@ -24,6 +27,13 @@ public class GatewayController { return Result.success(gatewayLoadService.loadNode()); } - + /** + * 每隔10秒就查询一次 + * @param ip + */ + @Scheduled(cron = "0/10 * * * * *") + public void requestLoad(String ip){ + gatewayLoadService.requestLoad(ip); + } } diff --git a/src/main/java/com/yao/server/service/GatewayLoadService.java b/src/main/java/com/yao/server/service/GatewayLoadService.java index fa7979c..70d6a82 100644 --- a/src/main/java/com/yao/server/service/GatewayLoadService.java +++ b/src/main/java/com/yao/server/service/GatewayLoadService.java @@ -12,5 +12,10 @@ public interface GatewayLoadService { */ String loadNode(); + /** + * 请求负载 + */ + void requestLoad(String ip); + } diff --git a/src/main/java/com/yao/server/service/impl/GatewayLoadServiceImpl.java b/src/main/java/com/yao/server/service/impl/GatewayLoadServiceImpl.java index 237b122..09911b3 100644 --- a/src/main/java/com/yao/server/service/impl/GatewayLoadServiceImpl.java +++ b/src/main/java/com/yao/server/service/impl/GatewayLoadServiceImpl.java @@ -1,5 +1,6 @@ package com.yao.server.service.impl; +import com.yao.common.mqtt.MqttConnectService; import com.yao.gateway.cache.GateWayNodeInfo; import com.yao.gateway.cache.GatewayLoadNodeCache; import com.yao.gateway.cache.GatewayLoadSeriesCache; @@ -25,7 +26,8 @@ public class GatewayLoadServiceImpl implements GatewayLoadService { private final GatewayLoadSeriesCache gatewayLoadSeriesCache; //网关节点缓存 private final GatewayNodeCache gatewayNodeCache; - + // 连接mqttx的配置类 + private final MqttConnectService mqttConnectService; @Override public String loadNode() { @@ -39,4 +41,10 @@ public class GatewayLoadServiceImpl implements GatewayLoadService { //获取外网ip return gateWayNodeInfo.getPublicIdAddress(); } + + @Override + public void requestLoad(String ip) { + mqttConnectService.connectMqtt(ip); + } + } diff --git a/src/main/java/com/yao/server/service/impl/LoadServiceImpl.java b/src/main/java/com/yao/server/service/impl/LoadServiceImpl.java index ee15504..458a5ce 100644 --- a/src/main/java/com/yao/server/service/impl/LoadServiceImpl.java +++ b/src/main/java/com/yao/server/service/impl/LoadServiceImpl.java @@ -1,10 +1,10 @@ package com.yao.server.service.impl; import com.yao.common.config.Constants; +import com.yao.common.domain.GateWayNodeScore; import com.yao.common.domain.WorkGateWayNode; -import com.yao.common.mqtt.MqttConnectService; import com.yao.gateway.cache.GatewayArithmeticCache; -import com.yao.gateway.cache.GatewayVehicleLineNodeCache; +import com.yao.gateway.cache.GatewayNodeScoreCache; import com.yao.server.service.LoadService; import lombok.extern.log4j.Log4j2; import org.springframework.stereotype.Service; @@ -23,47 +23,43 @@ import java.util.concurrent.CountDownLatch; public class LoadServiceImpl implements LoadService { //网关算法缓存 private final GatewayArithmeticCache gatewayArithmeticCache; - // 网关连接车俩 - private final GatewayVehicleLineNodeCache gatewayVehicleLineNodeCache; - private MqttConnectService mqttConnectService; + private final GatewayNodeScoreCache gatewayNodeScoreCache; - public LoadServiceImpl(GatewayArithmeticCache gatewayArithmeticCache, GatewayVehicleLineNodeCache gatewayVehicleLineNodeCache, MqttConnectService mqttConnectService) { + public LoadServiceImpl(GatewayArithmeticCache gatewayArithmeticCache, GatewayNodeScoreCache gatewayNodeScoreCache) { this.gatewayArithmeticCache = gatewayArithmeticCache; - this.gatewayVehicleLineNodeCache = gatewayVehicleLineNodeCache; - this.mqttConnectService = mqttConnectService; + this.gatewayNodeScoreCache = gatewayNodeScoreCache; } @Override public String load() { //初始化序列 gatewayArithmeticCache.count(); - ArrayList nodeIdList = carWorkGatewayNode(); + List nodeIdList = carWorkGatewayNode(); //100 List loadNodeList = new ArrayList<>(); - Integer count = nodeIdList.stream().mapToInt(WorkGateWayNode::getWeight).sum(); + Long count = nodeIdList.stream().mapToLong(WorkGateWayNode::getWeight).sum(); if (count < Constants.SUM) { List list = nodeIdList.stream() - .sorted(((o1, o2) -> o2.getWeight() - o1.getWeight())) + .sorted(((o1, o2) -> (int) (o2.getWeight() - o1.getWeight()))) .toList(); - Integer countWeight = 0; - for (Integer i = count; i < Constants.SUM; i++) { - WorkGateWayNode workGateWayNode = list.get(countWeight++ % list.size()); + Long countWeight = 0L; + for (Long i = count; i < Constants.SUM; i++) { + WorkGateWayNode workGateWayNode = list.get((int) (countWeight++ % list.size())); workGateWayNode.setWeight(workGateWayNode.getWeight() + 1); } } - work: while (true) { for (WorkGateWayNode workGateWayNode : nodeIdList) { - Integer nodeWeight = workGateWayNode.getWeight(); + Long nodeWeight = workGateWayNode.getWeight(); if (nodeWeight > 0) { loadNodeList.add( workGateWayNode.getNodeId() ); workGateWayNode.setWeight(nodeWeight - 1); } - int intStream = nodeIdList.stream() - .mapToInt(WorkGateWayNode::getWeight) + Long intStream = nodeIdList.stream() + .mapToLong(WorkGateWayNode::getWeight) .sum(); if (intStream <= 0) { break work; @@ -124,17 +120,26 @@ public class LoadServiceImpl implements LoadService { return key; } - public ArrayList carWorkGatewayNode() { - Set ip = gatewayVehicleLineNodeCache.getAddress(); - ArrayList list = new ArrayList<>(); - for (String s : ip) { - Integer connectSize = mqttConnectService.connectMqtt(s); - WorkGateWayNode workGateWayNode = new WorkGateWayNode(); - workGateWayNode.setWeight(connectSize); - workGateWayNode.setNodeId(s); - list.add(workGateWayNode); - } - return list; + public List carWorkGatewayNode() { + + List infos = gatewayNodeScoreCache.getCacheZSet(); + //上线最大数量 + Long vehicleMaxOnlineNum = infos.size() * Constants.nodeMaxNum; + + //目前连接数 + Long vehicleOnlineNowNum = (long) infos.stream().mapToDouble(GateWayNodeScore::getScore).sum(); + //空余连接数 + Long vehicleOnlineNum = vehicleMaxOnlineNum - vehicleOnlineNowNum; + List workGateWayNodes = infos.stream() + .map(workGateWay -> { + return WorkGateWayNode + .builder() + .nodeId(workGateWay.getNodeId()) + .weight((long) (vehicleOnlineNum / (Constants.nodeMaxNum - workGateWay.getScore()))) + .build(); + }).toList(); +// Long sum = workGateWayNodes.stream().mapToLong(WorkGateWayNode::getWeight).sum(); + return workGateWayNodes; } @@ -169,7 +174,6 @@ class SitNode { map.put(key, val); log.info(key + "-------" + val); }); - return map; } } diff --git a/src/main/java/com/yao/server/timer/Timer.java b/src/main/java/com/yao/server/timer/Timer.java index e37ada4..a31ad5a 100644 --- a/src/main/java/com/yao/server/timer/Timer.java +++ b/src/main/java/com/yao/server/timer/Timer.java @@ -1,20 +1,21 @@ package com.yao.server.timer; import com.yao.common.aliy.AliYunEcsService; -import com.yao.common.domain.GateWayNodeInfo; +import com.yao.common.aliy.model.EcsSelectModel; +import com.yao.common.config.Constants; +import com.yao.common.domain.GateWayNodeScore; import com.yao.common.domain.aliy.InstanceInfo; import com.yao.common.mqtt.MqttConnectService; -import com.yao.gateway.cache.GatewayNodeScoreCache; -import com.yao.gateway.cache.GatewayNodeSetVinCache; -import com.yao.gateway.cache.GatewayVehicleLineNodeCache; +import com.yao.gateway.cache.*; +import lombok.SneakyThrows; import lombok.extern.log4j.Log4j2; import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; +import java.util.ArrayList; import java.util.List; -import java.util.Set; /** * @Author: LiJiaYao @@ -31,15 +32,19 @@ public class Timer { private final GatewayNodeSetVinCache gatewayNodeSetVinCache; private final GatewayVehicleLineNodeCache gatewayVehicleLineNodeCache; private final GatewayNodeScoreCache gatewayNodeScoreCache; + //网关节点缓存 + private final GatewayNodeCache gatewayNodeCache; - public Timer(AliYunEcsService aliYunEcsService, MqttConnectService mqttConnectService, GatewayNodeSetVinCache gatewayAliYunCache, GatewayVehicleLineNodeCache gatewayVehicleLineNodeCache, GatewayNodeScoreCache gatewayNodeScoreCache) { + public Timer(AliYunEcsService aliYunEcsService, MqttConnectService mqttConnectService, GatewayNodeSetVinCache gatewayAliYunCache, GatewayVehicleLineNodeCache gatewayVehicleLineNodeCache, GatewayNodeScoreCache gatewayNodeScoreCache, GatewayNodeCache gatewayNodeCache) { this.aliYunEcsService = aliYunEcsService; this.mqttConnectService = mqttConnectService; this.gatewayNodeSetVinCache = gatewayAliYunCache; this.gatewayVehicleLineNodeCache = gatewayVehicleLineNodeCache; this.gatewayNodeScoreCache = gatewayNodeScoreCache; + this.gatewayNodeCache = gatewayNodeCache; } + @SneakyThrows @Async @Scheduled(cron = "0/20 * * * * ?") public void timer() { @@ -47,7 +52,8 @@ public class Timer { long startTime = System.currentTimeMillis(); // 设置结束时间为10分钟后 long endTime = startTime + 10 * 60 * 1000; - Set instance = gatewayNodeSetVinCache.get(); + List instance = gatewayNodeScoreCache.getCacheZSet(); + //判空,如果为空就创建实例 if (instance.isEmpty()) { try { aliYunEcsService.startCreate(); @@ -55,31 +61,40 @@ public class Timer { throw new RuntimeException(e); } } - for (InstanceInfo s : instance) { - Integer connectSize = mqttConnectService.connectMqtt(s.getPublicIpAddress()); + for (GateWayNodeScore gateWayNodeInfo : instance) { List instanceId = null; String deleteInstanceId = null; - if (connectSize >= 79) { + Long gatewayNodeNum = gatewayNodeScoreCache.getGatewayNodeNum(); + //根据id查询出来他的ip地址 + List list = aliYunEcsService.selectList(addNodeId(gateWayNodeInfo.getNodeId())); + //查询ip地址后连接mqtt,查询连接的个数.改变他的值 Cycle 循环处理单个的ip值 + mqttConnectService.cycle(list); + if (gatewayNodeNum >= Constants.EXPAND_CAPACITY) { //执行节点扩容 //返回实例的ID if (!instanceId.isEmpty()) { - try { - instanceId = aliYunEcsService.startCreate(); - log.info("扩容成功!"); - log.info("扩容的节点id为:" + instanceId); - } catch (Exception e) { - throw new RuntimeException(e); - } + instanceId = aliYunEcsService.startCreate(); + log.info("扩容成功!"); + log.info("扩容的节点id为:" + instanceId); + Thread.sleep(3000); } - } - if (connectSize <= 20 && System.currentTimeMillis() < endTime) { - aliYunEcsService.delete(s.getInstanceId()); + } else if (gatewayNodeNum <= Constants.REDUCE_CAPACITY && System.currentTimeMillis() < endTime) { + aliYunEcsService.delete(gateWayNodeInfo.getNodeId()); //删除实列以后再去把redis的值删除 再去通知重新上线 - gatewayNodeSetVinCache.remote(s); - gatewayVehicleLineNodeCache.save(s.getPublicIpAddress()); + gatewayNodeScoreCache.remote(gateWayNodeInfo); +// gatewayVehicleLineNodeCache.save(s.getPublicIpAddress()); log.info("缩容成功!"); log.info("锁容的节点id为:" + deleteInstanceId); } } } + + //让他查询出来他的值 + public EcsSelectModel addNodeId(String nodeId) { + ArrayList nodeIdList = new ArrayList(); + EcsSelectModel ecsSelectModel = new EcsSelectModel(); + nodeIdList.add(nodeId); + ecsSelectModel.setInstanceIdList(nodeIdList); + return ecsSelectModel; + } }