4.18
parent
4c0155d1cc
commit
0cd280e8f1
2
pom.xml
2
pom.xml
|
@ -5,7 +5,7 @@
|
|||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<groupId>com.muyu</groupId>
|
||||
<artifactId>LoadCenter</artifactId>
|
||||
<artifactId>load_center</artifactId>
|
||||
<version>1.0-SNAPSHOT</version>
|
||||
<properties>
|
||||
<maven.compiler.source>20</maven.compiler.source>
|
||||
|
|
|
@ -0,0 +1,41 @@
|
|||
package com.muyu.loadCenter.controller;
|
||||
|
||||
import com.muyu.loadCenter.redis.service.RedisService;
|
||||
import lombok.extern.log4j.Log4j2;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.web.bind.annotation.PathVariable;
|
||||
import org.springframework.web.bind.annotation.PostMapping;
|
||||
import org.springframework.web.bind.annotation.RequestMapping;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
|
||||
/**
|
||||
* 处理车辆下线业务
|
||||
* @Author HuangDaJu
|
||||
* @Date 2024/4/17 21:28
|
||||
* @Version 1.0
|
||||
*/
|
||||
@Log4j2
|
||||
@RestController
|
||||
@RequestMapping("downLine")
|
||||
public class DownLineController {
|
||||
|
||||
|
||||
@Autowired
|
||||
private RedisService redisService;
|
||||
|
||||
@PostMapping("/carDownLine/{vin}")
|
||||
public void getVinDownLine(@PathVariable String vin) throws Exception {
|
||||
|
||||
|
||||
String nodeId = redisService.getCacheObject("delete:" + vin);
|
||||
|
||||
redisService.deleteObject("delete:"+vin);
|
||||
|
||||
redisService.deleteCacheSet("release:"+nodeId,vin);
|
||||
|
||||
log.info("车辆下线成功");
|
||||
}
|
||||
|
||||
|
||||
|
||||
}
|
|
@ -1,10 +1,12 @@
|
|||
package com.muyu.loadCenter.controller;
|
||||
|
||||
import com.alibaba.fastjson2.JSON;
|
||||
import com.alibaba.fastjson2.JSONArray;
|
||||
import com.alibaba.fastjson2.JSONObject;
|
||||
import com.muyu.loadCenter.aliyun.service.AliYunEcsService;
|
||||
import com.muyu.loadCenter.domain.EcsInstanceInfo;
|
||||
import com.muyu.loadCenter.domain.Result;
|
||||
import com.muyu.loadCenter.service.ReleaseEcsDownLine;
|
||||
import lombok.extern.log4j.Log4j2;
|
||||
import okhttp3.OkHttpClient;
|
||||
import okhttp3.Request;
|
||||
|
@ -31,15 +33,19 @@ public class LoadCenterController {
|
|||
int aa=0;
|
||||
// 临时变量,无特定用途(根据现有代码)
|
||||
int bb=0;
|
||||
@Autowired
|
||||
StringRedisTemplate redisTemplate; // Redis字符串模板,用于与Redis进行交互
|
||||
// Redis字符串模板,用于与Redis进行交互
|
||||
|
||||
@Autowired
|
||||
private RedisService redisService; // Redis服务,封装了与Redis操作相关的功能
|
||||
|
||||
@Autowired
|
||||
private AliYunEcsService aliYunEcsService; // 阿里云ECS服务,用于管理云服务器
|
||||
|
||||
@Autowired
|
||||
private RestTemplate restTemplate; // 用于与其他服务进行HTTP交互的模板
|
||||
private RestTemplate restTemplate;
|
||||
|
||||
@Autowired
|
||||
private ReleaseEcsDownLine releaseEcsDownLine;// 用于与其他服务进行HTTP交互的模板
|
||||
|
||||
/**
|
||||
* 定时任务,每30秒扫描一次服务器集群的负载情况。
|
||||
|
@ -57,6 +63,7 @@ public class LoadCenterController {
|
|||
|
||||
// 遍历每台服务器进行负载检查
|
||||
for (String ip : ipCacheSet) {
|
||||
|
||||
// 构建请求URL和请求头
|
||||
String URL = "http://"+ip+":8080/public/cluster";
|
||||
Request request = new Request.Builder()
|
||||
|
@ -76,11 +83,11 @@ public class LoadCenterController {
|
|||
|
||||
log.info("服务器:"+ip+"-车辆连接数:"+connectSize);
|
||||
|
||||
// 更新Redis中服务器的连接数
|
||||
// 更新Redis中服务器的连接数ZSet数据类型
|
||||
redisService.setCacheZSet("ECS", ip, connectSize);
|
||||
|
||||
// 根据连接数判断是否需要进行扩容或缩容
|
||||
if (connectSize >= 5) {
|
||||
if (connectSize >= 6) {
|
||||
aa++;
|
||||
// 当满足扩容条件时,记录日志并执行扩容操作
|
||||
if (aa == ipCacheSet.size()) {
|
||||
|
@ -94,26 +101,43 @@ public class LoadCenterController {
|
|||
|
||||
// 获取新实例信息并将其持久化到本地数据库
|
||||
EcsInstanceInfo ecsInstanceInfo = aliYunEcsService.selectList(instanceId);
|
||||
String url = "http://127.0.0.1:9006/ecsInstance/add";
|
||||
restTemplate.postForObject(url, ecsInstanceInfo, Result.class);
|
||||
|
||||
|
||||
//String数据类型:创建好并查询的对象转换为JSON字符串,信息存入redis第一个数据类型 先存入redis 确保正常运行了在假如Zset表
|
||||
redisService.setCacheObject("ecsInstance:"+ecsInstanceInfo.getPublicIpAddress(), ecsInstanceInfo.getInstanceId());
|
||||
|
||||
|
||||
|
||||
//这里模拟(也可以在别的类里完成) ECS创建成功后,服务器发送一条消息服务器正常启动,mq可以正常使用,存入redis
|
||||
redisService.setCacheZSet("ECS", ecsInstanceInfo.getPublicIpAddress(), 0);
|
||||
|
||||
// String url = "http://127.0.0.1:9006/ecsInstance/add";
|
||||
// restTemplate.postForObject(url, ecsInstanceInfo, Result.class);
|
||||
log.info("实例信息持久化本地");
|
||||
|
||||
// 将新实例的IP和ID存入Redis
|
||||
redisService.setCacheZSet("ECS", ecsInstanceInfo.getPublicIpAddress(), 0);
|
||||
log.info("实例id和公网ip存入redis");
|
||||
aa = 0; // 重置计数器
|
||||
}
|
||||
} else if (connectSize <= 2) {
|
||||
// 缩容逻辑:删除连接数过低的服务器实例
|
||||
}
|
||||
else if (connectSize <= 2) {
|
||||
|
||||
//删除ECS里面的ip,车辆再次上线,找不到这个要缩容的服务器,让找不到
|
||||
redisService.deleteCacheZset("ECS" ,ip);
|
||||
|
||||
// 缩容逻辑:删除连接数过低的服务器实例
|
||||
// String url = "http://127.0.0.1:9006/ecsInstance/select/" + ip;
|
||||
// Result result = restTemplate.postForObject(url, null, Result.class);
|
||||
// String instanceId = (String) result.getData();
|
||||
|
||||
releaseEcsDownLine.releaseEcsDownLine(ip);
|
||||
|
||||
// aliYunEcsService.releaseECS(instanceId); // 释放ECS实例
|
||||
|
||||
Long i = redisService.deleteCacheZset("ECS", ip);// 从Redis中删除该服务器的记录
|
||||
// Long i = redisService.deleteCacheZset("ECS", ip);// 从Redis中删除该服务器的记录
|
||||
|
||||
|
||||
log.info(i+"连接数小于2,服务器缩容:" + ip);
|
||||
log.info("连接数小于2,服务器缩容:" + ip);
|
||||
aa = 0; // 重置计数器
|
||||
}
|
||||
} catch (Exception e) {
|
||||
|
|
|
@ -1,45 +1,35 @@
|
|||
package com.muyu.loadCenter.controller;
|
||||
|
||||
import com.alibaba.fastjson2.JSONArray;
|
||||
import com.alibaba.fastjson2.JSONObject;
|
||||
import com.muyu.loadCenter.LoadCenterApplication;
|
||||
import com.muyu.loadCenter.domain.Result;
|
||||
import com.muyu.loadCenter.domain.WorkGatewayNode;
|
||||
import com.muyu.loadCenter.redis.service.RedisService;
|
||||
import lombok.extern.log4j.Log4j2;
|
||||
import okhttp3.OkHttpClient;
|
||||
import okhttp3.Request;
|
||||
import okhttp3.Response;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.test.context.SpringBootTest;
|
||||
import org.springframework.stereotype.Controller;
|
||||
import org.springframework.web.bind.annotation.PostMapping;
|
||||
import org.springframework.web.bind.annotation.RequestMapping;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
import org.springframework.web.bind.annotation.*;
|
||||
|
||||
import java.util.*;
|
||||
/**
|
||||
* 获取工作网关节点信息并进行负载均衡处理
|
||||
*/
|
||||
|
||||
@Log4j2
|
||||
@RestController
|
||||
@RequestMapping("carGoGoGo")
|
||||
@RequestMapping("carGoGo")
|
||||
public class WorkGatewayNodeController {
|
||||
|
||||
@Autowired
|
||||
private RedisService redisService;
|
||||
@PostMapping("/workGatewayNode")
|
||||
|
||||
public Result getWorkGatewayNode() throws Exception {
|
||||
|
||||
// redisService.setCacheObject("cursor", 0);
|
||||
@PostMapping("/workGatewayNode/{vin}")
|
||||
public Result<String> getWorkGatewayNode(@PathVariable String vin) throws Exception {
|
||||
// int size = vinList.size();
|
||||
|
||||
redisService.setCacheObject("cursor", 0);
|
||||
// 从Redis获取ECS的集合,并转换为工作网关节点列表
|
||||
|
||||
ArrayList<WorkGatewayNode> nodeIdList = new ArrayList<>();
|
||||
|
||||
|
||||
|
||||
Map<Object, Double> map = redisService.getCacheZSetScore("ECS");
|
||||
|
||||
for (Map.Entry<Object, Double> entry : map.entrySet()) {
|
||||
WorkGatewayNode workGatewayNode1 = new WorkGatewayNode();
|
||||
log.info(entry.getKey().toString()+"--"+entry.getValue());
|
||||
|
@ -48,24 +38,14 @@ public class WorkGatewayNodeController {
|
|||
nodeIdList.add(workGatewayNode1);
|
||||
}
|
||||
|
||||
log.info("----------------------------分割线----↓--------------------------");
|
||||
System.out.println(nodeIdList);
|
||||
log.info("----------------------------分割线----↑---------------------------");
|
||||
|
||||
// ArrayList<WorkGatewayNode> nodeIdList = carWorkGatewayNode();
|
||||
|
||||
|
||||
|
||||
List<String> loadNodeList = new ArrayList<>();
|
||||
|
||||
// 对节点列表进行处理,以实现负载均衡
|
||||
long count = nodeIdList.stream().mapToInt(WorkGatewayNode::getWeight).sum();
|
||||
|
||||
|
||||
if (count < 100) {
|
||||
// 如果总权重小于100,则对节点进行加权处理,确保总权重为100
|
||||
List<WorkGatewayNode> list = nodeIdList.stream()
|
||||
.sorted((o1, o2) -> o2.getWeight() - o1.getWeight())
|
||||
.sorted((o2, o1) -> o1.getWeight() - o2.getWeight())
|
||||
.toList();
|
||||
|
||||
int countWeight = 0;
|
||||
for (long i = count; i < 100; i++) {
|
||||
WorkGatewayNode workGatewayNode = list.get(countWeight++ % list.size());
|
||||
|
@ -73,7 +53,8 @@ public class WorkGatewayNodeController {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
// 进行负载均衡分配,直到所有权重都分配完毕
|
||||
List<String> loadNodeList = new ArrayList<>();
|
||||
while (nodeIdList.stream().anyMatch(node -> node.getWeight() > 0)) {
|
||||
for (WorkGatewayNode workGatewayNode : nodeIdList) {
|
||||
int weight = workGatewayNode.getWeight();
|
||||
|
@ -86,18 +67,27 @@ public class WorkGatewayNodeController {
|
|||
|
||||
|
||||
|
||||
log.info("----------------------------分割线--------------------------");
|
||||
redisService.deleteObject("work:node:gateway");
|
||||
redisService.setCacheList("work:node:gateway",loadNodeList);
|
||||
|
||||
// redisService.setCacheList("work:node:gateway", loadNodeList);
|
||||
|
||||
ArrayList<String> arrayList = new ArrayList<>();
|
||||
|
||||
|
||||
// for (int i = 0; i < size; i++) {
|
||||
// Long cursor = redisService.increment("cursor", 1L);
|
||||
// String nodeId = redisService.getCacheListValue("work:node:gateway", cursor % 100);
|
||||
// System.out.println("nodeId:"+nodeId);
|
||||
// arrayList.add(nodeId);
|
||||
// }
|
||||
// return Result.success(arrayList);
|
||||
|
||||
// 统计每个节点出现的次数,找出出现次数最少的节点
|
||||
Map<Object, Integer> countMap = new HashMap<>();
|
||||
|
||||
// 统计每个对象出现的次数
|
||||
for (Object obj : loadNodeList) {
|
||||
countMap.put(obj, countMap.getOrDefault(obj, 0) + 1);
|
||||
}
|
||||
|
||||
// 找出出现次数最少的对象
|
||||
Object nodeId = null;
|
||||
int minCount = Integer.MAX_VALUE;
|
||||
for (Map.Entry<Object, Integer> entry : countMap.entrySet()) {
|
||||
|
@ -106,60 +96,23 @@ public class WorkGatewayNodeController {
|
|||
nodeId = entry.getKey();
|
||||
}
|
||||
}
|
||||
// 如果有最小数量的对象,则返回最小数量的对象;否则随机选择一个对象
|
||||
if (nodeId != null) {
|
||||
return Result.success(nodeId);
|
||||
} else {
|
||||
Random random = new Random();
|
||||
return Result.success(random.nextInt(loadNodeList.size()));
|
||||
}
|
||||
|
||||
|
||||
redisService.setCacheSet("release:"+nodeId,vin);
|
||||
|
||||
redisService.setCacheObject("delete:"+vin,nodeId);
|
||||
|
||||
|
||||
// 返回最少出现次数的节点或随机选择一个节点
|
||||
|
||||
return Result.success(nodeId.toString());
|
||||
|
||||
|
||||
}
|
||||
|
||||
//这里是查询连接数的,现在使用redis的zset方法获得,就不需要下面这些了
|
||||
|
||||
// public ArrayList<WorkGatewayNode> carWorkGatewayNode() throws Exception {
|
||||
//
|
||||
// Set<String> ipCacheSet = redisService.getCacheZSet("ECS");
|
||||
// ArrayList<WorkGatewayNode> nodeIdList = new ArrayList<>();
|
||||
//
|
||||
// OkHttpClient client = new OkHttpClient();
|
||||
// for (String ip : ipCacheSet) {
|
||||
// System.out.println("ip:"+ip);
|
||||
// String URL = "http://"+ip+":8080/public/cluster";
|
||||
// Request request = new Request.Builder()
|
||||
// .url(URL)
|
||||
// .get()
|
||||
// .addHeader("User-Agent", "Apifox/1.0.0 (https://apifox.com)")
|
||||
// .addHeader("Accesstoken", "")
|
||||
// .build();
|
||||
//
|
||||
// try {
|
||||
// Response response = client.newCall(request).execute();
|
||||
// JSONArray jsonArray = JSONArray.parseArray(response.body().string());
|
||||
// JSONObject jsonObject = jsonArray.getJSONObject(0);
|
||||
// JSONObject mqttInfo = jsonObject.getJSONObject("mqttInfo");
|
||||
// int connectSize = mqttInfo.getIntValue("connectSize");
|
||||
// WorkGatewayNode workGatewayNode = new WorkGatewayNode();
|
||||
// log.info("服务器:"+ip+"-车辆连接数:"+connectSize);
|
||||
//
|
||||
// workGatewayNode.setWeight(connectSize);
|
||||
// workGatewayNode.setNodeId(ip);
|
||||
//
|
||||
// nodeIdList.add(workGatewayNode);
|
||||
//
|
||||
//
|
||||
// } catch (Exception e) {
|
||||
// e.printStackTrace();
|
||||
// }
|
||||
//
|
||||
//
|
||||
// }
|
||||
// return nodeIdList;
|
||||
//
|
||||
// }
|
||||
|
||||
|
||||
|
||||
|
||||
}
|
||||
|
|
|
@ -202,6 +202,7 @@ public class RedisService
|
|||
zSetOperations.rangeWithScores(key, 0, -1).forEach(tuple -> {
|
||||
memberScores.put(tuple.getValue(), tuple.getScore());
|
||||
});
|
||||
|
||||
return memberScores;
|
||||
}
|
||||
|
||||
|
@ -255,9 +256,9 @@ public class RedisService
|
|||
* @param key
|
||||
* @return
|
||||
*/
|
||||
public Long deleteCacheZset(final String key, String value){
|
||||
public void deleteCacheZset(final String key, final String value){
|
||||
|
||||
return redisTemplate.opsForZSet().remove(key,value);
|
||||
redisTemplate.opsForZSet().remove(key,value);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -0,0 +1,51 @@
|
|||
package com.muyu.loadCenter.service;
|
||||
|
||||
import com.muyu.loadCenter.aliyun.service.AliYunEcsService;
|
||||
import com.muyu.loadCenter.redis.service.RedisService;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* 实例释放前-车辆下线
|
||||
* @Author HuangDaJu
|
||||
* @Date 2024/4/17 21:51
|
||||
* @Version 1.0
|
||||
*/
|
||||
@Component
|
||||
public class ReleaseEcsDownLine {
|
||||
|
||||
@Autowired
|
||||
private AliYunEcsService aliYunEcsService; // 阿里云ECS服务,用于管理云服务器
|
||||
|
||||
@Autowired
|
||||
private RedisService redisService;
|
||||
/**
|
||||
* 释放ECS下线车辆
|
||||
*/
|
||||
public void releaseEcsDownLine(String ip) throws Exception {
|
||||
|
||||
|
||||
//这里可以主动下线
|
||||
|
||||
String ecsInstanceId = redisService.getCacheObject("ecsInstance:" + ip);
|
||||
|
||||
|
||||
// 获取redis中的车辆VIN
|
||||
Set<String> vinSet = redisService.getCacheSet("release" + ip);
|
||||
|
||||
|
||||
//这里来执行解绑车辆 异步
|
||||
|
||||
|
||||
|
||||
aliYunEcsService.releaseECS(ecsInstanceId);
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
}
|
Loading…
Reference in New Issue