master
肖凡 2024-04-21 09:04:11 +08:00
parent 1f4a28ea11
commit a5dbdf2457
8 changed files with 302 additions and 109 deletions

View File

@ -57,6 +57,11 @@
</distributionManagement>
<dependencies>
<!--MQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- SpringBoot Web-->
<dependency>
<groupId>org.springframework.boot</groupId>

View File

@ -0,0 +1,86 @@
package com.xiaofan.loadcenter.common.rabbit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
@Configuration
public class RabbitMqConfig implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback {
public static final Logger logger= LoggerFactory.getLogger(RabbitMqConfig.class);
private RabbitTemplate rabbitTemplate;
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
public static final String QUEUE="queue";
public static final String EXCHANGE="exchange";
public static final String KEY="Key";
@Primary
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
this.rabbitTemplate=rabbitTemplate;
rabbitTemplate.setMessageConverter(messageConverter());
rabbitTemplate();
return rabbitTemplate;
}
public void rabbitTemplate(){
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnsCallback(this);
}
@Bean
public Queue queue(){
return new Queue(QUEUE,true);
}
@Bean
public DirectExchange directExchange(){
return new DirectExchange(EXCHANGE);
}
@Bean
public Binding binding(){
return BindingBuilder.bind(queue()).to(directExchange()).with(KEY);
}
@Override
public void confirm(CorrelationData correlationData, boolean ack, String s) {
if (ack){
logger.info("{}消息到达交换机",correlationData.getId());
}else{
logger.error("{}消息丢失",correlationData.getId());
}
}
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
logger.error("{}消息未到达队列",returnedMessage.getMessage().getMessageProperties().getMessageId());
}
}

View File

@ -28,8 +28,30 @@ public class GatewayController {
}
/**
*
* @throws Exception
*/
@Scheduled(cron = "0/5 * * * * ?")
public void dilatation() throws Exception{
gatewayLoadService.dilatation();
public void selectecs() throws Exception{
gatewayLoadService.selectecs();
}
/**
*
* @throws Exception
*/
@Scheduled(cron = "0/30 * * * * ?")
public void companding() throws Exception{
gatewayLoadService.companding();
}
/**
*
* @throws Exception
*/
@Scheduled(cron = "0/30 * * * * ?")
public void refreshLoad() throws Exception{
gatewayLoadService.refreshLoad();
}
}

View File

@ -4,6 +4,7 @@ import com.xiaofan.loadcenter.gateway.abs.GatewayCacheAbs;
import com.xiaofan.loadcenter.gateway.model.GatewayNodeInfo;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.List;
/**
@ -24,6 +25,7 @@ public class GatewayNodeCache extends GatewayCacheAbs<String> {
return "gateway:node:";
}
/**
*
* @param gatewayNodeInfo

View File

@ -1,8 +1,10 @@
package com.xiaofan.loadcenter.gateway.cache;
import com.xiaofan.loadcenter.common.domain.WorkGatewayNode;
import com.xiaofan.loadcenter.gateway.abs.GatewayCacheAbs;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.Map;
import java.util.Set;
@ -26,10 +28,22 @@ public class GatewayNodeScoreCache extends GatewayCacheAbs<String> {
//从缓存中获取节点与权重的映射关系
public Map<Object,Double> get(){
public Map<Object,Double> get1(){
return redisService.getCacheZSetScore(encode(gatewayzSetCount));
}
//从Redis获取负载节点集合
public ArrayList<WorkGatewayNode> get(){
ArrayList<WorkGatewayNode> nodeIdList = new ArrayList<>();
Map<Object, Double> map = redisService.getCacheZSetScore(encode(gatewayzSetCount));
for (Map.Entry<Object, Double> entry : map.entrySet()) {
WorkGatewayNode workGatewayNode = new WorkGatewayNode();
workGatewayNode.setNodeId(entry.getKey().toString());
workGatewayNode.setWeight(entry.getValue().intValue());
nodeIdList.add(workGatewayNode);
}
return nodeIdList;
}
/**
@ -39,6 +53,23 @@ public class GatewayNodeScoreCache extends GatewayCacheAbs<String> {
redisService.setCacheZSet(encode(gatewayzSetCount),nodeId,connectSize);
}
/**
* ,
*/
public Long getGatewayNodeNum(){
ArrayList<WorkGatewayNode> nodeIdList=get();
Long gatewayNodeNum=(long) nodeIdList.stream().mapToInt(WorkGatewayNode::getWeight).sum();
return gatewayNodeNum;
}
/**
*
*/
public Long getNodeMaxOnlineNum(){
ArrayList<WorkGatewayNode> nodeIdList=get();
return nodeIdList.size()*80L;
}
/**
* zset,
*/

View File

@ -0,0 +1,13 @@
package com.xiaofan.loadcenter.gateway.mq;
/**
* @ProjectName: LoadCenter
* @PackageName: com.xiaofan.loadcenter.gateway.mq
* @Description TODO
* @Author XiaoFan
* @Date 2024/4/20 15:23
* @Version 1.0
*/
public class Consumer {
}

View File

@ -16,6 +16,10 @@ public interface GatewayLoadService {
*/
String loadNode();
void dilatation();
void selectecs();
void companding() throws Exception;
void refreshLoad();
}

View File

@ -5,6 +5,7 @@ import com.alibaba.fastjson2.JSONObject;
import com.xiaofan.loadcenter.common.aliyun.AliYunEcsService;
import com.xiaofan.loadcenter.common.domain.EcsInstanceInfo;
import com.xiaofan.loadcenter.common.domain.WorkGatewayNode;
import com.xiaofan.loadcenter.common.rabbit.RabbitMqConfig;
import com.xiaofan.loadcenter.gateway.cache.GatewayLoadNodeCache;
import com.xiaofan.loadcenter.gateway.cache.GatewayLoadSeriesCache;
import com.xiaofan.loadcenter.gateway.cache.GatewayNodeCache;
@ -16,12 +17,17 @@ import lombok.extern.slf4j.Slf4j;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
/**
@ -43,6 +49,8 @@ public class GatewayLoadServiceImpl implements GatewayLoadService {
private final GatewayLoadSeriesCache gatewayLoadSeriesCache;
private final GatewayNodeCache gatewayNodeCache;
private final GatewayNodeScoreCache gatewayNodeScoreCache;
@Autowired
private RabbitTemplate rabbitTemplate;
//aa:计算服务器到达扩容阈值的节点个数
@ -57,19 +65,133 @@ public class GatewayLoadServiceImpl implements GatewayLoadService {
*/
@Override
public String loadNode() {
//从缓存中获取节点与权重的映射关系
Map<Object, Double> map = gatewayNodeScoreCache.get();
// 获得自增序列值
Long seriesLoad = gatewayLoadSeriesCache.incrementAndGet();
// 计算当前负载索引
Long seriesLoadIndex = seriesLoad % nodeLength;
// 根据计算得到的负载索引从缓存中获取对应的节点ID
String loadNodeId = gatewayLoadNodeCache.getBydIndex(seriesLoadIndex);
// 使用获取到的节点ID从缓存中检索具体的网关节点信息
GatewayNodeInfo gatewayNodeInfo = gatewayNodeCache.get(loadNodeId);
// 返回网关节点的公有ID地址
return gatewayNodeInfo.getPublicIdAddress();
}
@Override
public void selectecs() {
//客户端初始化
OkHttpClient client = new OkHttpClient();
ArrayList<String> ipCacheSet = new ArrayList<>();
//从Redis获取服务器IP集合
Map<Object, Double> map = gatewayNodeScoreCache.get1();
for (Map.Entry<Object, Double> entry : map.entrySet()) {
ipCacheSet.add(entry.getKey().toString());
}
log.info("共有个{}服务器",ipCacheSet.size());
//遍历每台服务器进行负载均衡
for (String ip : ipCacheSet) {
//构建请求URL和请求头
String URL="http://"+ip+":8080/public/cluster";
Request request=new Request.Builder()
.url(URL)
.get()
.addHeader("User-Agent","Apifox/1.0.0(https://apifox.com)")
.addHeader("Accesstoken","")
.build();
try {
Response response=client.newCall(request).execute();
//解析响应数据
JSONArray jsonArray = JSONArray.parseArray(response.body().string());
JSONObject jsonObject = jsonArray.getJSONObject(0);
JSONObject mqttInfo = jsonObject.getJSONObject("mqttInfo");
int connectSize=mqttInfo.getIntValue("connectSize");
log.info("服务器:{},车辆连接数:{}",ip,connectSize);
//更新Redis中服务器的连接数ZSet数据类型
gatewayNodeScoreCache.put(ip,connectSize);
}catch (Exception e){
e.printStackTrace();
}
}
}
@Override
public void companding() throws Exception{
//查找连接数
Long nodeNum = gatewayNodeScoreCache.getGatewayNodeNum();
//查找最大的连接数
Long onlineNum = gatewayNodeScoreCache.getNodeMaxOnlineNum();
//计算负载率
BigDecimal loadRate = new BigDecimal(nodeNum).divide(new BigDecimal(onlineNum), 2, RoundingMode.HALF_UP).multiply(BigDecimal.valueOf(100));
log.info("负载率:"+loadRate);
//获取所有set数据
ArrayList<WorkGatewayNode> nodeList = gatewayNodeScoreCache.get();
if (loadRate.longValue() >= 80L){
log.info("负载过高,开始扩容");
String instanceId = aliYunEcsService.runInstances();
log.info("扩容的节点ip为:"+instanceId);
//休眠5秒以确保新实例创建完成
Thread.sleep(5000);
//获取新实例信息并将其放入Redis
EcsInstanceInfo ecsInstanceInfo = aliYunEcsService.selectList(instanceId);
GatewayNodeInfo gatewayNodeInfo = new GatewayNodeInfo();
gatewayNodeInfo.setNodeId(ecsInstanceInfo.getInstanceId());
gatewayNodeInfo.setPublicIdAddress(ecsInstanceInfo.getPublicIpAddress());
gatewayNodeInfo.setPrivateIdAddress(ecsInstanceInfo.getPrivateIpAddress());
//添加缓存数据
gatewayNodeCache.put(ecsInstanceInfo.getPublicIpAddress(),gatewayNodeInfo);
//修改服务器与在线车辆
gatewayNodeScoreCache.put(ecsInstanceInfo.getPublicIpAddress(),0);
log.info("实例id和公网ip存入Redis");
}else if (loadRate.longValue()< 15L){
if (nodeList.size()>2){
log.info("负载率:"+loadRate);
log.info("负载过低,开始缩容"+loadRate.longValue());
WorkGatewayNode minConnectionNode=null;
int minConnections=Integer.MAX_VALUE;
for (WorkGatewayNode node : nodeList) {
int nodeConnections=node.getWeight();
if (nodeConnections<minConnections){
minConnections=nodeConnections;
minConnectionNode=node;
}
}
//minConnectionNode现在存储了连接数最少的节点
log.info("连接数最少的节点为:"+minConnectionNode.getNodeId());
//先删除zSet不让车连接
gatewayNodeScoreCache.remove(minConnectionNode.getNodeId());
GatewayNodeInfo gatewayNodeInfo = gatewayNodeCache.get(minConnectionNode.getNodeId());
log.info("删除节点:"+gatewayNodeInfo.getNodeId());
//rabiitmq将对象传走
rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE,RabbitMqConfig.KEY,gatewayNodeInfo,message -> {
message.getMessageProperties().setMessageId(UUID.randomUUID().toString());
return message;
},new CorrelationData(UUID.randomUUID().toString()));
}else{
log.info("负载过低,但是节点数小于2,不缩容");
}
}
}
@Override
public void refreshLoad() {
//获得zSet数据结构数据
ArrayList<WorkGatewayNode> nodeIdList = gatewayNodeScoreCache.get();
//初始化负载节点列表
List<String> loadNodeList = new ArrayList<>();
//使用Stream API重构循环,将节点与权重映射关系转换为节点列表
List<WorkGatewayNode> nodeIdList=map.entrySet()
.stream()
.peek(entry->{
String key=entry.getKey().toString();
log.info(String.format("%s -- %s",key,key));
})
.map(entry-> createNode(entry.getKey(),entry.getValue()))
.collect(Collectors.toList());
//计算节点权重总和
long count=nodeIdList.stream().mapToInt(WorkGatewayNode::getWeight).sum();
@ -117,102 +239,10 @@ public class GatewayLoadServiceImpl implements GatewayLoadService {
}
}
//重置
gatewayLoadSeriesCache.refresh();
//存负载节点
gatewayLoadNodeCache.put(loadNodeList);
// 获得自增序列值
Long seriesLoad = gatewayLoadSeriesCache.incrementAndGet();
// 计算当前负载索引
Long seriesLoadIndex = seriesLoad % nodeLength;
// 根据计算得到的负载索引从缓存中获取对应的节点ID
String loadNodeId = gatewayLoadNodeCache.getBydIndex(seriesLoadIndex);
// 使用获取到的节点ID从缓存中检索具体的网关节点信息
GatewayNodeInfo gatewayNodeInfo = gatewayNodeCache.get(loadNodeId);
// 返回网关节点的公有ID地址
return gatewayNodeInfo.getPublicIdAddress();
}
@Override
public void dilatation() {
//客户端初始化
OkHttpClient client = new OkHttpClient();
ArrayList<String> ipCacheSet = new ArrayList<>();
//从Redis获取服务器IP集合
Map<Object, Double> map = gatewayNodeScoreCache.get();
for (Map.Entry<Object, Double> entry : map.entrySet()) {
ipCacheSet.add(entry.getKey().toString());
}
log.info("共有个{}服务器",ipCacheSet.size());
//遍历每台服务器进行负载均衡
for (String ip : ipCacheSet) {
//构建请求URL和请求头
String URL="http://"+ip+":8080/public/cluster";
Request request=new Request.Builder()
.url(URL)
.get()
.addHeader("User-Agent","Apifox/1.0.0(https://apifox.com)")
.addHeader("Accesstoken","")
.build();
try {
Response response=client.newCall(request).execute();
//解析响应数据
JSONArray jsonArray = JSONArray.parseArray(response.body().string());
JSONObject jsonObject = jsonArray.getJSONObject(0);
JSONObject mqttInfo = jsonObject.getJSONObject("mqttInfo");
int connectSize=mqttInfo.getIntValue("connectSize");
log.info("服务器:{},车辆连接数:{}",ip,connectSize);
//更新Redis中服务器的连接数ZSet数据类型
gatewayNodeScoreCache.put(ip,connectSize);
//根据连接数判断是否需要进行扩容或缩容
if (connectSize>=5){
aa++;
//当满足扩容条件时,记录日志并执行扩容操作
if (aa==ipCacheSet.size()){
log.info(aa+"-----"+ipCacheSet.size());
log.info("需要扩容");
//调用阿里云ECS服务创建新实例
String instanceId = aliYunEcsService.runInstances();
log.info("扩容的节点ip为{}",instanceId);
//休眠5秒以确保新实例创建完成
Thread.sleep(5000);
//获取新实例信息并将其放入Redis
EcsInstanceInfo ecsInstanceInfo = aliYunEcsService.selectList(instanceId);
GatewayNodeInfo gatewayNodeInfo = new GatewayNodeInfo();
gatewayNodeInfo.setNodeId(ecsInstanceInfo.getInstanceId());
gatewayNodeInfo.setPublicIdAddress(ecsInstanceInfo.getPublicIpAddress());
gatewayNodeInfo.setPrivateIdAddress(ecsInstanceInfo.getPrivateIpAddress());
gatewayNodeCache.put(ecsInstanceInfo.getPublicIpAddress(),gatewayNodeInfo);
//)修改服务器与在线车辆数据
gatewayNodeScoreCache.put(ecsInstanceInfo.getPublicIpAddress(),0);
log.info("实例id和公网ip存入Redis");
aa=0;
}
}else{
aa=0;
}
}catch (Exception e){
e.printStackTrace();
}
}
}
//定义一个静态工厂方法用于创建WorkGatewayNode对象并设置节点ID和权重
public static WorkGatewayNode createNode(Object nodeId,double weight){
//创建新的WorkGatewayNode实例
WorkGatewayNode node=new WorkGatewayNode();
//将节点ID设置为字符串并设置到节点对象上
node.setNodeId(nodeId.toString());
//将权重值向下取整为整数并设置到节点对象上
node.setWeight((int) weight);
//返回已设置好属性的节点对象
return node;
}
}