diff --git a/pom.xml b/pom.xml index 750bc4d..b94de24 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ 4.0.0 com.muyu - LoadCenter + load_center 1.0-SNAPSHOT 20 diff --git a/src/main/java/com/muyu/loadCenter/controller/DownLineController.java b/src/main/java/com/muyu/loadCenter/controller/DownLineController.java new file mode 100644 index 0000000..0dc77c7 --- /dev/null +++ b/src/main/java/com/muyu/loadCenter/controller/DownLineController.java @@ -0,0 +1,41 @@ +package com.muyu.loadCenter.controller; + +import com.muyu.loadCenter.redis.service.RedisService; +import lombok.extern.log4j.Log4j2; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +/** + * 处理车辆下线业务 + * @Author HuangDaJu + * @Date 2024/4/17 21:28 + * @Version 1.0 + */ +@Log4j2 +@RestController +@RequestMapping("downLine") +public class DownLineController { + + + @Autowired + private RedisService redisService; + + @PostMapping("/carDownLine/{vin}") + public void getVinDownLine(@PathVariable String vin) throws Exception { + + + String nodeId = redisService.getCacheObject("delete:" + vin); + + redisService.deleteObject("delete:"+vin); + + redisService.deleteCacheSet("release:"+nodeId,vin); + + log.info("车辆下线成功"); + } + + + +} diff --git a/src/main/java/com/muyu/loadCenter/controller/LoadCenterController.java b/src/main/java/com/muyu/loadCenter/controller/LoadCenterController.java index eecdfb1..9b5a09f 100644 --- a/src/main/java/com/muyu/loadCenter/controller/LoadCenterController.java +++ b/src/main/java/com/muyu/loadCenter/controller/LoadCenterController.java @@ -1,10 +1,12 @@ package com.muyu.loadCenter.controller; +import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSONArray; import com.alibaba.fastjson2.JSONObject; import com.muyu.loadCenter.aliyun.service.AliYunEcsService; import com.muyu.loadCenter.domain.EcsInstanceInfo; import com.muyu.loadCenter.domain.Result; +import com.muyu.loadCenter.service.ReleaseEcsDownLine; import lombok.extern.log4j.Log4j2; import okhttp3.OkHttpClient; import okhttp3.Request; @@ -31,15 +33,19 @@ public class LoadCenterController { int aa=0; // 临时变量,无特定用途(根据现有代码) int bb=0; - @Autowired - StringRedisTemplate redisTemplate; // Redis字符串模板,用于与Redis进行交互 +// Redis字符串模板,用于与Redis进行交互 @Autowired private RedisService redisService; // Redis服务,封装了与Redis操作相关的功能 + @Autowired private AliYunEcsService aliYunEcsService; // 阿里云ECS服务,用于管理云服务器 + @Autowired - private RestTemplate restTemplate; // 用于与其他服务进行HTTP交互的模板 + private RestTemplate restTemplate; + + @Autowired + private ReleaseEcsDownLine releaseEcsDownLine;// 用于与其他服务进行HTTP交互的模板 /** * 定时任务,每30秒扫描一次服务器集群的负载情况。 @@ -57,6 +63,7 @@ public class LoadCenterController { // 遍历每台服务器进行负载检查 for (String ip : ipCacheSet) { + // 构建请求URL和请求头 String URL = "http://"+ip+":8080/public/cluster"; Request request = new Request.Builder() @@ -76,11 +83,11 @@ public class LoadCenterController { log.info("服务器:"+ip+"-车辆连接数:"+connectSize); - // 更新Redis中服务器的连接数 + // 更新Redis中服务器的连接数ZSet数据类型 redisService.setCacheZSet("ECS", ip, connectSize); // 根据连接数判断是否需要进行扩容或缩容 - if (connectSize >= 5) { + if (connectSize >= 6) { aa++; // 当满足扩容条件时,记录日志并执行扩容操作 if (aa == ipCacheSet.size()) { @@ -94,26 +101,43 @@ public class LoadCenterController { // 获取新实例信息并将其持久化到本地数据库 EcsInstanceInfo ecsInstanceInfo = aliYunEcsService.selectList(instanceId); - String url = "http://127.0.0.1:9006/ecsInstance/add"; - restTemplate.postForObject(url, ecsInstanceInfo, Result.class); + + + //String数据类型:创建好并查询的对象转换为JSON字符串,信息存入redis第一个数据类型 先存入redis 确保正常运行了在假如Zset表 + redisService.setCacheObject("ecsInstance:"+ecsInstanceInfo.getPublicIpAddress(), ecsInstanceInfo.getInstanceId()); + + + + //这里模拟(也可以在别的类里完成) ECS创建成功后,服务器发送一条消息服务器正常启动,mq可以正常使用,存入redis + redisService.setCacheZSet("ECS", ecsInstanceInfo.getPublicIpAddress(), 0); + +// String url = "http://127.0.0.1:9006/ecsInstance/add"; +// restTemplate.postForObject(url, ecsInstanceInfo, Result.class); log.info("实例信息持久化本地"); // 将新实例的IP和ID存入Redis - redisService.setCacheZSet("ECS", ecsInstanceInfo.getPublicIpAddress(), 0); log.info("实例id和公网ip存入redis"); aa = 0; // 重置计数器 } - } else if (connectSize <= 2) { - // 缩容逻辑:删除连接数过低的服务器实例 + } + else if (connectSize <= 2) { + + //删除ECS里面的ip,车辆再次上线,找不到这个要缩容的服务器,让找不到 + redisService.deleteCacheZset("ECS" ,ip); + +// 缩容逻辑:删除连接数过低的服务器实例 // String url = "http://127.0.0.1:9006/ecsInstance/select/" + ip; // Result result = restTemplate.postForObject(url, null, Result.class); // String instanceId = (String) result.getData(); + + releaseEcsDownLine.releaseEcsDownLine(ip); + // aliYunEcsService.releaseECS(instanceId); // 释放ECS实例 - Long i = redisService.deleteCacheZset("ECS", ip);// 从Redis中删除该服务器的记录 +// Long i = redisService.deleteCacheZset("ECS", ip);// 从Redis中删除该服务器的记录 - log.info(i+"连接数小于2,服务器缩容:" + ip); + log.info("连接数小于2,服务器缩容:" + ip); aa = 0; // 重置计数器 } } catch (Exception e) { diff --git a/src/main/java/com/muyu/loadCenter/controller/WorkGatewayNodeController.java b/src/main/java/com/muyu/loadCenter/controller/WorkGatewayNodeController.java index 9fe5867..35bbe1d 100644 --- a/src/main/java/com/muyu/loadCenter/controller/WorkGatewayNodeController.java +++ b/src/main/java/com/muyu/loadCenter/controller/WorkGatewayNodeController.java @@ -1,45 +1,35 @@ package com.muyu.loadCenter.controller; -import com.alibaba.fastjson2.JSONArray; -import com.alibaba.fastjson2.JSONObject; -import com.muyu.loadCenter.LoadCenterApplication; import com.muyu.loadCenter.domain.Result; import com.muyu.loadCenter.domain.WorkGatewayNode; import com.muyu.loadCenter.redis.service.RedisService; import lombok.extern.log4j.Log4j2; -import okhttp3.OkHttpClient; -import okhttp3.Request; -import okhttp3.Response; -import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.stereotype.Controller; -import org.springframework.web.bind.annotation.PostMapping; -import org.springframework.web.bind.annotation.RequestMapping; -import org.springframework.web.bind.annotation.RestController; +import org.springframework.web.bind.annotation.*; import java.util.*; +/** + * 获取工作网关节点信息并进行负载均衡处理 + */ @Log4j2 @RestController -@RequestMapping("carGoGoGo") +@RequestMapping("carGoGo") public class WorkGatewayNodeController { @Autowired private RedisService redisService; - @PostMapping("/workGatewayNode") - public Result getWorkGatewayNode() throws Exception { -// redisService.setCacheObject("cursor", 0); + @PostMapping("/workGatewayNode/{vin}") + public Result getWorkGatewayNode(@PathVariable String vin) throws Exception { +// int size = vinList.size(); + redisService.setCacheObject("cursor", 0); + // 从Redis获取ECS的集合,并转换为工作网关节点列表 ArrayList nodeIdList = new ArrayList<>(); - - - Map map = redisService.getCacheZSetScore("ECS"); - for (Map.Entry entry : map.entrySet()) { WorkGatewayNode workGatewayNode1 = new WorkGatewayNode(); log.info(entry.getKey().toString()+"--"+entry.getValue()); @@ -48,24 +38,14 @@ public class WorkGatewayNodeController { nodeIdList.add(workGatewayNode1); } - log.info("----------------------------分割线----↓--------------------------"); - System.out.println(nodeIdList); - log.info("----------------------------分割线----↑---------------------------"); - -// ArrayList nodeIdList = carWorkGatewayNode(); - - - - List loadNodeList = new ArrayList<>(); - + // 对节点列表进行处理,以实现负载均衡 long count = nodeIdList.stream().mapToInt(WorkGatewayNode::getWeight).sum(); - if (count < 100) { + // 如果总权重小于100,则对节点进行加权处理,确保总权重为100 List list = nodeIdList.stream() - .sorted((o1, o2) -> o2.getWeight() - o1.getWeight()) + .sorted((o2, o1) -> o1.getWeight() - o2.getWeight()) .toList(); - int countWeight = 0; for (long i = count; i < 100; i++) { WorkGatewayNode workGatewayNode = list.get(countWeight++ % list.size()); @@ -73,7 +53,8 @@ public class WorkGatewayNodeController { } } - + // 进行负载均衡分配,直到所有权重都分配完毕 + List loadNodeList = new ArrayList<>(); while (nodeIdList.stream().anyMatch(node -> node.getWeight() > 0)) { for (WorkGatewayNode workGatewayNode : nodeIdList) { int weight = workGatewayNode.getWeight(); @@ -86,18 +67,27 @@ public class WorkGatewayNodeController { - log.info("----------------------------分割线--------------------------"); + redisService.deleteObject("work:node:gateway"); + redisService.setCacheList("work:node:gateway",loadNodeList); -// redisService.setCacheList("work:node:gateway", loadNodeList); + ArrayList arrayList = new ArrayList<>(); + + +// for (int i = 0; i < size; i++) { +// Long cursor = redisService.increment("cursor", 1L); +// String nodeId = redisService.getCacheListValue("work:node:gateway", cursor % 100); +// System.out.println("nodeId:"+nodeId); +// arrayList.add(nodeId); +// } +// return Result.success(arrayList); + + // 统计每个节点出现的次数,找出出现次数最少的节点 Map countMap = new HashMap<>(); - - // 统计每个对象出现的次数 for (Object obj : loadNodeList) { countMap.put(obj, countMap.getOrDefault(obj, 0) + 1); } - // 找出出现次数最少的对象 Object nodeId = null; int minCount = Integer.MAX_VALUE; for (Map.Entry entry : countMap.entrySet()) { @@ -106,60 +96,23 @@ public class WorkGatewayNodeController { nodeId = entry.getKey(); } } - // 如果有最小数量的对象,则返回最小数量的对象;否则随机选择一个对象 - if (nodeId != null) { - return Result.success(nodeId); - } else { - Random random = new Random(); - return Result.success(random.nextInt(loadNodeList.size())); - } + redisService.setCacheSet("release:"+nodeId,vin); + + redisService.setCacheObject("delete:"+vin,nodeId); + + + // 返回最少出现次数的节点或随机选择一个节点 + + return Result.success(nodeId.toString()); } -//这里是查询连接数的,现在使用redis的zset方法获得,就不需要下面这些了 -// public ArrayList carWorkGatewayNode() throws Exception { -// -// Set ipCacheSet = redisService.getCacheZSet("ECS"); -// ArrayList nodeIdList = new ArrayList<>(); -// -// OkHttpClient client = new OkHttpClient(); -// for (String ip : ipCacheSet) { -// System.out.println("ip:"+ip); -// String URL = "http://"+ip+":8080/public/cluster"; -// Request request = new Request.Builder() -// .url(URL) -// .get() -// .addHeader("User-Agent", "Apifox/1.0.0 (https://apifox.com)") -// .addHeader("Accesstoken", "") -// .build(); -// -// try { -// Response response = client.newCall(request).execute(); -// JSONArray jsonArray = JSONArray.parseArray(response.body().string()); -// JSONObject jsonObject = jsonArray.getJSONObject(0); -// JSONObject mqttInfo = jsonObject.getJSONObject("mqttInfo"); -// int connectSize = mqttInfo.getIntValue("connectSize"); -// WorkGatewayNode workGatewayNode = new WorkGatewayNode(); -// log.info("服务器:"+ip+"-车辆连接数:"+connectSize); -// -// workGatewayNode.setWeight(connectSize); -// workGatewayNode.setNodeId(ip); -// -// nodeIdList.add(workGatewayNode); -// -// -// } catch (Exception e) { -// e.printStackTrace(); -// } -// -// -// } -// return nodeIdList; -// -// } + + + } diff --git a/src/main/java/com/muyu/loadCenter/redis/service/RedisService.java b/src/main/java/com/muyu/loadCenter/redis/service/RedisService.java index 93ed569..9e366be 100644 --- a/src/main/java/com/muyu/loadCenter/redis/service/RedisService.java +++ b/src/main/java/com/muyu/loadCenter/redis/service/RedisService.java @@ -202,6 +202,7 @@ public class RedisService zSetOperations.rangeWithScores(key, 0, -1).forEach(tuple -> { memberScores.put(tuple.getValue(), tuple.getScore()); }); + return memberScores; } @@ -255,9 +256,9 @@ public class RedisService * @param key * @return */ - public Long deleteCacheZset(final String key, String value){ + public void deleteCacheZset(final String key, final String value){ - return redisTemplate.opsForZSet().remove(key,value); + redisTemplate.opsForZSet().remove(key,value); } /** diff --git a/src/main/java/com/muyu/loadCenter/service/ReleaseEcsDownLine.java b/src/main/java/com/muyu/loadCenter/service/ReleaseEcsDownLine.java new file mode 100644 index 0000000..563c4d3 --- /dev/null +++ b/src/main/java/com/muyu/loadCenter/service/ReleaseEcsDownLine.java @@ -0,0 +1,51 @@ +package com.muyu.loadCenter.service; + +import com.muyu.loadCenter.aliyun.service.AliYunEcsService; +import com.muyu.loadCenter.redis.service.RedisService; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.util.Set; + +/** + * 实例释放前-车辆下线 + * @Author HuangDaJu + * @Date 2024/4/17 21:51 + * @Version 1.0 + */ +@Component +public class ReleaseEcsDownLine { + + @Autowired + private AliYunEcsService aliYunEcsService; // 阿里云ECS服务,用于管理云服务器 + + @Autowired + private RedisService redisService; + /** + * 释放ECS下线车辆 + */ + public void releaseEcsDownLine(String ip) throws Exception { + + + //这里可以主动下线 + + String ecsInstanceId = redisService.getCacheObject("ecsInstance:" + ip); + + + // 获取redis中的车辆VIN + Set vinSet = redisService.getCacheSet("release" + ip); + + + //这里来执行解绑车辆 异步 + + + + aliYunEcsService.releaseECS(ecsInstanceId); + + } + + + + + +}