From 79e1240de4111cbf3682d34e0902c859b76508af Mon Sep 17 00:00:00 2001 From: Xiao Fan <461179989@qq.com> Date: Sat, 20 Apr 2024 09:43:20 +0800 Subject: [PATCH] 4.20 --- .../controller/LoadCenterController.java | 135 ------------------ .../controller/WorkGatewayNodeController.java | 104 -------------- .../gateway/model/GatewayNodeInfo.java | 2 +- .../service/impl/GatewayLoadServiceImpl.java | 18 ++- 4 files changed, 16 insertions(+), 243 deletions(-) delete mode 100644 src/main/java/com/xiaofan/loadcenter/controller/LoadCenterController.java delete mode 100644 src/main/java/com/xiaofan/loadcenter/controller/WorkGatewayNodeController.java diff --git a/src/main/java/com/xiaofan/loadcenter/controller/LoadCenterController.java b/src/main/java/com/xiaofan/loadcenter/controller/LoadCenterController.java deleted file mode 100644 index 67fda84..0000000 --- a/src/main/java/com/xiaofan/loadcenter/controller/LoadCenterController.java +++ /dev/null @@ -1,135 +0,0 @@ -package com.xiaofan.loadcenter.controller; - -import com.xiaofan.loadcenter.gateway.cache.GatewayLoadNodeCache; -import com.xiaofan.loadcenter.gateway.cache.GatewayNodeCache; -import com.alibaba.fastjson2.JSONArray; -import com.alibaba.fastjson2.JSONObject; -import com.xiaofan.loadcenter.common.aliyun.AliYunEcsService; -import com.xiaofan.loadcenter.common.domain.EcsInstanceInfo; -import com.xiaofan.loadcenter.gateway.cache.GatewayNodeScoreCache; -import lombok.extern.log4j.Log4j2; -import okhttp3.OkHttpClient; -import okhttp3.Request; -import okhttp3.Response; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.data.redis.core.StringRedisTemplate; -import org.springframework.scheduling.annotation.Scheduled; -import org.springframework.stereotype.Component; -import com.xiaofan.loadcenter.common.redis.service.RedisService; - -import java.util.ArrayList; -import java.util.Map; -import java.util.Set; - - -@Component -@Log4j2 -public class LoadCenterController { - //计数器,用于判断是否需要进行扩容 - - int aa=0; - //计数器,用于记录循环次数 - - int bb=0; - //注入Redis操作模版 - - @Autowired - StringRedisTemplate redisTemplate; - - //注入Redis服务 - - @Autowired - private RedisService redisService; - //注入阿里云ECS服务 - - @Autowired - private AliYunEcsService aliYunEcsService; - - @Autowired - private GatewayNodeCache gatewayNodeCache; - @Autowired - private GatewayLoadNodeCache gatewayLoadNodeCache; - @Autowired - private GatewayNodeScoreCache gatewayNodeScoreCache; - - /* - * 30秒扫描一次服务器,负载情况 - */ - - @Scheduled(cron = "0/10 * * * * ?") - public void scheduleECS() throws Exception { - // 创建 OkHttpClient 客户端 - OkHttpClient client = new OkHttpClient(); - ArrayList ipCacheSet = new ArrayList<>(); - // 获取缓存中的 ECS 服务器集合 - Map map = gatewayNodeScoreCache.get(); - for (Map.Entry entry : map.entrySet()) { - ipCacheSet.add(entry.getKey().toString()); - } - - - log.info("共有"+ipCacheSet.size()+"个服务器"); - - // 遍历每个 ECS 服务器 - for (String ip : ipCacheSet) { - // 构建请求 URL - String URL = "http://"+ip+":8080/public/cluster"; - Request request = new Request.Builder() - .url(URL) - .get() - .addHeader("User-Agent", "Apifox/1.0.0 (https://apifox.com)") - .addHeader("Accesstoken", "") - .build(); - - try { - // 发起 HTTP 请求获取服务器负载信息 - Response response = client.newCall(request).execute(); - JSONArray jsonArray = JSONArray.parseArray(response.body().string()); - JSONObject jsonObject = jsonArray.getJSONObject(0); - JSONObject mqttInfo = jsonObject.getJSONObject("mqttInfo"); - int connectSize = mqttInfo.getIntValue("connectSize"); - // 打印服务器负载信息 - log.info("服务器:"+ip+"-车辆连接数:"+connectSize); - //更新服务器负载信息到Redis -// redisService.setCacheZSet("ECS", ip,connectSize); - gatewayNodeScoreCache.put(ip,connectSize); - - // 判断是否需要进行扩容 - if (connectSize>=5){ - aa++; - log.info("服务器:"+ip+"-----"+aa+"=="+ipCacheSet.size()+"就可以扩容"); - // 如果所有服务器都需要扩容 - if (aa==ipCacheSet.size()){ - - log.info("服务器个数:"+ipCacheSet.size()+",-----循环第"+aa+"次,相等了,需要扩容"); - log.info("执行扩容机制"); - - // 节点扩容 - String instanceId = aliYunEcsService.runInstances(); - - log.info("扩容的节点ip为:" + instanceId); -// log.info("扩容中休眠5秒,再返回,确保先创建,确保查询得到结果-------"); - Thread.sleep(5000); - - // 查询节点信息 - EcsInstanceInfo ecsInstanceInfo = aliYunEcsService.selectList(instanceId); -// redisService.setCacheZSet("ECS",ecsInstanceInfo.getPublicIpAddress(),0); - gatewayNodeScoreCache.put(ecsInstanceInfo.getPublicIpAddress(),0); - log.info("公网ip存入redis"); - aa=0; - } - - } else { - - aa=0; - } - - } catch (Exception e) { - e.printStackTrace(); - } - - } - - } - -} diff --git a/src/main/java/com/xiaofan/loadcenter/controller/WorkGatewayNodeController.java b/src/main/java/com/xiaofan/loadcenter/controller/WorkGatewayNodeController.java deleted file mode 100644 index 81f7e20..0000000 --- a/src/main/java/com/xiaofan/loadcenter/controller/WorkGatewayNodeController.java +++ /dev/null @@ -1,104 +0,0 @@ -package com.xiaofan.loadcenter.controller; - -import com.xiaofan.loadcenter.gateway.cache.GatewayLoadNodeCache; -import com.xiaofan.loadcenter.gateway.cache.GatewayNodeCache; - -import com.xiaofan.loadcenter.common.domain.Result; -import com.xiaofan.loadcenter.common.domain.WorkGatewayNode; -import com.xiaofan.loadcenter.gateway.cache.GatewayNodeScoreCache; -import com.xiaofan.loadcenter.common.redis.service.RedisService; -import lombok.extern.log4j.Log4j2; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.web.bind.annotation.PostMapping; -import org.springframework.web.bind.annotation.RequestMapping; -import org.springframework.web.bind.annotation.RestController; -import java.util.*; - - -@Log4j2 -@RestController -@RequestMapping("car") -public class WorkGatewayNodeController { - - @Autowired - private RedisService redisService; - - @Autowired - private GatewayNodeCache gatewayNodeCache; - @Autowired - private GatewayLoadNodeCache gatewayLoadNodeCache; - @Autowired - private GatewayNodeScoreCache gatewayNodeScoreCache; - @PostMapping("/workGatewayNode") - public Result getWorkGatewayNode() throws Exception { - // 从缓存中获取节点与权重的映射关系 -// Map map = redisService.getCacheZSetScore("ECS"); - Map map = gatewayNodeScoreCache.get(); - // 将节点与权重映射关系转换为节点列表 - ArrayList nodeIdList = new ArrayList<>(); - // 初始化负载节点列表 - List loadNodeList = new ArrayList<>(); - - - for (Map.Entry entry : map.entrySet()) { - WorkGatewayNode workGatewayNode1 = new WorkGatewayNode(); - log.info(entry.getKey().toString()+"--"+entry.getValue()); - workGatewayNode1.setNodeId(entry.getKey().toString()); - workGatewayNode1.setWeight(entry.getValue().intValue()); - nodeIdList.add(workGatewayNode1); - } - log.info("-----分割线-----"); - System.out.println(nodeIdList); - log.info("-----分割线-----"); - - // 计算节点权重总和 - long count = nodeIdList.stream().mapToInt(WorkGatewayNode::getWeight).sum(); - // 如果权重总和小于100,则对节点进行负载均衡 - if (count < 100) { - // 按权重降序排序节点列表 - List list = nodeIdList.stream() - .sorted((o1, o2) -> o2.getWeight() - o1.getWeight()) - .toList(); - // 将权重不足100的节点进行均衡负载 - int countWeight = 0; - for (long i = count; i < 100; i++) { - WorkGatewayNode workGatewayNode = list.get(countWeight++ % list.size()); - workGatewayNode.setWeight(workGatewayNode.getWeight() + 1); - } - } - // 根据节点权重,生成负载节点列表 - while (nodeIdList.stream().anyMatch(node -> node.getWeight() > 0)) { - for (WorkGatewayNode workGatewayNode : nodeIdList) { - int weight = workGatewayNode.getWeight(); - if (weight > 0) { - loadNodeList.add(workGatewayNode.getNodeId()); - workGatewayNode.setWeight(weight - 1); - } - } - } - log.info("----------------------------分割线--------------------------"); - //统计每个对象出现的次数 - Map countMap = new HashMap<>(); - for (Object obj : loadNodeList) { - countMap.put(obj, countMap.getOrDefault(obj, 0) + 1); - } - // 找出出现次数最少的对象 - Object nodeId = null; - int minCount = Integer.MAX_VALUE; - for (Map.Entry entry : countMap.entrySet()) { - if (entry.getValue() < minCount) { - minCount = entry.getValue(); - nodeId = entry.getKey(); - } - } - // 如果有最小数量的对象,则返回最小数量的对象;否则随机选择一个对象 - if (nodeId != null) { - return Result.success(nodeId); - } else { - Random random = new Random(); - return Result.success(random.nextInt(loadNodeList.size())); - } - - } - -} diff --git a/src/main/java/com/xiaofan/loadcenter/gateway/model/GatewayNodeInfo.java b/src/main/java/com/xiaofan/loadcenter/gateway/model/GatewayNodeInfo.java index 0e19008..632d3cd 100644 --- a/src/main/java/com/xiaofan/loadcenter/gateway/model/GatewayNodeInfo.java +++ b/src/main/java/com/xiaofan/loadcenter/gateway/model/GatewayNodeInfo.java @@ -1 +1 @@ -package com.xiaofan.loadcenter.gateway.model; import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor; /** * @ProjectName: LoadCenter * @PackageName: com.muyu.loadCenter.domain * @Description 网关节点信息 * @Author XiaoFan * @Date 2024/4/18 14:25 * @Version 1.0 */ @Data @Builder @NoArgsConstructor @AllArgsConstructor public class GatewayNodeInfo { /** * 节点ID */ private String nodeId; /** * 公网IP */ private String publicIdAddress; } \ No newline at end of file +package com.xiaofan.loadcenter.gateway.model; import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor; /** * @ProjectName: LoadCenter * @PackageName: com.muyu.loadCenter.domain * @Description 网关节点信息 * @Author XiaoFan * @Date 2024/4/18 14:25 * @Version 1.0 */ @Data @Builder @NoArgsConstructor @AllArgsConstructor public class GatewayNodeInfo { /** * 节点ID */ private String nodeId; /** * 公网IP */ private String publicIdAddress; /** * 内网ip */ private String privateIdAddress; } \ No newline at end of file diff --git a/src/main/java/com/xiaofan/loadcenter/service/impl/GatewayLoadServiceImpl.java b/src/main/java/com/xiaofan/loadcenter/service/impl/GatewayLoadServiceImpl.java index eadb813..ef820c5 100644 --- a/src/main/java/com/xiaofan/loadcenter/service/impl/GatewayLoadServiceImpl.java +++ b/src/main/java/com/xiaofan/loadcenter/service/impl/GatewayLoadServiceImpl.java @@ -74,7 +74,7 @@ public class GatewayLoadServiceImpl implements GatewayLoadService { //计算节点权重总和 long count=nodeIdList.stream().mapToInt(WorkGatewayNode::getWeight).sum(); //如果权重总和小于100,则对节点进行负载均衡 - if (count<100){ + if (count < 100){ //按权重降序排序节点列表 List list=nodeIdList.stream() .sorted((o1, o2) -> o2.getWeight()-o1.getWeight()) @@ -127,6 +127,7 @@ public class GatewayLoadServiceImpl implements GatewayLoadService { String loadNodeId = gatewayLoadNodeCache.getBydIndex(seriesLoadIndex); // 使用获取到的节点ID从缓存中检索具体的网关节点信息 GatewayNodeInfo gatewayNodeInfo = gatewayNodeCache.get(loadNodeId); + // 返回网关节点的公有ID地址 return gatewayNodeInfo.getPublicIdAddress(); } @@ -156,7 +157,7 @@ public class GatewayLoadServiceImpl implements GatewayLoadService { try { Response response=client.newCall(request).execute(); //解析响应数据 - JSONArray jsonArray = JSONArray.parseArray(response.body().toString()); + JSONArray jsonArray = JSONArray.parseArray(response.body().string()); JSONObject jsonObject = jsonArray.getJSONObject(0); JSONObject mqttInfo = jsonObject.getJSONObject("mqttInfo"); int connectSize=mqttInfo.getIntValue("connectSize"); @@ -178,8 +179,19 @@ public class GatewayLoadServiceImpl implements GatewayLoadService { //获取新实例信息并将其放入Redis EcsInstanceInfo ecsInstanceInfo = aliYunEcsService.selectList(instanceId); - } + GatewayNodeInfo gatewayNodeInfo = new GatewayNodeInfo(); + gatewayNodeInfo.setNodeId(ecsInstanceInfo.getInstanceId()); + gatewayNodeInfo.setPublicIdAddress(ecsInstanceInfo.getPublicIpAddress()); + gatewayNodeInfo.setPrivateIdAddress(ecsInstanceInfo.getPrivateIpAddress()); + gatewayNodeCache.put(ecsInstanceInfo.getPublicIpAddress(),gatewayNodeInfo); + //)修改服务器与在线车辆数据 + gatewayNodeScoreCache.put(ecsInstanceInfo.getPublicIpAddress(),0); + log.info("实例id和公网ip存入Redis"); + aa=0; + } + }else{ + aa=0; } }catch (Exception e){ e.printStackTrace();