diff --git a/.idea/.gitignore b/.idea/.gitignore new file mode 100644 index 0000000..8a1c2a8 --- /dev/null +++ b/.idea/.gitignore @@ -0,0 +1,46 @@ +###################################################################### +# Build Tools + +.gradle +/build/ +!gradle/wrapper/gradle-wrapper.jar + +target/ +!.mvn/wrapper/maven-wrapper.jar + +###################################################################### +# IDE + +### STS ### +.apt_generated +.classpath +.factorypath +.project +.settings +.springBeans + +### IntelliJ IDEA ### +.idea +*.iws +*.iml +*.ipr + +### JRebel ### +rebel.xml +### NetBeans ### +nbproject/private/ +build/* +nbbuild/ +dist/ +nbdist/ +.nb-gradle/ + +###################################################################### +# Others +*.log +*.xml.versionsBackup +*.swp + +!*/build/*.java +!*/build/*.html +!*/build/*.xml diff --git a/pom.xml b/pom.xml index b94de24..dd6dcd0 100644 --- a/pom.xml +++ b/pom.xml @@ -62,7 +62,11 @@ org.springframework.boot spring-boot-starter-web - + + + + + org.springframework.boot spring-boot-starter-test diff --git a/src/main/java/com/muyu/loadCenter/aliyun/service/AliYunEcsService.java b/src/main/java/com/muyu/loadCenter/aliyun/service/AliYunEcsService.java index 89c51b0..eac1741 100644 --- a/src/main/java/com/muyu/loadCenter/aliyun/service/AliYunEcsService.java +++ b/src/main/java/com/muyu/loadCenter/aliyun/service/AliYunEcsService.java @@ -4,11 +4,11 @@ import com.aliyun.ecs20140526.Client; import com.aliyun.ecs20140526.models.*; import com.aliyun.tea.TeaException; import com.aliyun.teautil.models.RuntimeOptions; -import com.muyu.loadCenter.aliyun.config.AliConfig; +import com.muyu.loadCenter.config.AliConfig; import com.muyu.loadCenter.aliyun.utils.UserUtil; import com.muyu.loadCenter.domain.EcsInstanceInfo; -import com.muyu.loadCenter.aliyun.config.InstanceConfig; +import com.muyu.loadCenter.config.InstanceConfig; import lombok.extern.log4j.Log4j2; import org.springframework.stereotype.Service; diff --git a/src/main/java/com/muyu/loadCenter/aliyun/config/AliConfig.java b/src/main/java/com/muyu/loadCenter/config/AliConfig.java similarity index 96% rename from src/main/java/com/muyu/loadCenter/aliyun/config/AliConfig.java rename to src/main/java/com/muyu/loadCenter/config/AliConfig.java index ef87722..a3a623b 100644 --- a/src/main/java/com/muyu/loadCenter/aliyun/config/AliConfig.java +++ b/src/main/java/com/muyu/loadCenter/config/AliConfig.java @@ -1,4 +1,4 @@ -package com.muyu.loadCenter.aliyun.config; +package com.muyu.loadCenter.config; import com.aliyun.ecs20140526.Client; import com.aliyun.teaopenapi.models.Config; diff --git a/src/main/java/com/muyu/loadCenter/aliyun/config/InstanceConfig.java b/src/main/java/com/muyu/loadCenter/config/InstanceConfig.java similarity index 96% rename from src/main/java/com/muyu/loadCenter/aliyun/config/InstanceConfig.java rename to src/main/java/com/muyu/loadCenter/config/InstanceConfig.java index 2e747a2..b92276b 100644 --- a/src/main/java/com/muyu/loadCenter/aliyun/config/InstanceConfig.java +++ b/src/main/java/com/muyu/loadCenter/config/InstanceConfig.java @@ -1,4 +1,4 @@ -package com.muyu.loadCenter.aliyun.config; +package com.muyu.loadCenter.config; import lombok.Data; import org.springframework.boot.context.properties.ConfigurationProperties; diff --git a/src/main/java/com/muyu/loadCenter/aliyun/config/RestClientConfig.java b/src/main/java/com/muyu/loadCenter/config/RestClientConfig.java similarity index 85% rename from src/main/java/com/muyu/loadCenter/aliyun/config/RestClientConfig.java rename to src/main/java/com/muyu/loadCenter/config/RestClientConfig.java index ad34322..338339a 100644 --- a/src/main/java/com/muyu/loadCenter/aliyun/config/RestClientConfig.java +++ b/src/main/java/com/muyu/loadCenter/config/RestClientConfig.java @@ -1,4 +1,4 @@ -package com.muyu.loadCenter.aliyun.config;//package com.muyu.business.domain.config; +package com.muyu.loadCenter.config;//package com.muyu.business.domain.config; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; diff --git a/src/main/java/com/muyu/loadCenter/controller/DownLineController.java b/src/main/java/com/muyu/loadCenter/controller/DownLineController.java deleted file mode 100644 index 0dc77c7..0000000 --- a/src/main/java/com/muyu/loadCenter/controller/DownLineController.java +++ /dev/null @@ -1,41 +0,0 @@ -package com.muyu.loadCenter.controller; - -import com.muyu.loadCenter.redis.service.RedisService; -import lombok.extern.log4j.Log4j2; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.web.bind.annotation.PathVariable; -import org.springframework.web.bind.annotation.PostMapping; -import org.springframework.web.bind.annotation.RequestMapping; -import org.springframework.web.bind.annotation.RestController; - -/** - * 处理车辆下线业务 - * @Author HuangDaJu - * @Date 2024/4/17 21:28 - * @Version 1.0 - */ -@Log4j2 -@RestController -@RequestMapping("downLine") -public class DownLineController { - - - @Autowired - private RedisService redisService; - - @PostMapping("/carDownLine/{vin}") - public void getVinDownLine(@PathVariable String vin) throws Exception { - - - String nodeId = redisService.getCacheObject("delete:" + vin); - - redisService.deleteObject("delete:"+vin); - - redisService.deleteCacheSet("release:"+nodeId,vin); - - log.info("车辆下线成功"); - } - - - -} diff --git a/src/main/java/com/muyu/loadCenter/controller/GatewayController.java b/src/main/java/com/muyu/loadCenter/controller/GatewayController.java new file mode 100644 index 0000000..f076407 --- /dev/null +++ b/src/main/java/com/muyu/loadCenter/controller/GatewayController.java @@ -0,0 +1,49 @@ +package com.muyu.loadCenter.controller; + +import com.muyu.loadCenter.domain.Result; +import com.muyu.loadCenter.service.GatewayLoadService; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +/** + * @ProjectName: load_center + * @PackageName: com.muyu.loadCenter.controller + * @Description 网关控制层->车辆控制 + * @Author HuangDaJu + * @Date 2024/4/18 14:14 + * @Version 1.0 + */ +@RestController +@RequestMapping("/gateway") +public class GatewayController { + + @Autowired + private GatewayLoadService gatewayLoadService; + + /** + * 发送上线请求,返回一个节点id + */ + @GetMapping("/load/node") + public Result loadNode(){ + return Result.success(gatewayLoadService.loadNode()); + } + /** + * 定时任务,每30秒扫描一次服务器集群的负载情况。 + * 如果任一服务器的车辆连接数达到或超过阈值,则触发扩容; + * 如果服务器的连接数低于阈值,则触发缩容。 + */ + @Scheduled(cron = "0/5 * * * * ?") + public void scheduleECS() throws Exception { + gatewayLoadService.scheduleECS(); + } + + + + + + +} diff --git a/src/main/java/com/muyu/loadCenter/controller/GatewayVehicleController.java b/src/main/java/com/muyu/loadCenter/controller/GatewayVehicleController.java new file mode 100644 index 0000000..a13dc97 --- /dev/null +++ b/src/main/java/com/muyu/loadCenter/controller/GatewayVehicleController.java @@ -0,0 +1,57 @@ +package com.muyu.loadCenter.controller; + +import com.alibaba.fastjson2.JSONArray; +import com.alibaba.fastjson2.JSONObject; +import com.muyu.loadCenter.domain.Result; +import com.muyu.loadCenter.service.GatewayVehicleService; +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.Response; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +import java.util.ArrayList; +import java.util.Map; + +/** + * @Description 操作车辆上下线 + * @Author HuangDaJu + * @Date 2024/4/18 18:37 + * @Version 1.0 + */ + + +@RestController +@RequestMapping("/vehicle") +public class GatewayVehicleController { + + @Autowired + private GatewayVehicleService gatewayVehicleService; + + /** + * 车辆上线 + */ + + @GetMapping("/load/topLine/{vin}") + public Result topLine(@PathVariable String vin){ + return gatewayVehicleService.topLine(vin); + } + + /** + * 车辆下线 + */ + + @GetMapping("/load/downLine/{vin}") + public void loadDownLine(@PathVariable String vin){ + gatewayVehicleService.loadDownLine(vin); + } + + + + + +} diff --git a/src/main/java/com/muyu/loadCenter/controller/LoadCenterController.java b/src/main/java/com/muyu/loadCenter/controller/LoadCenterController.java deleted file mode 100644 index 9b5a09f..0000000 --- a/src/main/java/com/muyu/loadCenter/controller/LoadCenterController.java +++ /dev/null @@ -1,148 +0,0 @@ -package com.muyu.loadCenter.controller; - -import com.alibaba.fastjson2.JSON; -import com.alibaba.fastjson2.JSONArray; -import com.alibaba.fastjson2.JSONObject; -import com.muyu.loadCenter.aliyun.service.AliYunEcsService; -import com.muyu.loadCenter.domain.EcsInstanceInfo; -import com.muyu.loadCenter.domain.Result; -import com.muyu.loadCenter.service.ReleaseEcsDownLine; -import lombok.extern.log4j.Log4j2; -import okhttp3.OkHttpClient; -import okhttp3.Request; -import okhttp3.Response; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.data.redis.core.StringRedisTemplate; -import org.springframework.scheduling.annotation.Scheduled; -import org.springframework.stereotype.Component; -import com.muyu.loadCenter.redis.service.RedisService; -import org.springframework.web.client.RestTemplate; - -import java.util.Set; - - -/** - * 负载中心控制器,负责管理服务器集群的负载情况,并根据负载进行扩容或缩容操作。 - * @Author HuangDaJu - * @Date 2024/4/13 08:43 - */ -@Component -@Log4j2 -public class LoadCenterController { - // 用于临时统计连接数以判断是否需要扩容的变量 - int aa=0; - // 临时变量,无特定用途(根据现有代码) - int bb=0; -// Redis字符串模板,用于与Redis进行交互 - - @Autowired - private RedisService redisService; // Redis服务,封装了与Redis操作相关的功能 - - @Autowired - private AliYunEcsService aliYunEcsService; // 阿里云ECS服务,用于管理云服务器 - - @Autowired - private RestTemplate restTemplate; - - @Autowired - private ReleaseEcsDownLine releaseEcsDownLine;// 用于与其他服务进行HTTP交互的模板 - - /** - * 定时任务,每30秒扫描一次服务器集群的负载情况。 - * 如果任一服务器的车辆连接数达到或超过阈值,则触发扩容; - * 如果服务器的连接数低于阈值,则触发缩容。 - */ - @Scheduled(cron = "0/5 * * * * ?") - public void scheduleECS() throws Exception { - // 客户端初始化 - OkHttpClient client = new OkHttpClient(); - // 从Redis获取服务器IP集合 - Set ipCacheSet = redisService.getCacheZSet("ECS"); - - log.info("共有个"+ipCacheSet.size()+"服务器"); - - // 遍历每台服务器进行负载检查 - for (String ip : ipCacheSet) { - - // 构建请求URL和请求头 - String URL = "http://"+ip+":8080/public/cluster"; - Request request = new Request.Builder() - .url(URL) - .get() - .addHeader("User-Agent", "Apifox/1.0.0 (https://apifox.com)") - .addHeader("Accesstoken", "") - .build(); - - try { - Response response = client.newCall(request).execute(); - // 解析响应数据 - JSONArray jsonArray = JSONArray.parseArray(response.body().string()); - JSONObject jsonObject = jsonArray.getJSONObject(0); - JSONObject mqttInfo = jsonObject.getJSONObject("mqttInfo"); - int connectSize = mqttInfo.getIntValue("connectSize"); - - log.info("服务器:"+ip+"-车辆连接数:"+connectSize); - - // 更新Redis中服务器的连接数ZSet数据类型 - redisService.setCacheZSet("ECS", ip, connectSize); - - // 根据连接数判断是否需要进行扩容或缩容 - if (connectSize >= 6) { - aa++; - // 当满足扩容条件时,记录日志并执行扩容操作 - if (aa == ipCacheSet.size()) { - log.info("需要扩容"); - // 调用阿里云ECS服务创建新实例 - String instanceId = aliYunEcsService.runInstances(); - log.info("扩容的节点ip为:" + instanceId); - - // 休眠5秒以确保新实例创建完成 - Thread.sleep(5000); - - // 获取新实例信息并将其持久化到本地数据库 - EcsInstanceInfo ecsInstanceInfo = aliYunEcsService.selectList(instanceId); - - - //String数据类型:创建好并查询的对象转换为JSON字符串,信息存入redis第一个数据类型 先存入redis 确保正常运行了在假如Zset表 - redisService.setCacheObject("ecsInstance:"+ecsInstanceInfo.getPublicIpAddress(), ecsInstanceInfo.getInstanceId()); - - - - //这里模拟(也可以在别的类里完成) ECS创建成功后,服务器发送一条消息服务器正常启动,mq可以正常使用,存入redis - redisService.setCacheZSet("ECS", ecsInstanceInfo.getPublicIpAddress(), 0); - -// String url = "http://127.0.0.1:9006/ecsInstance/add"; -// restTemplate.postForObject(url, ecsInstanceInfo, Result.class); - log.info("实例信息持久化本地"); - - // 将新实例的IP和ID存入Redis - log.info("实例id和公网ip存入redis"); - aa = 0; // 重置计数器 - } - } - else if (connectSize <= 2) { - - //删除ECS里面的ip,车辆再次上线,找不到这个要缩容的服务器,让找不到 - redisService.deleteCacheZset("ECS" ,ip); - -// 缩容逻辑:删除连接数过低的服务器实例 -// String url = "http://127.0.0.1:9006/ecsInstance/select/" + ip; -// Result result = restTemplate.postForObject(url, null, Result.class); -// String instanceId = (String) result.getData(); - - releaseEcsDownLine.releaseEcsDownLine(ip); - -// aliYunEcsService.releaseECS(instanceId); // 释放ECS实例 - -// Long i = redisService.deleteCacheZset("ECS", ip);// 从Redis中删除该服务器的记录 - - - log.info("连接数小于2,服务器缩容:" + ip); - aa = 0; // 重置计数器 - } - } catch (Exception e) { - e.printStackTrace(); - } - } - } -} diff --git a/src/main/java/com/muyu/loadCenter/gateway/abs/GatewayCacheAbs.java b/src/main/java/com/muyu/loadCenter/gateway/abs/GatewayCacheAbs.java new file mode 100644 index 0000000..a2548ac --- /dev/null +++ b/src/main/java/com/muyu/loadCenter/gateway/abs/GatewayCacheAbs.java @@ -0,0 +1,27 @@ +package com.muyu.loadCenter.gateway.abs; + +import com.muyu.loadCenter.redis.service.RedisService; +import org.springframework.beans.factory.annotation.Autowired; + +/** + * @ProjectName: load_center + * @PackageName: com.muyu.loadCenter.gateway.abs + * @Description 缓存抽象类 + * @Author HuangDaJu + * @Date 2024/4/18 11:37 + * @Version 1.0 + */ +public abstract class GatewayCacheAbs { + + @Autowired + public RedisService redisService; + + public abstract String getPre(); + + public String encode(K key){ + return getPre()+key; + } + + + +} diff --git a/src/main/java/com/muyu/loadCenter/gateway/cache/GatewayLoadNodeCache.java b/src/main/java/com/muyu/loadCenter/gateway/cache/GatewayLoadNodeCache.java new file mode 100644 index 0000000..b16c98b --- /dev/null +++ b/src/main/java/com/muyu/loadCenter/gateway/cache/GatewayLoadNodeCache.java @@ -0,0 +1,65 @@ +package com.muyu.loadCenter.gateway.cache; + +import com.muyu.loadCenter.gateway.abs.GatewayCacheAbs; +import org.springframework.stereotype.Component; + +import java.util.List; + +/** + * @Description 网关负载节点 按比例存 redis 100 个数据那个 + * @Author HuangDaJu + * @Date 2024/4/18 11:35 + * @Version 1.0 + */ +@Component +public class GatewayLoadNodeCache extends GatewayCacheAbs { + + private final static String gatewayLoadNodeKey="node"; + + + @Override + public String getPre() { + return "gateway:lode:"; + } + + /** + * 存负载节点 + * @param nodeList 节点权重集合 + */ + + public void put(List nodeList){ + redisService.deleteObject(encode(gatewayLoadNodeKey)); + redisService.setCacheList(encode(gatewayLoadNodeKey),nodeList); + } + + /** + * 获取所有负载节点 + * @return 负载节点集合 + */ + public List get(){ + return redisService.getCacheList(encode(gatewayLoadNodeKey)); + } + + + + /** + * 通过下标获取节点 + * @param index 下标 + * @return 指定节点 + */ + + public String getByIndex(Long index){ + if (index ==null || index > 100){ + throw new RuntimeException("下标违法,0-100"); + } + return redisService.getCacheListValue(encode(gatewayLoadNodeKey),index); + } + + + + + + + + +} diff --git a/src/main/java/com/muyu/loadCenter/gateway/cache/GatewayLoadSeriesCache.java b/src/main/java/com/muyu/loadCenter/gateway/cache/GatewayLoadSeriesCache.java new file mode 100644 index 0000000..81c74ee --- /dev/null +++ b/src/main/java/com/muyu/loadCenter/gateway/cache/GatewayLoadSeriesCache.java @@ -0,0 +1,63 @@ +package com.muyu.loadCenter.gateway.cache; + +import com.muyu.loadCenter.gateway.abs.GatewayCacheAbs; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; + +/** + * @Description 网关负载序列 自增序列 series + * @Author HuangDaJu + * @Date 2024/4/18 11:57 + * @Version 1.0 + */ +@Component +public class GatewayLoadSeriesCache extends GatewayCacheAbs { + + private final static String gatewayLoadSeriesKey = "series"; + + + @Override + public String getPre () { + return "gateway:load:"; + } + + /** + * 获取自增序列值 + * @return 自增后的值 + */ + public Long incrementAndGet(){ + return redisService.increment(encode(gatewayLoadSeriesKey), 1L); + } + + /** + * bean创建完成之后执行方法 + */ + @PostConstruct + public void init(){ + redisService.setCacheObject(encode(gatewayLoadSeriesKey), 0); + } + + /** + * 获取当前序列值 + * @return 序列值 + */ + public Long get(){ + return redisService.getCacheObject(encode(gatewayLoadSeriesKey)); + } + + + + /** + * 重置 + */ + public void reset(){ + this.init(); + } + + + + +} + + diff --git a/src/main/java/com/muyu/loadCenter/gateway/cache/GatewayNodeCache.java b/src/main/java/com/muyu/loadCenter/gateway/cache/GatewayNodeCache.java new file mode 100644 index 0000000..869bcbf --- /dev/null +++ b/src/main/java/com/muyu/loadCenter/gateway/cache/GatewayNodeCache.java @@ -0,0 +1,59 @@ +package com.muyu.loadCenter.gateway.cache; + +import com.muyu.loadCenter.gateway.abs.GatewayCacheAbs; +import com.muyu.loadCenter.gateway.model.GatewayNodeInfo; +import org.springframework.stereotype.Component; + +import java.util.List; + +/** + * @Description 网关节点缓存 String数据类型 取的是对象 info 对象 + * @Author HuangDaJu + * @Date 2024/4/18 14:08 + * @Version 1.0 + */ +@Component +public class GatewayNodeCache extends GatewayCacheAbs { + + private final static String gatewayNodeCache = "info:"; + @Override + public String getPre() { + return "gateway:node:"; + } + + /** + * 增加缓存数据 + * @param gatewayNodeInfo + */ + public void put(String ip,GatewayNodeInfo gatewayNodeInfo){ + redisService.setCacheObject(encode(gatewayNodeCache)+ip,gatewayNodeInfo); + } + + + + /** + * 获取单个缓存数据 + */ + public GatewayNodeInfo get(String ip){ + return redisService.getCacheObject(encode(gatewayNodeCache)+ip); + } + + + public List get(){ + return redisService.getCacheObject(encode(gatewayNodeCache)); + } + + /** + * 删除网关节点 + */ + + public void remove(String nodeId){ + redisService.deleteObject(encode(nodeId)); + } + + + + + + +} diff --git a/src/main/java/com/muyu/loadCenter/gateway/cache/GatewayNodeSetVinCache.java b/src/main/java/com/muyu/loadCenter/gateway/cache/GatewayNodeSetVinCache.java new file mode 100644 index 0000000..82d23bd --- /dev/null +++ b/src/main/java/com/muyu/loadCenter/gateway/cache/GatewayNodeSetVinCache.java @@ -0,0 +1,47 @@ +package com.muyu.loadCenter.gateway.cache; + +import com.muyu.loadCenter.gateway.abs.GatewayCacheAbs; +import org.springframework.stereotype.Component; + +/** + * @Description 一个节点对多辆车的vin集合 + * @Author HuangDaJu + * @Date 2024/4/18 19:13 + * @Version 1.0 + */ +@Component +public class GatewayNodeSetVinCache extends GatewayCacheAbs { + private final static String gatewayNodeSetVinCache="many:"; + @Override + public String getPre() { + return "gateway:node:"; + } + + + /** + * 节点添加vin + */ + + public void put(String node,String vin){ + redisService.setCacheSet(encode(gatewayNodeSetVinCache)+node,vin); + } + + + /** + * 删除节点对应的vin + */ + + public void remove(String nodeId,String vin){ + redisService.deleteCacheSet(encode(gatewayNodeSetVinCache)+nodeId,vin); + } + + + + + + + + + + +} diff --git a/src/main/java/com/muyu/loadCenter/gateway/cache/GatewayVehicleNode.java b/src/main/java/com/muyu/loadCenter/gateway/cache/GatewayVehicleNode.java new file mode 100644 index 0000000..d7f43fb --- /dev/null +++ b/src/main/java/com/muyu/loadCenter/gateway/cache/GatewayVehicleNode.java @@ -0,0 +1,50 @@ +package com.muyu.loadCenter.gateway.cache; + +import com.muyu.loadCenter.gateway.abs.GatewayCacheAbs; +import org.springframework.stereotype.Component; + +/** + * @Description 车辆上线业务+vin:网关节点id + * @Author HuangDaJu + * @Date 2024/4/18 17:20 + * @Version 1.0 + */ +@Component +public class GatewayVehicleNode extends GatewayCacheAbs { + + private final static String gatewayCarBusinessKey="business:"; + @Override + public String getPre() { + return "gateway:car:"; + } + + /** + * 添加车辆vin gateway:car:business+vin, 网关节点id + */ + public void put(String vin,String nodeId){ + redisService.setCacheObject(encode(gatewayCarBusinessKey)+vin,nodeId); + } + + + + + /** + * 删除车辆vin gateway:car:business + */ + public void remove(String vin){ + redisService.deleteObject(encode(gatewayCarBusinessKey)+vin); + } + + + /** + * 获取车辆 nodeId + */ + public String get(String vin){ + return redisService.getCacheObject(encode(gatewayCarBusinessKey)+vin); + } + + + + + +} diff --git a/src/main/java/com/muyu/loadCenter/gateway/cache/GatewayZSetNodeCache.java b/src/main/java/com/muyu/loadCenter/gateway/cache/GatewayZSetNodeCache.java new file mode 100644 index 0000000..14700de --- /dev/null +++ b/src/main/java/com/muyu/loadCenter/gateway/cache/GatewayZSetNodeCache.java @@ -0,0 +1,55 @@ +package com.muyu.loadCenter.gateway.cache; + +import com.muyu.loadCenter.gateway.abs.GatewayCacheAbs; +import org.springframework.stereotype.Component; + +import java.util.List; +import java.util.Map; + +/** + * @Description 服务器节点+车辆连接数 + * @Author HuangDaJu + * @Date 2024/4/18 15:32 + * @Version 1.0 + */ +@Component +public class GatewayZSetNodeCache extends GatewayCacheAbs { + + private final static String gatewayZSetCount="count"; + + + @Override + public String getPre() { + return "gateway:zSet:"; + } + + + /** + * 获取所有zset数据 + * @return 负载节点集合 + */ + public Map get(){ + return redisService.getCacheZSetScore(encode(gatewayZSetCount)); + } + + /** + * 修改服务器与在线车辆 + */ + + public void put(String nodeId ,Integer onlineVehicle){ + redisService.setCacheZSet(encode(gatewayZSetCount),nodeId,onlineVehicle); + } + + /** + * 删除服务器的时候删除zset服务器列表,防止重新添加车辆 + */ + + + public void remove(String nodeId){ + redisService.deleteCacheZset(encode(gatewayZSetCount),nodeId); + } + + + + +} diff --git a/src/main/java/com/muyu/loadCenter/gateway/model/GatewayNodeInfo.java b/src/main/java/com/muyu/loadCenter/gateway/model/GatewayNodeInfo.java new file mode 100644 index 0000000..89446b8 --- /dev/null +++ b/src/main/java/com/muyu/loadCenter/gateway/model/GatewayNodeInfo.java @@ -0,0 +1,38 @@ +package com.muyu.loadCenter.gateway.model; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * @ProjectName: load_center + * @PackageName: com.muyu.loadCenter.gateway.model + * @Description 网关节点信息 + * @Author HuangDaJu + * @Date 2024/4/18 11:28 + * @Version 1.0 + */ + + +@Data +@AllArgsConstructor +@NoArgsConstructor +@Builder +public class GatewayNodeInfo { + + /** + * 节点Id + */ + private String nodeId; + /** + * 公网ip + */ + private String publicIdAddress; + /** + * 内网ip + */ + private String privateIdAddress; + + +} diff --git a/src/main/java/com/muyu/loadCenter/service/GatewayLoadService.java b/src/main/java/com/muyu/loadCenter/service/GatewayLoadService.java new file mode 100644 index 0000000..a014832 --- /dev/null +++ b/src/main/java/com/muyu/loadCenter/service/GatewayLoadService.java @@ -0,0 +1,24 @@ +package com.muyu.loadCenter.service; + +/** + * @ProjectName: load_center + * @PackageName: com.muyu.loadCenter.service + * @Description TODO + * @Author HuangDaJu + * @Date 2024/4/18 14:16 + * @Version 1.0 + */ +public interface GatewayLoadService { + /** + * 负载节点 + * @return 返回负载节点 + */ + String loadNode(); + + + void scheduleECS(); + + + + +} diff --git a/src/main/java/com/muyu/loadCenter/service/GatewayVehicleService.java b/src/main/java/com/muyu/loadCenter/service/GatewayVehicleService.java new file mode 100644 index 0000000..06d8fa5 --- /dev/null +++ b/src/main/java/com/muyu/loadCenter/service/GatewayVehicleService.java @@ -0,0 +1,26 @@ +package com.muyu.loadCenter.service; + +import com.muyu.loadCenter.domain.Result; + +/** + * @ProjectName: load_center + * @PackageName: com.muyu.loadCenter.service + * @Description TODO + * @Author HuangDaJu + * @Date 2024/4/18 18:39 + * @Version 1.0 + */ +public interface GatewayVehicleService { + + + Result topLine(String vin); + + + void loadDownLine(String vin); + + + + + + +} diff --git a/src/main/java/com/muyu/loadCenter/service/impl/GatewayLoadServiceImpl.java b/src/main/java/com/muyu/loadCenter/service/impl/GatewayLoadServiceImpl.java new file mode 100644 index 0000000..d958044 --- /dev/null +++ b/src/main/java/com/muyu/loadCenter/service/impl/GatewayLoadServiceImpl.java @@ -0,0 +1,285 @@ +package com.muyu.loadCenter.service.impl; + +import com.alibaba.fastjson2.JSONArray; +import com.alibaba.fastjson2.JSONObject; +import com.muyu.loadCenter.aliyun.service.AliYunEcsService; +import com.muyu.loadCenter.domain.EcsInstanceInfo; +import com.muyu.loadCenter.domain.WorkGatewayNode; +import com.muyu.loadCenter.gateway.cache.*; +import com.muyu.loadCenter.gateway.model.GatewayNodeInfo; +import com.muyu.loadCenter.redis.service.RedisService; +import com.muyu.loadCenter.service.GatewayLoadService; +import lombok.AllArgsConstructor; +import lombok.extern.log4j.Log4j2; +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.Response; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +/** + * @ProjectName: load_center + * @PackageName: com.muyu.loadCenter.service.impl + * @Description 网关负载业务 + * @Author HuangDaJu + * @Date 2024/4/18 14:16 + * @Version 1.0 + */ +@Service +@Log4j2 +@AllArgsConstructor +public class GatewayLoadServiceImpl implements GatewayLoadService { + + private final Long nodeLength = 100L; + /** + * 负载信息 + */ + private final GatewayLoadNodeCache gatewayLoadNodeCache; + + /** + * 负载序列 + * @return + */ + private final GatewayLoadSeriesCache gatewayLoadSeriesCache; + + /** + * 节点信息 + * @return + */ + private final GatewayNodeCache gatewayNodeCache; + + @Autowired + private RedisService redisService; + + + + private final GatewayZSetNodeCache gatewayZSetNodeCache; + + /** + * 阿里云ECS服务,用于管理云服务器 + */ + @Autowired + private AliYunEcsService aliYunEcsService; + + /** + * aa是计算服务器到达 扩容阈值的节点个数 符合条件+1 和节点个数个对比 相等就扩容 + */ + static int aa=0; + /** + * bb是计算服务器到达 缩容阈值的节点个数 符合条件+1 >=指定个数 相等就执行缩容 + */ + static int bb=0; + + + @Override + public String loadNode() { + + List loadNodeList = new ArrayList<>(); + ArrayList nodeIdList = new ArrayList<>(); + + + Map map = gatewayZSetNodeCache.get(); + + for (Map.Entry entry : map.entrySet()) { + WorkGatewayNode workGatewayNode1 = new WorkGatewayNode(); + log.info(entry.getKey().toString()+"--"+entry.getValue()); + workGatewayNode1.setNodeId(entry.getKey().toString()); + workGatewayNode1.setWeight(entry.getValue().intValue()); + nodeIdList.add(workGatewayNode1); + } + + long count = nodeIdList.stream().mapToInt(WorkGatewayNode::getWeight).sum(); + if (count < 100) { + List list = nodeIdList.stream() + .sorted((o1, o2) -> o2.getWeight() - o1.getWeight()) + .toList(); + int countWeight = 0; + for (long i = count; i < 100; i++) { + WorkGatewayNode workGatewayNode = list.get(countWeight++ % list.size()); + workGatewayNode.setWeight(workGatewayNode.getWeight() + 1); + } + } + whFor: + while (true) { + for (WorkGatewayNode workGatewayNode : nodeIdList) { + int weight = workGatewayNode.getWeight(); + if (weight > 0) { + loadNodeList.add( + workGatewayNode.getNodeId() + ); + workGatewayNode.setWeight(weight - 1); + } + } + int sum = nodeIdList.stream() + .mapToInt(WorkGatewayNode::getWeight) + .sum(); + if (sum <= 0) { + break whFor; + } + } + + gatewayLoadNodeCache.put(loadNodeList); + + Long seriesLoad = gatewayLoadSeriesCache.incrementAndGet(); + System.out.println("seriesLoad:" + seriesLoad); + + Long seriesLoadIndex = seriesLoad % nodeLength; + + String loadNodeId = gatewayLoadNodeCache.getByIndex(seriesLoadIndex); + + GatewayNodeInfo gatewayNodeInfo = gatewayNodeCache.get(loadNodeId); + + return gatewayNodeInfo.getPublicIdAddress(); + + } + + @Override + public void scheduleECS() { + // 客户端初始化 + OkHttpClient client = new OkHttpClient(); + ArrayList ipCacheSet = new ArrayList<>(); + + // 从Redis获取服务器IP集合 + Map map = gatewayZSetNodeCache.get(); + + for (Map.Entry entry : map.entrySet()) { + ipCacheSet.add(entry.getKey().toString()); + } + + log.info("共有个"+ipCacheSet.size()+"服务器"); + + // 遍历每台服务器进行负载检查 + for (String ip : ipCacheSet) { + + // 构建请求URL和请求头 + String URL = "http://"+ip+":8080/public/cluster"; + Request request = new Request.Builder() + .url(URL) + .get() + .addHeader("User-Agent", "Apifox/1.0.0 (https://apifox.com)") + .addHeader("Accesstoken", "") + .build(); + + try { + Response response = client.newCall(request).execute(); + // 解析响应数据 + JSONArray jsonArray = JSONArray.parseArray(response.body().string()); + JSONObject jsonObject = jsonArray.getJSONObject(0); + JSONObject mqttInfo = jsonObject.getJSONObject("mqttInfo"); + int connectSize = mqttInfo.getIntValue("connectSize"); + + log.info("服务器:"+ip+"-车辆连接数:"+connectSize); + + // 更新Redis中服务器的连接数ZSet数据类型 + gatewayZSetNodeCache.put(ip,connectSize); + + // 根据连接数判断是否需要进行扩容或缩容 + if (connectSize >= 8) { + aa++; + // 当满足扩容条件时,记录日志并执行扩容操作 + if (aa == ipCacheSet.size()) { + log.info(aa+"------------------------------------------"+ipCacheSet.size()); + log.info("需要扩容"); + // 调用阿里云ECS服务创建新实例 + String instanceId = aliYunEcsService.runInstances(); + log.info("扩容的节点ip为:" + instanceId); + + // 休眠5秒以确保新实例创建完成 + Thread.sleep(5000); + + // 获取新实例信息并将其放入redis + EcsInstanceInfo ecsInstanceInfo = aliYunEcsService.selectList(instanceId); + + GatewayNodeInfo gatewayNodeInfo = new GatewayNodeInfo(); + gatewayNodeInfo.setNodeId(ecsInstanceInfo.getInstanceId()); + gatewayNodeInfo.setPublicIdAddress(ecsInstanceInfo.getPublicIpAddress()); + gatewayNodeInfo.setPrivateIdAddress(ecsInstanceInfo.getPrivateIpAddress()); + + + gatewayNodeCache.put(ecsInstanceInfo.getPublicIpAddress(), gatewayNodeInfo); + + + //这里模拟(也可以在别的类里完成) ECS创建成功后,服务器发送一条消息服务器正常启动,mq可以正常使用,存入redis + gatewayZSetNodeCache.put(ecsInstanceInfo.getPublicIpAddress(), 0); + log.info("实例id和公网ip存入redis"); + aa = 0; // 重置计数器 + } + }else { + aa = 0; + } + + + if (connectSize <= 2) { + bb++; + System.out.println("bb:"+bb); + + //缩容逻辑:删除连接数过低的服务器实例,出现连续两个节点连接数低于5的,则执行缩容操作 + if (bb >= 2){ + log.info("释放实例"+ip); + + gatewayZSetNodeCache.remove(ip); + + GatewayNodeInfo gatewayNodeInfo = gatewayNodeCache.get(ip); + + //这里再来个异步操作 将 GatewayNodeInfo 对象传走,进行异步操作 + +// 1.分批次解除连接的车辆 +// 2.删除各个相关的redis缓存 + +// aliYunEcsService.releaseECS(gatewayNodeInfo.getNodeId()); // 释放ECS实例 + + bb = 0; + + } + + }else{ + bb = 0; + } + + + + + + + + + + + + + + + + + + + + + } catch (Exception e) { + e.printStackTrace(); + } + } + } + + +} + +// else if (connectSize <= 2) { + +//删除ECS里面的ip,车辆再次上线,找不到这个要缩容的服务器,让找不到 +// redisService.deleteCacheZset("ECS" ,ip); + +// 缩容逻辑:删除连接数过低的服务器实例 +// String url = "http://127.0.0.1:9006/ecsInstance/select/" + ip; +// Result result = restTemplate.postForObject(url, null, Result.class); +// String instanceId = (String) result.getData(); + + + +// Long i = redisService.deleteCacheZset("ECS", ip);// 从Redis中删除该服务器的记录 + +// } diff --git a/src/main/java/com/muyu/loadCenter/service/impl/GatewayVehicleServiceImpl.java b/src/main/java/com/muyu/loadCenter/service/impl/GatewayVehicleServiceImpl.java new file mode 100644 index 0000000..848d068 --- /dev/null +++ b/src/main/java/com/muyu/loadCenter/service/impl/GatewayVehicleServiceImpl.java @@ -0,0 +1,82 @@ +package com.muyu.loadCenter.service.impl; + +import com.alibaba.fastjson2.JSONArray; +import com.alibaba.fastjson2.JSONObject; +import com.muyu.loadCenter.domain.Result; +import com.muyu.loadCenter.gateway.cache.GatewayNodeSetVinCache; +import com.muyu.loadCenter.gateway.cache.GatewayVehicleNode; +import com.muyu.loadCenter.gateway.cache.GatewayZSetNodeCache; +import com.muyu.loadCenter.service.GatewayVehicleService; +import lombok.AllArgsConstructor; +import lombok.extern.log4j.Log4j2; +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.Response; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; +import org.springframework.web.client.RestTemplate; + +import java.util.ArrayList; +import java.util.Map; + +/** + * @ProjectName: load_center + * @PackageName: com.muyu.loadCenter.service.impl + * @Description TODO + * @Author HuangDaJu + * @Date 2024/4/18 18:39 + * @Version 1.0 + */ +@Service +@AllArgsConstructor +@Log4j2 +public class GatewayVehicleServiceImpl implements GatewayVehicleService { + + + + private final GatewayNodeSetVinCache gatewayNodeSetVinCache; + + + private final GatewayZSetNodeCache gatewayZSetNodeCache; + + + private final GatewayVehicleNode gatewayVehicleNode; + + @Autowired + private RestTemplate restTemplate; + + + + /** + * 车辆上线 + * @param vin + * @return nodeId + */ + @Override + public Result topLine(String vin) { + + String url="http://127.0.0.1:9010/gateway/load/node"; + + Result data = restTemplate.getForObject(url, Result.class); + + gatewayVehicleNode.put(vin,data.getData().toString()); + + gatewayNodeSetVinCache.put(data.getData().toString(),vin); + + return Result.success(data.getData().toString()); + } + + @Override + public void loadDownLine(String vin) { + + String nodeId = gatewayVehicleNode.get(vin); + + gatewayNodeSetVinCache.remove(nodeId,vin); + + gatewayVehicleNode.remove(vin); + } + + + + +} diff --git a/src/test/java/LoadTest.java b/src/test/java/LoadTest.java new file mode 100644 index 0000000..f817001 --- /dev/null +++ b/src/test/java/LoadTest.java @@ -0,0 +1,30 @@ + +import com.muyu.loadCenter.LoadCenterApplication; +import com.muyu.loadCenter.domain.WorkGatewayNode; +import com.muyu.loadCenter.gateway.model.GatewayNodeInfo; +import com.muyu.loadCenter.redis.service.RedisService; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; + +import java.util.ArrayList; +import java.util.List; +/** + * @author DongZl + * @description: 负载测试 + * @Date 2024/4/12 下午5:13 + */ +@SpringBootTest(classes = LoadCenterApplication.class) +public class LoadTest { + + @Autowired + private RedisService redisService; + + @Test + public void load() { + GatewayNodeInfo gatewayNodeInfo = redisService.getCacheObject("gateway:node:info:39.101.193.188"); + System.out.println(gatewayNodeInfo); + + + } +}