feat commit

网关负载功能
master
玉安君 2024-04-20 09:36:24 +08:00
parent a3059cbbf7
commit 682dd2f6e0
7 changed files with 98 additions and 25 deletions

View File

@ -406,6 +406,12 @@ public class RedisService {
setOperations.remove(key, value); setOperations.remove(key, value);
} }
public <T> void deleteCacheZsetValue(final String key, final T value) {
ZSetOperations zSetOperations = redisTemplate.opsForZSet();
zSetOperations.remove(key, value);
}
/** /**
* @description: Listvalue * @description: Listvalue
* @author: LiYuan * @author: LiYuan

View File

@ -12,6 +12,7 @@ import org.springframework.stereotype.Component;
* @Author: LiYuan * @Author: LiYuan
* @CreateTime: 2024-04-17 21:53 * @CreateTime: 2024-04-17 21:53
* @Description: * @Description:
* @Option:
* @Version: 1.0 * @Version: 1.0
*/ */
@Component @Component

View File

@ -1,8 +1,13 @@
package com.zhilian.online.consumer; package com.zhilian.online.consumer;
import com.alibaba.fastjson2.JSONObject;
import com.google.gson.JsonObject;
import com.zhilian.common.core.constant.Constants; import com.zhilian.common.core.constant.Constants;
import com.zhilian.common.redis.service.RedisService; import com.zhilian.common.redis.service.RedisService;
import com.zhilian.online.constans.OnlineConstants; import com.zhilian.online.constans.OnlineConstants;
import com.zhilian.online.domain.model.GatewayNodeInfo;
import com.zhilian.online.load.cache.GatewayNodeInfoCache;
import com.zhilian.online.load.cache.GatewayNodeWeightCache;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
@ -28,6 +33,12 @@ public class DeadQueueConsumer {
@Autowired @Autowired
private RedisService redisService; private RedisService redisService;
@Autowired
private GatewayNodeInfoCache gatewayNodeInfoCache;
@Autowired
private GatewayNodeWeightCache gatewayNodeWeightCache;
/** /**
* ,线 * ,线
@ -36,10 +47,12 @@ public class DeadQueueConsumer {
// @RabbitListener(queues = RabbitConfig.DEAD_QUEUE_NAME) // @RabbitListener(queues = RabbitConfig.DEAD_QUEUE_NAME)
public void SecureOnline(String gatherMsg) { public void SecureOnline(String gatherMsg) {
log.info("开始检查节点{}的上线状态......", gatherMsg); log.info("开始检查节点{}的上线状态......", gatherMsg);
GatewayNodeInfo gatewayNodeInfo = JSONObject.parseObject(gatherMsg, GatewayNodeInfo.class);
String ipAddress = ""; String ipAddress = "";
HttpURLConnection connection = null; HttpURLConnection connection = null;
try { try {
ipAddress = "http://" + ipAddress; ipAddress = OnlineConstants.HTTP_PREFIX + ipAddress + OnlineConstants.FLUXMQ_INFO_URL;
URL url = new URL(ipAddress); URL url = new URL(ipAddress);
connection = (HttpURLConnection)url.openConnection(); connection = (HttpURLConnection)url.openConnection();
connection.setRequestMethod("GET"); connection.setRequestMethod("GET");
@ -50,10 +63,10 @@ public class DeadQueueConsumer {
} }
if (Constants.FAIL == responseCode){ if (Constants.FAIL == responseCode){
log.error("节点{}上线失败",gatherMsg); log.error("节点{}上线失败",gatherMsg);
//删除节点数据
gatewayNodeInfoCache.remove(gatewayNodeInfo.getNodeId());
//上线失败需要将该节点的负载均衡缓存删除 //上线失败需要将该节点的负载均衡缓存删除
if (redisService.hasKey(OnlineConstants.NODE_TOKEN_PREFIX + "")){ gatewayNodeWeightCache.remove(gatewayNodeInfo);
redisService.removeCacheZsetBatch(OnlineConstants.NODE_TOKEN_PREFIX + "");
}
} }
} catch (Exception e) { } catch (Exception e) {
log.error("节点上线失败{}",e.getMessage()); log.error("节点上线失败{}",e.getMessage());

View File

@ -53,6 +53,16 @@ public class ApifoxModel {
*/ */
private String websocketUrl; private String websocketUrl;
/**
*
*/
private String publicIpAddress;
/**
*
*/
private String privateIpAddress;
/** /**
* *
*/ */

View File

@ -240,12 +240,12 @@ public class EcsInstance {
/** /**
* IP * IP
*/ */
private List<String> publicIpAddress; private String publicIpAddress;
/** /**
* IP * IP
*/ */
private List<String> privateIpAddress; private String privateIpAddress;
/** /**
* ID * ID

View File

@ -45,18 +45,34 @@ public class GatewayNodeWeightCache extends GatewayCacheAbs<String > {
return redisService.getCacheZsetScore(encode(gatewayNodeLoadKey),gatewayNodeInfo); return redisService.getCacheZsetScore(encode(gatewayNodeLoadKey),gatewayNodeInfo);
} }
/**
* @description:
* @author: LiYuan
* @param:
* @return:
**/
public void put(GatewayNodeInfo gatewayNodeInfo){ public void put(GatewayNodeInfo gatewayNodeInfo){
redisService.setCacheZsetValue(encode(gatewayNodeLoadKey),gatewayNodeInfo,0.0); redisService.setCacheZsetValue(encode(gatewayNodeLoadKey),gatewayNodeInfo,0.0);
} }
/**
* @description:
* @author: LiYuan
* @param:
* @return:
**/
public void increment(GatewayNodeInfo gatewayNodeInfo,Double score){ public void increment(GatewayNodeInfo gatewayNodeInfo,Double score){
redisService.incrementScore(encode(gatewayNodeLoadKey),gatewayNodeInfo,score); redisService.incrementScore(encode(gatewayNodeLoadKey),gatewayNodeInfo,score);
} }
/**
* @description:
* @author: LiYuan
* @param:
* @return:
**/
public void remove(GatewayNodeInfo gatewayNodeInfo) {
redisService.deleteCacheZsetValue(encode(gatewayNodeLoadKey),gatewayNodeInfo);
}
} }

View File

@ -7,9 +7,13 @@ import com.zhilian.common.redis.service.RedisService;
import com.zhilian.online.config.RabbitConfig; import com.zhilian.online.config.RabbitConfig;
import com.zhilian.online.constans.OnlineConstants; import com.zhilian.online.constans.OnlineConstants;
import com.zhilian.online.domain.ApifoxModel; import com.zhilian.online.domain.ApifoxModel;
import com.zhilian.online.domain.EcsInstance;
import com.zhilian.online.domain.model.GatewayNodeInfo;
import com.zhilian.online.load.cache.GatewayNodeInfoCache; import com.zhilian.online.load.cache.GatewayNodeInfoCache;
import com.zhilian.online.load.cache.GatewayNodeWeightCache;
import com.zhilian.online.mapper.OnlineLoadCenterMapper; import com.zhilian.online.mapper.OnlineLoadCenterMapper;
import com.zhilian.online.service.OnlineLoadCenterService; import com.zhilian.online.service.OnlineLoadCenterService;
import com.zhilian.online.uitls.AliyunOpenAPIUtils;
import com.zhilian.online.uitls.MqttUtil; import com.zhilian.online.uitls.MqttUtil;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.core.RabbitTemplate;
@ -17,6 +21,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.ZSetOperations; import org.springframework.data.redis.core.ZSetOperations;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -29,7 +34,7 @@ import java.util.stream.Collectors;
*/ */
@Service @Service
@Slf4j @Slf4j
public class OnlineLoadCenterServiceImpl implements OnlineLoadCenterService{ public class OnlineLoadCenterServiceImpl implements OnlineLoadCenterService {
/** /**
* *
@ -49,6 +54,9 @@ public class OnlineLoadCenterServiceImpl implements OnlineLoadCenterService{
@Autowired @Autowired
private RabbitTemplate rabbitTemplate; private RabbitTemplate rabbitTemplate;
@Autowired
private AliyunOpenAPIUtils aliyunOpenAPIUtils;
/** /**
* MQTT * MQTT
*/ */
@ -58,6 +66,9 @@ public class OnlineLoadCenterServiceImpl implements OnlineLoadCenterService{
@Autowired @Autowired
private GatewayNodeInfoCache gatewayNodeInfoCache; private GatewayNodeInfoCache gatewayNodeInfoCache;
@Autowired
private GatewayNodeWeightCache gatewayNodeWeightCache;
/** /**
* @description: , 访 * @description: , 访
@ -72,14 +83,13 @@ public class OnlineLoadCenterServiceImpl implements OnlineLoadCenterService{
String token = IdUtils.fastSimpleUUID(); String token = IdUtils.fastSimpleUUID();
//将令牌信息缓存到Redis中 //将令牌信息缓存到Redis中
redisService.setCacheObject(OnlineConstants.NODE_TOKEN_PREFIX + clusterId,token, OnlineConstants.ONLINE_TOKEN_EXPIRE, TimeUnit.SECONDS); redisService.setCacheObject(OnlineConstants.NODE_TOKEN_PREFIX + clusterId, token, OnlineConstants.ONLINE_TOKEN_EXPIRE, TimeUnit.SECONDS);
//将令牌信息返回客户端 //将令牌信息返回客户端
return Result.success(token); return Result.success(token);
} }
/** /**
* @description: 线 * @description: 线
* @author: LiYuan * @author: LiYuan
@ -93,25 +103,30 @@ public class OnlineLoadCenterServiceImpl implements OnlineLoadCenterService{
return Result.error("令牌已过期"); return Result.error("令牌已过期");
} }
String token = redisService.getCacheObject(OnlineConstants.NODE_TOKEN_PREFIX + apifoxModel.getClusterId()); String token = redisService.getCacheObject(OnlineConstants.NODE_TOKEN_PREFIX + apifoxModel.getClusterId());
if (!token.equals(apifoxModel.getToken())){ if (!token.equals(apifoxModel.getToken())) {
return Result.error("令牌错误"); return Result.error("令牌错误");
} }
//为该节点创建Zset负载均衡缓存,将该节点加入List在线节点缓存 GatewayNodeInfo gatewayNodeInfo = generateGatewayNodeInfo(apifoxModel);
redisService.setCacheZsetValue(OnlineConstants.GATHER_LOAD_PREFIX, apifoxModel.getClusterId(), 0.0);
//添加节点数据缓存
gatewayNodeInfoCache.put(gatewayNodeInfo);
//为该节点创建Zset负载均衡缓存,将该节点加入List在线节点缓存
gatewayNodeWeightCache.put(gatewayNodeInfo);
//向RabbitMQ||RocketMQ发送30s延迟消息,确保后续节点上线 //向RabbitMQ||RocketMQ发送30s延迟消息,确保后续节点上线
rabbitTemplate.convertAndSend(RabbitConfig.DELAY_EXCHANGE_NAME, RabbitConfig.DELAY_ROUTING_KEY, JSON.toJSONString(apifoxModel)); rabbitTemplate.convertAndSend(RabbitConfig.DELAY_EXCHANGE_NAME, RabbitConfig.DELAY_ROUTING_KEY, JSON.toJSONString(apifoxModel));
return Result.success("节点上线"); return Result.success(gatewayNodeInfo,"节点上线");
} }
/** /**
* 线 * 线
*
* @return gather
* @author: LiYuan * @author: LiYuan
* @param: vin * @param: vin
* @return gather
*/ */
@Override @Override
public void applyForConnectToGather(String vin) { public void applyForConnectToGather(String vin) {
@ -120,24 +135,21 @@ public class OnlineLoadCenterServiceImpl implements OnlineLoadCenterService{
List<String> list = cacheList.stream().map(item -> { List<String> list = cacheList.stream().map(item -> {
return String.valueOf(item); return String.valueOf(item);
}).collect(Collectors.toList()); }).collect(Collectors.toList());
if (!list.contains(vin)){ if (!list.contains(vin)) {
throw new RuntimeException("车辆未登记"); throw new RuntimeException("车辆未登记");
} }
//获取负载最少的车辆进行链接 //获取负载最少的车辆进行链接
ZSetOperations.TypedTuple cacheZsetMin = redisService.getCacheZsetMin(OnlineConstants.GATHER_LOAD_PREFIX); ZSetOperations.TypedTuple cacheZsetMin = redisService.getCacheZsetMin(OnlineConstants.GATHER_LOAD_PREFIX);
//存放节点车辆信息 //存放节点车辆信息
redisService.setCacheObject(OnlineConstants.GATHER_LOAD_PREFIX+vin,1); redisService.setCacheObject(OnlineConstants.GATHER_LOAD_PREFIX + vin, 1);
//发送延迟队列确定车辆上线 //发送延迟队列确定车辆上线
rabbitTemplate.convertAndSend(RabbitConfig.DELAY_EXCHANGE_FOR_CAR, RabbitConfig.DELAY_ROUTING_FOR_CAR,vin); rabbitTemplate.convertAndSend(RabbitConfig.DELAY_EXCHANGE_FOR_CAR, RabbitConfig.DELAY_ROUTING_FOR_CAR, vin);
return; return;
} }
/** /**
* @description:线 * @description:线
* @author: LiYuan * @author: LiYuan
@ -150,4 +162,19 @@ public class OnlineLoadCenterServiceImpl implements OnlineLoadCenterService{
} }
public GatewayNodeInfo generateGatewayNodeInfo(ApifoxModel apifoxModel) {
EcsInstance ecsInstance = aliyunOpenAPIUtils.queryEcsInstances(new ArrayList<String>() {{
add(apifoxModel.getPrivateIpAddress());
}}).get(0);
return GatewayNodeInfo.builder()
.nodeId(apifoxModel.getClusterId())
.privateIpAddress(ecsInstance.getPrivateIpAddress())
.publicIpAddress(ecsInstance.getPublicIpAddress())
.build();
}
} }