传入主分支,在分支上修改代码
parent
cd903475ce
commit
3373a41e44
|
@ -0,0 +1,28 @@
|
|||
package com.yao.common.domain;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
/**
|
||||
* @Author: LiJiaYao
|
||||
* @Date: 2024/4/15
|
||||
* @Description:
|
||||
*/
|
||||
@Data
|
||||
@AllArgsConstructor
|
||||
@NoArgsConstructor
|
||||
@Builder
|
||||
public class GateWayNodeInfo {
|
||||
|
||||
/**
|
||||
* 节点id
|
||||
*/
|
||||
private String nodeId;
|
||||
/**
|
||||
* 权重值
|
||||
*/
|
||||
private Double score;
|
||||
|
||||
}
|
|
@ -1,6 +1,7 @@
|
|||
package com.yao.common.domain;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
|
@ -12,6 +13,7 @@ import lombok.NoArgsConstructor;
|
|||
@Data
|
||||
@AllArgsConstructor
|
||||
@NoArgsConstructor
|
||||
@Builder
|
||||
public class WorkGateWayNode {
|
||||
|
||||
private String nodeId;
|
||||
|
|
|
@ -249,10 +249,30 @@ public class RedisService {
|
|||
*
|
||||
* @param key Redis键
|
||||
* @param zValue Hash键
|
||||
* @param value 值
|
||||
* @param score 值
|
||||
*/
|
||||
public <T> void setCacheZSet(final String key, final T zValue, final double value) {
|
||||
redisTemplate.opsForZSet().add(key,zValue,value);
|
||||
public <T> void setCacheZSet(final String key, final T zValue, final double score) {
|
||||
redisTemplate.opsForZSet().add(key,zValue,score);
|
||||
}
|
||||
|
||||
/**
|
||||
* 取值
|
||||
* @param key 键名
|
||||
* @return 对象
|
||||
* @param <T>
|
||||
*/
|
||||
public <T> Set<T> getCacheZSet(final String key) {
|
||||
return redisTemplate.opsForZSet().range(key,0,-1);
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取集合中是分数数据
|
||||
* @param zSetKey
|
||||
* @return
|
||||
* @param <T>
|
||||
*/
|
||||
public <T> Set<ZSetOperations.TypedTuple<String>> getZSet(String zSetKey) {
|
||||
return redisTemplate.opsForZSet().rangeWithScores(zSetKey,0,-1);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -326,4 +346,6 @@ public class RedisService {
|
|||
return redisTemplate.opsForValue().increment(cursor,l);
|
||||
}
|
||||
|
||||
|
||||
|
||||
}
|
||||
|
|
|
@ -1,20 +0,0 @@
|
|||
package com.yao.gateway.cache;
|
||||
|
||||
import com.yao.gateway.cache.abs.GatewayNodeAbstract;
|
||||
import lombok.extern.log4j.Log4j2;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* @Author: LiJiaYao
|
||||
* @Date: 2024/4/18
|
||||
* @Description: 网关业务数据存储
|
||||
*/
|
||||
@Component
|
||||
@Log4j2
|
||||
public class GatewayBusinessCache extends GatewayNodeAbstract {
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
}
|
|
@ -1,8 +1,16 @@
|
|||
package com.yao.gateway.cache;
|
||||
|
||||
import com.yao.common.domain.GateWayNodeInfo;
|
||||
import com.yao.common.domain.WorkGateWayNode;
|
||||
import com.yao.common.domain.aliy.InstanceInfo;
|
||||
import com.yao.gateway.cache.abs.GatewayNodeAbstract;
|
||||
import org.springframework.data.redis.core.ZSetOperations;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* @Author: LiJiaYao
|
||||
* @Date: 2024/4/18
|
||||
|
@ -11,5 +19,56 @@ import org.springframework.stereotype.Component;
|
|||
@Component
|
||||
public class GatewayNodeScoreCache extends GatewayNodeAbstract {
|
||||
|
||||
private static final String zSetKey = "gateway:zSet:count";
|
||||
|
||||
/**
|
||||
* 网关连接计数
|
||||
*
|
||||
* @param info 新增数据的信息
|
||||
* @param count 网关连接的个数
|
||||
*/
|
||||
public void newCount(InstanceInfo info, Integer count) {
|
||||
redisService.setCacheZSet(zSetKey, info, count);
|
||||
}
|
||||
|
||||
/**
|
||||
* 取出数据
|
||||
*
|
||||
* @return 取出的内容
|
||||
*/
|
||||
public Set<InstanceInfo> get() {
|
||||
Set<InstanceInfo> cacheZSet = redisService.getCacheZSet(zSetKey);
|
||||
|
||||
return cacheZSet;
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取他的内容
|
||||
* @return
|
||||
*/
|
||||
public List<GateWayNodeInfo> getCacheZSet() {
|
||||
Set<ZSetOperations.TypedTuple<String>> set = redisService.getZSet(zSetKey);
|
||||
// Map<String, Double> nodeMap = set.stream()
|
||||
// .collect(Collectors.toMap(
|
||||
// ZSetOperations.TypedTuple::getValue,
|
||||
// ZSetOperations.TypedTuple::getScore
|
||||
// ));
|
||||
return set.stream()
|
||||
.map(zset->
|
||||
GateWayNodeInfo.builder()
|
||||
.nodeId(zset.getValue())
|
||||
.score(zset.getScore())
|
||||
.build()
|
||||
).toList();
|
||||
//算法:有没有超过百分比
|
||||
// double sum = nodeMap.values().stream().mapToDouble(value -> value).sum();
|
||||
}
|
||||
|
||||
/**
|
||||
* 删除数据
|
||||
*/
|
||||
public void remove() {
|
||||
redisService.deleteObject(zSetKey);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -32,7 +32,7 @@ public class GatewayNodeSetVinCache extends GatewayNodeAbstract {
|
|||
* @param realKey 键名称
|
||||
* @return 相关实例数据值
|
||||
*/
|
||||
public Set<InstanceInfo> get(String realKey){
|
||||
public Set<InstanceInfo> get(){
|
||||
return redisService.getCacheSet(realKey);
|
||||
}
|
||||
|
||||
|
@ -41,7 +41,7 @@ public class GatewayNodeSetVinCache extends GatewayNodeAbstract {
|
|||
* @param realKey new:real:column键
|
||||
* @param info 对应的值
|
||||
*/
|
||||
public void remote(String realKey,InstanceInfo info){
|
||||
public void remote(InstanceInfo info){
|
||||
redisService.deleteCacheMapValue(realKey, Common.toJSONString(info));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,8 +1,10 @@
|
|||
package com.yao.server.timer;
|
||||
|
||||
import com.yao.common.aliy.AliYunEcsService;
|
||||
import com.yao.common.domain.GateWayNodeInfo;
|
||||
import com.yao.common.domain.aliy.InstanceInfo;
|
||||
import com.yao.common.mqtt.MqttConnectService;
|
||||
import com.yao.gateway.cache.GatewayNodeScoreCache;
|
||||
import com.yao.gateway.cache.GatewayNodeSetVinCache;
|
||||
import com.yao.gateway.cache.GatewayVehicleLineNodeCache;
|
||||
import lombok.extern.log4j.Log4j2;
|
||||
|
@ -23,18 +25,19 @@ import java.util.Set;
|
|||
@EnableAsync //开启异步支持
|
||||
@Component
|
||||
public class Timer {
|
||||
private static final String realKey = "new:real:column";
|
||||
private AliYunEcsService aliYunEcsService;
|
||||
private MqttConnectService mqttConnectService;
|
||||
private final AliYunEcsService aliYunEcsService;
|
||||
private final MqttConnectService mqttConnectService;
|
||||
// 阿里云缓存
|
||||
private GatewayNodeSetVinCache gatewayNodeSetVinCache;
|
||||
private GatewayVehicleLineNodeCache gatewayVehicleLineNodeCache;
|
||||
private final GatewayNodeSetVinCache gatewayNodeSetVinCache;
|
||||
private final GatewayVehicleLineNodeCache gatewayVehicleLineNodeCache;
|
||||
private final GatewayNodeScoreCache gatewayNodeScoreCache;
|
||||
|
||||
public Timer(AliYunEcsService aliYunEcsService, MqttConnectService mqttConnectService, GatewayNodeSetVinCache gatewayAliYunCache, GatewayVehicleLineNodeCache gatewayVehicleLineNodeCache) {
|
||||
public Timer(AliYunEcsService aliYunEcsService, MqttConnectService mqttConnectService, GatewayNodeSetVinCache gatewayAliYunCache, GatewayVehicleLineNodeCache gatewayVehicleLineNodeCache, GatewayNodeScoreCache gatewayNodeScoreCache) {
|
||||
this.aliYunEcsService = aliYunEcsService;
|
||||
this.mqttConnectService = mqttConnectService;
|
||||
this.gatewayNodeSetVinCache = gatewayAliYunCache;
|
||||
this.gatewayVehicleLineNodeCache = gatewayVehicleLineNodeCache;
|
||||
this.gatewayNodeScoreCache = gatewayNodeScoreCache;
|
||||
}
|
||||
|
||||
@Async
|
||||
|
@ -44,7 +47,7 @@ public class Timer {
|
|||
long startTime = System.currentTimeMillis();
|
||||
// 设置结束时间为10分钟后
|
||||
long endTime = startTime + 10 * 60 * 1000;
|
||||
Set<InstanceInfo> instance = gatewayNodeSetVinCache.get(realKey);
|
||||
Set<InstanceInfo> instance = gatewayNodeSetVinCache.get();
|
||||
if (instance.isEmpty()) {
|
||||
try {
|
||||
aliYunEcsService.startCreate();
|
||||
|
@ -72,7 +75,7 @@ public class Timer {
|
|||
if (connectSize <= 20 && System.currentTimeMillis() < endTime) {
|
||||
aliYunEcsService.delete(s.getInstanceId());
|
||||
//删除实列以后再去把redis的值删除 再去通知重新上线
|
||||
gatewayNodeSetVinCache.remote(realKey, s);
|
||||
gatewayNodeSetVinCache.remote(s);
|
||||
gatewayVehicleLineNodeCache.save(s.getPublicIpAddress());
|
||||
log.info("缩容成功!");
|
||||
log.info("锁容的节点id为:" + deleteInstanceId);
|
||||
|
|
Loading…
Reference in New Issue