From ea62395c0ed9ab79c91b4386b235457a2b6cee3d Mon Sep 17 00:00:00 2001 From: JangCan <2862008188@qq.com> Date: Fri, 19 Apr 2024 18:46:33 +0800 Subject: [PATCH] =?UTF-8?q?feat=20=E5=A2=9E=E5=8A=A0=E6=95=B0=E6=8D=AE?= =?UTF-8?q?=E7=B1=BB=E5=9E=8B=E7=BB=93=E6=9E=84=E5=8F=8A=E6=A8=A1=E5=9E=8B?= =?UTF-8?q?=EF=BC=8C=E5=8F=8A=E4=B8=80=E4=BA=9B=E4=B8=9A=E5=8A=A1=E6=B7=BB?= =?UTF-8?q?=E5=8A=A0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../common/redis/service/RedisService.java | 43 ++++- .../controller/GatewayController.java | 14 ++ .../gateway/cache/GatewayLoadSeriesCache.java | 2 - .../gateway/cache/GatewayNodeScoreCache.java | 50 +++++ .../gateway/cache/GatewayNodeSetVinCache.java | 34 ++++ .../gateway/cache/GatewayVehicleNode.java | 46 +++++ .../gateway/cache/GatewayZSetNodeCache.java | 49 +++++ .../gateway/cache/VehicleOnlineCache.java | 47 +++++ .../gateway/model/WorkGatewayNode.java | 23 +++ .../gateway/model/WorkGatewayNodeSource.java | 22 +++ .../service/GatewayLoadService.java | 5 + .../service/impl/GatewayLoadServiceImpl.java | 181 +++++++++++++++++- 12 files changed, 498 insertions(+), 18 deletions(-) create mode 100644 src/main/java/com/loadcenter/gateway/cache/GatewayNodeScoreCache.java create mode 100644 src/main/java/com/loadcenter/gateway/cache/GatewayNodeSetVinCache.java create mode 100644 src/main/java/com/loadcenter/gateway/cache/GatewayVehicleNode.java create mode 100644 src/main/java/com/loadcenter/gateway/cache/GatewayZSetNodeCache.java create mode 100644 src/main/java/com/loadcenter/gateway/cache/VehicleOnlineCache.java create mode 100644 src/main/java/com/loadcenter/gateway/model/WorkGatewayNode.java create mode 100644 src/main/java/com/loadcenter/gateway/model/WorkGatewayNodeSource.java diff --git a/src/main/java/com/loadcenter/common/redis/service/RedisService.java b/src/main/java/com/loadcenter/common/redis/service/RedisService.java index 0f2826c..9f86582 100644 --- a/src/main/java/com/loadcenter/common/redis/service/RedisService.java +++ b/src/main/java/com/loadcenter/common/redis/service/RedisService.java @@ -1,9 +1,6 @@ package com.loadcenter.common.redis.service; -import org.springframework.data.redis.core.BoundSetOperations; -import org.springframework.data.redis.core.HashOperations; -import org.springframework.data.redis.core.RedisTemplate; -import org.springframework.data.redis.core.ValueOperations; +import org.springframework.data.redis.core.*; import org.springframework.stereotype.Component; import javax.annotation.Resource; @@ -143,8 +140,8 @@ public class RedisService { return count == null ? 0 : count; } - public T getCacheListValue(final String key,long index){ - return (T) redisTemplate.opsForList().index(key,index); + public T getCacheListValue(final String key, long index) { + return (T) redisTemplate.opsForList().index(key, index); } /** @@ -158,7 +155,7 @@ public class RedisService { } public T getCacheList(final String key, Long index) { - return (T) redisTemplate.opsForList().index(key, index ); + return (T) redisTemplate.opsForList().index(key, index); } /** @@ -190,6 +187,11 @@ public class RedisService { return setOperation; } + public BoundZSetOperations setCacheZSet(final String key,final T setValue,double score){ + BoundZSetOperations boundZSetOperations = redisTemplate.boundZSetOps(key); + boundZSetOperations.add(setValue,score); + return boundZSetOperations; + } /* * 缓存Zset @@ -205,6 +207,17 @@ public class RedisService { } + public Map getCacheZSetScore(final String key) { + ZSetOperations zSetOperations = redisTemplate.opsForZSet(); + + //构建一个Map 用于存储成员和分数的对应关系 + Map memberScores = new HashMap<>(); + zSetOperations.rangeWithScores(key, 0, -1).forEach(tuple -> { + memberScores.put(tuple.getValue(), tuple.getScore()); + }); + return memberScores; + } + /** * 缓存Set * @@ -218,6 +231,11 @@ public class RedisService { setOperations.remove(setValue); } + public void deleteCacheSet(String key,T setValue){ + BoundSetOperations setOperation = redisTemplate.boundSetOps(key); + setOperation.remove(setValue); + } + /** * 获得缓存的set * @@ -229,6 +247,15 @@ public class RedisService { } + /** + * 删除缓存zset的元素 + * @param key + * @param value + */ + public void deleteCacheZset(final String key ,final String value){ + redisTemplate.opsForZSet().remove(key,value); + } + /** * 缓存Map * @@ -308,6 +335,7 @@ public class RedisService { /** * 减少序列值 + * * @param key * @param number * @return @@ -318,6 +346,7 @@ public class RedisService { /** * 增加序列值 + * * @param key * @param number * @return diff --git a/src/main/java/com/loadcenter/controller/GatewayController.java b/src/main/java/com/loadcenter/controller/GatewayController.java index 5fde14c..e858b51 100644 --- a/src/main/java/com/loadcenter/controller/GatewayController.java +++ b/src/main/java/com/loadcenter/controller/GatewayController.java @@ -3,6 +3,7 @@ package com.loadcenter.controller; import com.loadcenter.common.domain.resp.Result; import com.loadcenter.service.GatewayLoadService; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.annotation.Scheduled; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; @@ -20,10 +21,23 @@ public class GatewayController { @Autowired private GatewayLoadService gatewayLoadService; + /** + * 发送上线请求 返回一个节点id + * @return + */ @GetMapping("/load/node") public Result loadNode(){ return Result.success(gatewayLoadService.loadNode()); } + /** + * 定时任务,每30秒扫描一次服务器集群的负载情况。 + * 如果任一服务器的车辆连接数达到或超过阈值,则触发扩容; + * 如果服务器的连接数低于阈值,则触发缩容。 + */ + @Scheduled(cron = "0/5 * * * * ?") + public void scheduleECS(){ + gatewayLoadService.scheduleECS(); + } } diff --git a/src/main/java/com/loadcenter/gateway/cache/GatewayLoadSeriesCache.java b/src/main/java/com/loadcenter/gateway/cache/GatewayLoadSeriesCache.java index b475c71..3deb19b 100644 --- a/src/main/java/com/loadcenter/gateway/cache/GatewayLoadSeriesCache.java +++ b/src/main/java/com/loadcenter/gateway/cache/GatewayLoadSeriesCache.java @@ -1,6 +1,4 @@ package com.loadcenter.gateway.cache; - -import com.loadcenter.common.redis.service.RedisService; import com.loadcenter.gateway.cache.abs.GatewayCacheAbs; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; diff --git a/src/main/java/com/loadcenter/gateway/cache/GatewayNodeScoreCache.java b/src/main/java/com/loadcenter/gateway/cache/GatewayNodeScoreCache.java new file mode 100644 index 0000000..e5803fb --- /dev/null +++ b/src/main/java/com/loadcenter/gateway/cache/GatewayNodeScoreCache.java @@ -0,0 +1,50 @@ +package com.loadcenter.gateway.cache; + +import com.loadcenter.gateway.cache.abs.GatewayCacheAbs; +import com.loadcenter.gateway.model.WorkGatewayNode; +import org.springframework.data.redis.core.ZSetOperations; +import org.springframework.stereotype.Service; + +import java.util.List; +import java.util.Set; + +/** + * @ClassName GatewayNodeScoreCache + * @Description 网关节点分数 + * @Author Can.J + * @Date 2024/4/19 14:47 + */ +@Service +public class GatewayNodeScoreCache extends GatewayCacheAbs { + private final static String gatewayNodeScoreCacheKey = "score"; + + @Override + public String getPre() { + return "gateway:node:"; + } + + public List get() { + Set> range = redisService.redisTemplate.opsForZSet().rangeWithScores(encode(gatewayNodeScoreCacheKey), 0, -1); + return range.stream() + .map(zSet -> WorkGatewayNode.builder() + .nodeId(zSet.getValue()).source(zSet.getScore()).build()) + .toList(); + } + + public Long getNodeNowNum() { + List workGatewayNodes = get(); + Long vehicleNowOnlineNum = Long.valueOf(String.valueOf(workGatewayNodes.stream().mapToDouble(WorkGatewayNode::getSource).sum())); + + return vehicleNowOnlineNum; + } + + /** + * 获取节点最大上线数量 + * @return + */ + public Long getNodeMaxOnlineNum() { + List workGatewayNodes = get(); + return workGatewayNodes.size() * 80L; + } + +} diff --git a/src/main/java/com/loadcenter/gateway/cache/GatewayNodeSetVinCache.java b/src/main/java/com/loadcenter/gateway/cache/GatewayNodeSetVinCache.java new file mode 100644 index 0000000..a8b7b3a --- /dev/null +++ b/src/main/java/com/loadcenter/gateway/cache/GatewayNodeSetVinCache.java @@ -0,0 +1,34 @@ +package com.loadcenter.gateway.cache; + +import com.loadcenter.gateway.cache.abs.GatewayCacheAbs; +import org.springframework.stereotype.Component; + +/** + * @ClassName GatewayNodeSetVinCache + * @Description 一个节点对应多辆车的VIN + * @Author Can.J + * @Date 2024/4/19 9:49 + */ +@Component +public class GatewayNodeSetVinCache extends GatewayCacheAbs { + private final static String gatewayNodeSetVinCache ="cars:"; + + + @Override + public String getPre() { + return "gateway:node:"; + } + + /** + * 节点添加vin + * @param node + * @param vin + */ + public void put(String node,String vin){ + redisService.setCacheSet(encode(gatewayNodeSetVinCache)+node,vin); + } + + public void remove(String nodeId,String vin){ + redisService.deleteCacheSet(encode(gatewayNodeSetVinCache)+nodeId,vin); + } +} diff --git a/src/main/java/com/loadcenter/gateway/cache/GatewayVehicleNode.java b/src/main/java/com/loadcenter/gateway/cache/GatewayVehicleNode.java new file mode 100644 index 0000000..a988b4b --- /dev/null +++ b/src/main/java/com/loadcenter/gateway/cache/GatewayVehicleNode.java @@ -0,0 +1,46 @@ +package com.loadcenter.gateway.cache; + +import com.loadcenter.gateway.cache.abs.GatewayCacheAbs; +import org.springframework.stereotype.Component; + +/** + * @ClassName GatewayVehicleNode + * @Description 车辆上线业务+VIN:网关节点id + * @Author Can.J + * @Date 2024/4/19 9:39 + */ +@Component +public class GatewayVehicleNode extends GatewayCacheAbs { + private final static String gatewayCarBusinessKey ="business"; + + @Override + public String getPre() { + return "gateway:car:"; + } + + /** + * 添加车辆vin gateway:car:business+vin, 网关节点id + * @param vin + * @param nodeId + */ + public void put(String vin,String nodeId){ + redisService.setCacheObject(encode(gatewayCarBusinessKey)+vin,nodeId); + } + + /** + * 删除车辆vin gateway:car:business + * @param vin + */ + public void remove(String vin){ + redisService.deleteObject(encode(gatewayCarBusinessKey+vin)); + } + + /** + * 获取车辆 + * @param vin + * @return + */ + public String get(String vin){ + return redisService.getCacheObject(encode(gatewayCarBusinessKey)+vin); + } +} diff --git a/src/main/java/com/loadcenter/gateway/cache/GatewayZSetNodeCache.java b/src/main/java/com/loadcenter/gateway/cache/GatewayZSetNodeCache.java new file mode 100644 index 0000000..78c90da --- /dev/null +++ b/src/main/java/com/loadcenter/gateway/cache/GatewayZSetNodeCache.java @@ -0,0 +1,49 @@ +package com.loadcenter.gateway.cache; + +import com.loadcenter.gateway.cache.abs.GatewayCacheAbs; +import org.springframework.stereotype.Component; + +import java.util.Map; + +/** + * @ClassName GatewayZSetNodeCache + * @Description 服务器节点和车辆连接数 + * @Author Can.J + * @Date 2024/4/19 9:00 + */ +@Component +public class GatewayZSetNodeCache extends GatewayCacheAbs { + + private final static String gatewayZSetCount ="count"; + + @Override + public String getPre() { + return "gateway:zSet"; + } + + /** + * 获取所有zset数据 + * @return 负载节点集合 + */ + public Map get(){ + return redisService.getCacheZSetScore(encode(gatewayZSetCount)); + } + + + /** + * 修改服务器的在线车辆 + * @param nodeId + * @param onlineVehicle + */ + public void put(String nodeId,Integer onlineVehicle){ + redisService.setCacheZSet(encode(gatewayZSetCount),nodeId,onlineVehicle); + } + + /** + * 删除服务器的时候删除zset服务器列表,防止重新添加车辆 + * @param nodeId + */ + public void remove(String nodeId){ + redisService.deleteCacheZset(encode(gatewayZSetCount),nodeId); + } +} diff --git a/src/main/java/com/loadcenter/gateway/cache/VehicleOnlineCache.java b/src/main/java/com/loadcenter/gateway/cache/VehicleOnlineCache.java new file mode 100644 index 0000000..9a18e47 --- /dev/null +++ b/src/main/java/com/loadcenter/gateway/cache/VehicleOnlineCache.java @@ -0,0 +1,47 @@ +package com.loadcenter.gateway.cache; + +import com.loadcenter.gateway.cache.abs.GatewayCacheAbs; +import org.springframework.stereotype.Component; + +/** + * @ClassName VehicleOnlineCache + * @Description 描述 + * @Author Can.J + * @Date 2024/4/18 22:02 + */ +@Component +public class VehicleOnlineCache extends GatewayCacheAbs { + + private final static String vehicleOnlineKey ="vehicle_online"; + @Override + public String getPre() { + return "vehicle:"; + } + + /** + * 记录车辆上线业务 + * @param vin 车辆VIN + * @param nodeId 网关节点id + */ + public void recordVehicleOnline(String vin,String nodeId){ + redisService.setCacheObject(encode(vehicleOnlineKey+vin),nodeId); + } + + /** + * 获取车辆上线的网关节点id + * @param vin 车辆VIN + * @return 网关节点id + */ + public String getOnlineGatewayNode(String vin){ + return redisService.getCacheObject(encode(vehicleOnlineKey+vin)); + } + + /** + * 移除车辆上线业务信息 + * @param vin 车辆VIN + */ + public void removeVehicleOnline(String vin){ + redisService.deleteObject(encode(vehicleOnlineKey +vin)); + } + +} diff --git a/src/main/java/com/loadcenter/gateway/model/WorkGatewayNode.java b/src/main/java/com/loadcenter/gateway/model/WorkGatewayNode.java new file mode 100644 index 0000000..7529825 --- /dev/null +++ b/src/main/java/com/loadcenter/gateway/model/WorkGatewayNode.java @@ -0,0 +1,23 @@ +package com.loadcenter.gateway.model; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@NoArgsConstructor +@AllArgsConstructor +@Builder +public class WorkGatewayNode { + /** + * 节点id + */ + private String nodeId; + + /** + * 分数 + */ + private Double source; + +} diff --git a/src/main/java/com/loadcenter/gateway/model/WorkGatewayNodeSource.java b/src/main/java/com/loadcenter/gateway/model/WorkGatewayNodeSource.java new file mode 100644 index 0000000..e6ecf89 --- /dev/null +++ b/src/main/java/com/loadcenter/gateway/model/WorkGatewayNodeSource.java @@ -0,0 +1,22 @@ +package com.loadcenter.gateway.model; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@NoArgsConstructor +@AllArgsConstructor +@Builder +public class WorkGatewayNodeSource { + /** + * 节点id + */ + private String nodeId; + + /** + * 权重 + */ + private int weight; +} diff --git a/src/main/java/com/loadcenter/service/GatewayLoadService.java b/src/main/java/com/loadcenter/service/GatewayLoadService.java index 7b6e996..6fd3409 100644 --- a/src/main/java/com/loadcenter/service/GatewayLoadService.java +++ b/src/main/java/com/loadcenter/service/GatewayLoadService.java @@ -11,4 +11,9 @@ public interface GatewayLoadService { * @return 返回负载节点 */ String loadNode(); + + + void scheduleECS(); + + void refresh(); } diff --git a/src/main/java/com/loadcenter/service/impl/GatewayLoadServiceImpl.java b/src/main/java/com/loadcenter/service/impl/GatewayLoadServiceImpl.java index e7ff0b3..f1f3015 100644 --- a/src/main/java/com/loadcenter/service/impl/GatewayLoadServiceImpl.java +++ b/src/main/java/com/loadcenter/service/impl/GatewayLoadServiceImpl.java @@ -1,22 +1,34 @@ package com.loadcenter.service.impl; -import com.loadcenter.gateway.cache.GatewayLoadNodeCache; -import com.loadcenter.gateway.cache.GatewayLoadSeriesCache; -import com.loadcenter.gateway.cache.GatewayNodeCache; +import com.alibaba.fastjson2.JSONArray; +import com.alibaba.fastjson2.JSONObject; +import com.loadcenter.gateway.cache.*; import com.loadcenter.gateway.model.GatewayNodeInfo; +import com.loadcenter.gateway.model.WorkGatewayNode; +import com.loadcenter.gateway.model.WorkGatewayNodeSource; import com.loadcenter.service.GatewayLoadService; import lombok.AllArgsConstructor; -import org.springframework.beans.factory.annotation.Autowired; +import lombok.extern.slf4j.Slf4j; +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.Response; import org.springframework.stereotype.Service; +import java.io.IOException; +import java.math.BigDecimal; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + /** - * @ClassName GatewayLoadServiceImpl - * @Description 负载实现层 - * @Author Can.J - * @Date 2024/4/18 16:10 + * @ClassName GatewayLoadServiceImpl + * @Description 负载实现层 + * @Author Can.J + * @Date 2024/4/18 16:10 */ @Service @AllArgsConstructor +@Slf4j public class GatewayLoadServiceImpl implements GatewayLoadService { private final Long nodeLength = 100L; /** @@ -35,7 +47,16 @@ public class GatewayLoadServiceImpl implements GatewayLoadService { private final GatewayNodeCache gatewayNodeCache; /** - * 负载节点 + * + */ + private final GatewayZSetNodeCache gatewayZSetNodeCache; + + + private final GatewayNodeScoreCache gatewayNodeScoreCache; + + /** + * 负载节点 + * * @return 返回负载节点 */ @Override @@ -47,4 +68,146 @@ public class GatewayLoadServiceImpl implements GatewayLoadService { GatewayNodeInfo gatewayNodeInfo = gatewayNodeCache.get(loadNodeId); return gatewayNodeInfo.getPublicAddress(); } + + + @Override + public void scheduleECS() { + //客户端初始化 + OkHttpClient client = new OkHttpClient(); + ArrayList ipCacheSet = new ArrayList<>(); + + //从redis获取服务器IP集合 + Map map = gatewayZSetNodeCache.get(); + + for (Map.Entry entry : map.entrySet()) { + ipCacheSet.add(entry.getKey().toString()); + } + + log.info("共有:[{}]个服务器", ipCacheSet.size()); + + //遍历每台服务器进行负载检查 + for (String ip : ipCacheSet) { + + //构建请求URI和请求头 + String URL = "http://" + ip + ":8080/public/cluster"; + Request request = new Request.Builder() + .url(URL) + .get() + .addHeader("User-Agent", "Apifox/1.0.0 (https://apifox.com)") + .addHeader("Accesstoken", "") + .build(); + + try { + Response response = client.newCall(request).execute(); + //解析响应数据 + JSONArray jsonArray = JSONArray.parseArray(response.body().string()); + JSONObject jsonObject = jsonArray.getJSONObject(0); + JSONObject mqttInfo = jsonObject.getJSONObject("cartest"); + int connectSize = mqttInfo.getIntValue("connectSize"); + + //更新redis中服务器的连接数zset数据类型 + log.info("服务器:[{}]:车辆连接数:[{}]", ip, connectSize); + gatewayZSetNodeCache.put(ip, connectSize); + + // 根据连接数判断是否需要进行扩容或缩容 + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } + + /** + * 刷新负载规则 实现动态负载 + */ + + @Override + public void refresh() { + Long nodeMaxNum = 80L; + List workGatewayNodes = gatewayNodeScoreCache.get(); + //上线最大数量 + Long vehicleMaxOnlineNum = gatewayNodeScoreCache.getNodeMaxOnlineNum(); + //当前连接数 + Long vehicleNowOnlineNum = gatewayNodeScoreCache.getNodeNowNum(); + //空余连接数 + Long vehicleOnlineNum = vehicleMaxOnlineNum - vehicleNowOnlineNum; + + List loadNodeList =new ArrayList<>(); + List workGatewayNodeSources = workGatewayNodes.stream() + .map(workGatewayNode -> + WorkGatewayNodeSource.builder() + .nodeId(workGatewayNode.getNodeId()) + .weight(Integer.parseInt(String.valueOf(vehicleOnlineNum / (nodeMaxNum - workGatewayNode.getSource())))) + .build()) + .toList(); + // 计算节点列表中所有节点的权重之和 + long count = workGatewayNodeSources.stream().mapToInt(WorkGatewayNodeSource::getWeight).sum(); + // 如果权重之和小于100,则对节点列表按照权重进行降序排序,并将剩余权重平均分配给前几个节点,直到权重之和等于100 + if (count < 100) { + // 对节点按权重降序排序 + List list = workGatewayNodeSources.stream() + .sorted((o1, o2) -> o2.getWeight() - o1.getWeight()) + .toList(); + // 打印排序后的节点列表 + System.out.println(list); + + // 将剩余权重分配给最低权重的节点,直到总和达到100 + int countWeight = 0; + for (long i = count; i < 100; i++) { + WorkGatewayNodeSource workGatewayNodeSource = list.get(countWeight++ % list.size()); + workGatewayNodeSource.setWeight(workGatewayNodeSource.getWeight() + 1); + } + } + // 当所有节点权重为0时,跳出循环 + whFor:while (true) { + // 遍历节点列表,将权重大于0的节点ID添加到loadNodeList中,并将节点权重减1 + for (WorkGatewayNodeSource workGatewayNodeSource : workGatewayNodeSources) { + int weight = workGatewayNodeSource.getWeight(); + if (weight > 0) { + loadNodeList.add( + workGatewayNodeSource.getNodeId() + ); + workGatewayNodeSource.setWeight(weight - 1); + } + } + int sum = workGatewayNodeSources.stream().mapToInt(WorkGatewayNodeSource::getWeight).sum(); + // 如果权重之和小于等于0,跳出循环 + if (sum <= 0) { + break whFor; + } + } + gatewayLoadSeriesCache.reset(); + + gatewayLoadNodeCache.put(loadNodeList); + } + + /** + * 动态ECS + */ + public void dynamicEcs(){ + //上线最大数量 + Long vehicleMaxOnlineNum = gatewayNodeScoreCache.getNodeMaxOnlineNum(); + //当前连接数 + Long vehicleOnlineNowNum = gatewayNodeScoreCache.getNodeNowNum(); + + BigDecimal loadRate = new BigDecimal(vehicleMaxOnlineNum).divide(new BigDecimal(vehicleOnlineNowNum), 0, BigDecimal.ROUND_HALF_UP); + + if(loadRate.longValue()>80){ + //调用扩容逻辑 + }else if (loadRate.longValue()<20){ + //调用缩容逻辑 + } + } + + /** + * 扩容逻辑 + * TODO + */ + + /** + * 缩容逻辑 + * TODO + */ } + + +