12312
parent
3373a41e44
commit
7ee883c388
|
@ -8,7 +8,10 @@ import com.aliyun.teautil.models.RuntimeOptions;
|
|||
import com.yao.common.aliy.model.EcsSelectModel;
|
||||
import com.yao.common.config.AlyConfigProperties;
|
||||
import com.yao.common.domain.aliy.InstanceInfo;
|
||||
import com.yao.gateway.cache.GatewayNodeSetVinCache;
|
||||
import com.yao.gateway.cache.GateWayNodeInfo;
|
||||
import com.yao.gateway.cache.GatewayLoadSeriesCache;
|
||||
import com.yao.gateway.cache.GatewayNodeCache;
|
||||
import com.yao.gateway.cache.GatewayNodeScoreCache;
|
||||
import lombok.extern.log4j.Log4j2;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
|
@ -26,12 +29,19 @@ public class AliYunEcsService {
|
|||
|
||||
private final AlyConfigProperties alyConfigProperties;
|
||||
private final Client client;
|
||||
//网关节点分数
|
||||
public final GatewayNodeScoreCache gatewayNodeScoreCache;
|
||||
//网关节点缓存
|
||||
private final GatewayNodeCache gatewayNodeCache;
|
||||
//网关负载序列
|
||||
private final GatewayLoadSeriesCache gatewayLoadSeriesCache;
|
||||
|
||||
private final GatewayNodeSetVinCache gatewayNodeSetVinCache;
|
||||
public AliYunEcsService(AlyConfigProperties alyConfigProperties, Client client, GatewayNodeSetVinCache gatewayNodeSetVinCache) {
|
||||
public AliYunEcsService(AlyConfigProperties alyConfigProperties, Client client, GatewayNodeScoreCache gatewayNodeScoreCache, GatewayNodeCache gatewayNodeCache, GatewayLoadSeriesCache gatewayLoadSeriesCache) {
|
||||
this.alyConfigProperties = alyConfigProperties;
|
||||
this.client = client;
|
||||
this.gatewayNodeSetVinCache = gatewayNodeSetVinCache;
|
||||
this.gatewayNodeScoreCache = gatewayNodeScoreCache;
|
||||
this.gatewayNodeCache = gatewayNodeCache;
|
||||
this.gatewayLoadSeriesCache = gatewayLoadSeriesCache;
|
||||
}
|
||||
|
||||
//todo----------------------------------------------------以下是查询代码--------------------------------------------
|
||||
|
@ -42,7 +52,7 @@ public class AliYunEcsService {
|
|||
* @throws Exception
|
||||
* @Description: 根据id和name查询内容
|
||||
*/
|
||||
public List<InstanceInfo> selectList(EcsSelectModel ecsSelectModel) throws Exception {
|
||||
public List<InstanceInfo> selectList(EcsSelectModel ecsSelectModel) {
|
||||
DescribeInstancesRequest describeInstancesRequest = new DescribeInstancesRequest()
|
||||
.setRegionId(alyConfigProperties.getRegionId());
|
||||
if (ecsSelectModel.getInstanceNameList() == null || ecsSelectModel.getInstanceNameList().isEmpty()) {
|
||||
|
@ -105,28 +115,39 @@ public class AliYunEcsService {
|
|||
/**
|
||||
* initialization 初始化公共请求参数
|
||||
*/
|
||||
public List<String> startCreate() throws Exception {
|
||||
public List<String> startCreate() {
|
||||
// 公网出带宽最大值,单位为 Mbit/s。取值范围:0~100。 默认值:0。
|
||||
Integer internetMaxBandwidthOut = com.aliyun.darabonbanumber.Client.parseInt("5");
|
||||
Integer internetMaxBandwidthOut = null;
|
||||
try {
|
||||
internetMaxBandwidthOut = com.aliyun.darabonbanumber.Client.parseInt("5");
|
||||
List<String> s = RunInstances(
|
||||
client, alyConfigProperties.getRegionId(), alyConfigProperties.getImageId(), alyConfigProperties.getInstanceType(),
|
||||
alyConfigProperties.getSecurityGroupId(), alyConfigProperties.getVSwitchId(), internetMaxBandwidthOut,
|
||||
alyConfigProperties.getInternetChargeType(), alyConfigProperties.getSize(), alyConfigProperties.getCategory(),
|
||||
alyConfigProperties.getInstanceChargeType());
|
||||
EcsSelectModel ecsSelectModel = new EcsSelectModel();
|
||||
ecsSelectModel.setInstanceIdList(s);
|
||||
List<InstanceInfo> list = selectList(ecsSelectModel);
|
||||
double num = 0;
|
||||
//新增节点时数据往string类型新增一条数据,然后把序列数据重置为0,接着重新排序
|
||||
list.forEach(
|
||||
item -> {
|
||||
String publicIP = item.getPublicIpAddress().substring(1, item.getPublicIpAddress().length() - 1);
|
||||
String privateIP = item.getPrivateIpAddress().substring(1, item.getPrivateIpAddress().length() - 1);
|
||||
item.setPublicIpAddress(publicIP);
|
||||
item.setPrivateIpAddress(privateIP);
|
||||
//存入数据
|
||||
gatewayNodeScoreCache.newCount(item.getInstanceId(),num);
|
||||
gatewayNodeCache.add(new GateWayNodeInfo(item.getInstanceId(),publicIP,privateIP));
|
||||
gatewayLoadSeriesCache.init();
|
||||
}
|
||||
);
|
||||
return s;
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
// 批量创建实例
|
||||
List<String> s = RunInstances(
|
||||
client, alyConfigProperties.getRegionId(), alyConfigProperties.getImageId(), alyConfigProperties.getInstanceType(),
|
||||
alyConfigProperties.getSecurityGroupId(), alyConfigProperties.getVSwitchId(), internetMaxBandwidthOut,
|
||||
alyConfigProperties.getInternetChargeType(), alyConfigProperties.getSize(), alyConfigProperties.getCategory(),
|
||||
alyConfigProperties.getInstanceChargeType());
|
||||
EcsSelectModel ecsSelectModel = new EcsSelectModel();
|
||||
ecsSelectModel.setInstanceIdList(s);
|
||||
List<InstanceInfo> list = selectList(ecsSelectModel);
|
||||
list.forEach(
|
||||
item -> {
|
||||
String publicIP = item.getPublicIpAddress().substring(1, item.getPublicIpAddress().length() - 1);
|
||||
item.setPublicIpAddress(publicIP);
|
||||
//存入数据
|
||||
gatewayNodeSetVinCache.newInstance(item);
|
||||
log.info("公网IP:" + item.getPublicIpAddress());
|
||||
}
|
||||
);
|
||||
return s;
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -58,6 +58,19 @@ public class Constants {
|
|||
// 次数
|
||||
public static final Integer SUM = 100;
|
||||
|
||||
/**
|
||||
* 扩容
|
||||
*/
|
||||
public static final Long EXPAND_CAPACITY = 80L;
|
||||
|
||||
/**
|
||||
* 缩容
|
||||
*/
|
||||
public static final Long REDUCE_CAPACITY = 20L;
|
||||
/**
|
||||
* 最大连接数
|
||||
*/
|
||||
public static final Long nodeMaxNum = 100L;
|
||||
/**
|
||||
* 登录成功状态
|
||||
*/
|
||||
|
|
|
@ -14,7 +14,7 @@ import lombok.NoArgsConstructor;
|
|||
@AllArgsConstructor
|
||||
@NoArgsConstructor
|
||||
@Builder
|
||||
public class GateWayNodeInfo {
|
||||
public class GateWayNodeScore {
|
||||
|
||||
/**
|
||||
* 节点id
|
|
@ -17,6 +17,6 @@ import lombok.NoArgsConstructor;
|
|||
public class WorkGateWayNode {
|
||||
|
||||
private String nodeId;
|
||||
private Integer weight;
|
||||
private Long weight;
|
||||
|
||||
}
|
||||
|
|
|
@ -1,23 +0,0 @@
|
|||
package com.yao.common.domain.aliy;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
||||
/**
|
||||
* @Author: LiJiaYao
|
||||
* @Date: 2024/4/16
|
||||
* @Description:
|
||||
*/
|
||||
@Data
|
||||
@AllArgsConstructor
|
||||
@NoArgsConstructor
|
||||
@Builder
|
||||
public class InstanceRequest implements Serializable {
|
||||
|
||||
private String publicIpAddress;
|
||||
private String instanceId;
|
||||
}
|
|
@ -2,6 +2,8 @@ package com.yao.common.mqtt;
|
|||
|
||||
import com.alibaba.fastjson2.JSONArray;
|
||||
import com.alibaba.fastjson2.JSONObject;
|
||||
import com.yao.common.domain.aliy.InstanceInfo;
|
||||
import com.yao.gateway.cache.GatewayNodeScoreCache;
|
||||
import com.yao.gateway.cache.GatewayVehicleLineNodeCache;
|
||||
import lombok.extern.log4j.Log4j2;
|
||||
import okhttp3.OkHttpClient;
|
||||
|
@ -10,6 +12,7 @@ import okhttp3.Response;
|
|||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* @Author: LiJiaYao
|
||||
|
@ -20,10 +23,13 @@ import java.io.IOException;
|
|||
@Log4j2
|
||||
public class MqttConnectService {
|
||||
|
||||
private GatewayVehicleLineNodeCache gatewayVehicleLineNodeCache;
|
||||
|
||||
public MqttConnectService(GatewayVehicleLineNodeCache gatewayVehicleLineNodeCache) {
|
||||
//网关连接车俩
|
||||
private final GatewayVehicleLineNodeCache gatewayVehicleLineNodeCache;
|
||||
// 网关节点分数
|
||||
private final GatewayNodeScoreCache gatewayNodeScoreCache;
|
||||
private MqttConnectService(GatewayVehicleLineNodeCache gatewayVehicleLineNodeCache, GatewayNodeScoreCache gatewayNodeScoreCache) {
|
||||
this.gatewayVehicleLineNodeCache = gatewayVehicleLineNodeCache;
|
||||
this.gatewayNodeScoreCache = gatewayNodeScoreCache;
|
||||
}
|
||||
|
||||
//todo-----------------------连接mqtt方法-------------------
|
||||
|
@ -51,4 +57,17 @@ public class MqttConnectService {
|
|||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 把传入的数据放入list集合后然后进行连接mqtt
|
||||
* @param list 实例集合数据
|
||||
*/
|
||||
public void cycle(List<InstanceInfo> list) {
|
||||
//单个的ip给他mqtt连接
|
||||
for (InstanceInfo instanceInfo : list) {
|
||||
Integer connectSize = connectMqtt(instanceInfo.getPublicIpAddress());
|
||||
gatewayNodeScoreCache.newCount(instanceInfo.getInstanceId(),connectSize);
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
@ -265,6 +265,16 @@ public class RedisService {
|
|||
return redisTemplate.opsForZSet().range(key,0,-1);
|
||||
}
|
||||
|
||||
/**
|
||||
* 修改score的值
|
||||
* @param key 键
|
||||
* @param value 值
|
||||
* @param score 分数
|
||||
*/
|
||||
public <T> void getZSetRem(final String key, T value, final double score){
|
||||
redisTemplate.opsForZSet().add(key,value,score);
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取集合中是分数数据
|
||||
* @param zSetKey
|
||||
|
|
|
@ -21,7 +21,7 @@ public class GatewayLoadSeriesCache extends GatewayNodeAbstract {
|
|||
*/
|
||||
@PostConstruct
|
||||
public void init(){
|
||||
redisService.setCacheObject(gatewayLoadSeriesKey,0L);
|
||||
redisService.setCacheObject(gatewayLoadSeriesKey,0);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -1,6 +1,8 @@
|
|||
package com.yao.gateway.cache;
|
||||
|
||||
import com.yao.common.domain.GateWayNodeInfo;
|
||||
|
||||
import com.aliyun.teautil.Common;
|
||||
import com.yao.common.domain.GateWayNodeScore;
|
||||
import com.yao.common.domain.WorkGateWayNode;
|
||||
import com.yao.common.domain.aliy.InstanceInfo;
|
||||
import com.yao.gateway.cache.abs.GatewayNodeAbstract;
|
||||
|
@ -27,26 +29,16 @@ public class GatewayNodeScoreCache extends GatewayNodeAbstract {
|
|||
* @param info 新增数据的信息
|
||||
* @param count 网关连接的个数
|
||||
*/
|
||||
public void newCount(InstanceInfo info, Integer count) {
|
||||
redisService.setCacheZSet(zSetKey, info, count);
|
||||
}
|
||||
|
||||
/**
|
||||
* 取出数据
|
||||
*
|
||||
* @return 取出的内容
|
||||
*/
|
||||
public Set<InstanceInfo> get() {
|
||||
Set<InstanceInfo> cacheZSet = redisService.getCacheZSet(zSetKey);
|
||||
|
||||
return cacheZSet;
|
||||
public void newCount(String nodeId, double count) {
|
||||
redisService.setCacheZSet(zSetKey, nodeId, count);
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取他的内容
|
||||
*
|
||||
* @return
|
||||
*/
|
||||
public List<GateWayNodeInfo> getCacheZSet() {
|
||||
public List<GateWayNodeScore> getCacheZSet() {
|
||||
Set<ZSetOperations.TypedTuple<String>> set = redisService.getZSet(zSetKey);
|
||||
// Map<String, Double> nodeMap = set.stream()
|
||||
// .collect(Collectors.toMap(
|
||||
|
@ -54,16 +46,27 @@ public class GatewayNodeScoreCache extends GatewayNodeAbstract {
|
|||
// ZSetOperations.TypedTuple::getScore
|
||||
// ));
|
||||
return set.stream()
|
||||
.map(zset->
|
||||
GateWayNodeInfo.builder()
|
||||
.nodeId(zset.getValue())
|
||||
.score(zset.getScore())
|
||||
.build()
|
||||
.map(zset ->
|
||||
GateWayNodeScore.builder()
|
||||
.nodeId(zset.getValue())
|
||||
.score(zset.getScore())
|
||||
.build()
|
||||
).toList();
|
||||
//算法:有没有超过百分比
|
||||
// double sum = nodeMap.values().stream().mapToDouble(value -> value).sum();
|
||||
}
|
||||
|
||||
/**
|
||||
* 这个节点对应连接的节点数
|
||||
* @return 节点数
|
||||
*/
|
||||
public Long getGatewayNodeNum() {
|
||||
List<GateWayNodeScore> instanceInfos = getCacheZSet();
|
||||
//有没有超过百分比
|
||||
double sum = instanceInfos.stream().mapToDouble(GateWayNodeScore::getScore).sum();
|
||||
return (long) sum;
|
||||
}
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* 删除数据
|
||||
*/
|
||||
|
@ -71,4 +74,13 @@ public class GatewayNodeScoreCache extends GatewayNodeAbstract {
|
|||
redisService.deleteObject(zSetKey);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 删除这个key其中的一个数据
|
||||
* @param gateWayNodeScore 对应的值
|
||||
*/
|
||||
public void remote(GateWayNodeScore gateWayNodeScore){
|
||||
redisService.deleteCacheMapValue(zSetKey, Common.toJSONString(gateWayNodeScore));
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -17,7 +17,7 @@ public class GatewayNodeSetVinCache extends GatewayNodeAbstract {
|
|||
/**
|
||||
* 阿里云键
|
||||
*/
|
||||
private static final String realKey ="new:real:column";
|
||||
private static final String realKey ="new:real:column:";
|
||||
|
||||
/**
|
||||
* 添加阿里云实列
|
||||
|
@ -29,11 +29,11 @@ public class GatewayNodeSetVinCache extends GatewayNodeAbstract {
|
|||
|
||||
/**
|
||||
* 取出实例数据
|
||||
* @param realKey 键名称
|
||||
* @return 相关实例数据值
|
||||
* @param ip 键名称
|
||||
* @return 车辆的VIN
|
||||
*/
|
||||
public Set<InstanceInfo> get(){
|
||||
return redisService.getCacheSet(realKey);
|
||||
public Set<String> get(String ip){
|
||||
return redisService.getCacheSet(realKey+ip);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -39,6 +39,4 @@ public class GatewayVehicleLineNodeCache extends GatewayNodeAbstract {
|
|||
}
|
||||
|
||||
|
||||
|
||||
|
||||
}
|
||||
|
|
|
@ -4,6 +4,8 @@ import com.yao.common.config.Result;
|
|||
import com.yao.server.service.GatewayLoadService;
|
||||
import lombok.extern.log4j.Log4j2;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.scheduling.annotation.EnableScheduling;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
import org.springframework.web.bind.annotation.GetMapping;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
|
||||
|
@ -14,6 +16,7 @@ import org.springframework.web.bind.annotation.RestController;
|
|||
*/
|
||||
@RestController("/gateway")
|
||||
@Log4j2
|
||||
@EnableScheduling
|
||||
public class GatewayController {
|
||||
|
||||
@Autowired
|
||||
|
@ -24,6 +27,13 @@ public class GatewayController {
|
|||
return Result.success(gatewayLoadService.loadNode());
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 每隔10秒就查询一次
|
||||
* @param ip
|
||||
*/
|
||||
@Scheduled(cron = "0/10 * * * * *")
|
||||
public void requestLoad(String ip){
|
||||
gatewayLoadService.requestLoad(ip);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -12,5 +12,10 @@ public interface GatewayLoadService {
|
|||
*/
|
||||
String loadNode();
|
||||
|
||||
/**
|
||||
* 请求负载
|
||||
*/
|
||||
void requestLoad(String ip);
|
||||
|
||||
|
||||
}
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
package com.yao.server.service.impl;
|
||||
|
||||
import com.yao.common.mqtt.MqttConnectService;
|
||||
import com.yao.gateway.cache.GateWayNodeInfo;
|
||||
import com.yao.gateway.cache.GatewayLoadNodeCache;
|
||||
import com.yao.gateway.cache.GatewayLoadSeriesCache;
|
||||
|
@ -25,7 +26,8 @@ public class GatewayLoadServiceImpl implements GatewayLoadService {
|
|||
private final GatewayLoadSeriesCache gatewayLoadSeriesCache;
|
||||
//网关节点缓存
|
||||
private final GatewayNodeCache gatewayNodeCache;
|
||||
|
||||
// 连接mqttx的配置类
|
||||
private final MqttConnectService mqttConnectService;
|
||||
|
||||
@Override
|
||||
public String loadNode() {
|
||||
|
@ -39,4 +41,10 @@ public class GatewayLoadServiceImpl implements GatewayLoadService {
|
|||
//获取外网ip
|
||||
return gateWayNodeInfo.getPublicIdAddress();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void requestLoad(String ip) {
|
||||
mqttConnectService.connectMqtt(ip);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -1,10 +1,10 @@
|
|||
package com.yao.server.service.impl;
|
||||
|
||||
import com.yao.common.config.Constants;
|
||||
import com.yao.common.domain.GateWayNodeScore;
|
||||
import com.yao.common.domain.WorkGateWayNode;
|
||||
import com.yao.common.mqtt.MqttConnectService;
|
||||
import com.yao.gateway.cache.GatewayArithmeticCache;
|
||||
import com.yao.gateway.cache.GatewayVehicleLineNodeCache;
|
||||
import com.yao.gateway.cache.GatewayNodeScoreCache;
|
||||
import com.yao.server.service.LoadService;
|
||||
import lombok.extern.log4j.Log4j2;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
@ -23,47 +23,43 @@ import java.util.concurrent.CountDownLatch;
|
|||
public class LoadServiceImpl implements LoadService {
|
||||
//网关算法缓存
|
||||
private final GatewayArithmeticCache gatewayArithmeticCache;
|
||||
// 网关连接车俩
|
||||
private final GatewayVehicleLineNodeCache gatewayVehicleLineNodeCache;
|
||||
private MqttConnectService mqttConnectService;
|
||||
private final GatewayNodeScoreCache gatewayNodeScoreCache;
|
||||
|
||||
public LoadServiceImpl(GatewayArithmeticCache gatewayArithmeticCache, GatewayVehicleLineNodeCache gatewayVehicleLineNodeCache, MqttConnectService mqttConnectService) {
|
||||
public LoadServiceImpl(GatewayArithmeticCache gatewayArithmeticCache, GatewayNodeScoreCache gatewayNodeScoreCache) {
|
||||
this.gatewayArithmeticCache = gatewayArithmeticCache;
|
||||
this.gatewayVehicleLineNodeCache = gatewayVehicleLineNodeCache;
|
||||
this.mqttConnectService = mqttConnectService;
|
||||
this.gatewayNodeScoreCache = gatewayNodeScoreCache;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String load() {
|
||||
//初始化序列
|
||||
gatewayArithmeticCache.count();
|
||||
ArrayList<WorkGateWayNode> nodeIdList = carWorkGatewayNode();
|
||||
List<WorkGateWayNode> nodeIdList = carWorkGatewayNode();
|
||||
//100
|
||||
List<String> loadNodeList = new ArrayList<>();
|
||||
Integer count = nodeIdList.stream().mapToInt(WorkGateWayNode::getWeight).sum();
|
||||
Long count = nodeIdList.stream().mapToLong(WorkGateWayNode::getWeight).sum();
|
||||
if (count < Constants.SUM) {
|
||||
List<WorkGateWayNode> list = nodeIdList.stream()
|
||||
.sorted(((o1, o2) -> o2.getWeight() - o1.getWeight()))
|
||||
.sorted(((o1, o2) -> (int) (o2.getWeight() - o1.getWeight())))
|
||||
.toList();
|
||||
Integer countWeight = 0;
|
||||
for (Integer i = count; i < Constants.SUM; i++) {
|
||||
WorkGateWayNode workGateWayNode = list.get(countWeight++ % list.size());
|
||||
Long countWeight = 0L;
|
||||
for (Long i = count; i < Constants.SUM; i++) {
|
||||
WorkGateWayNode workGateWayNode = list.get((int) (countWeight++ % list.size()));
|
||||
workGateWayNode.setWeight(workGateWayNode.getWeight() + 1);
|
||||
}
|
||||
}
|
||||
|
||||
work:
|
||||
while (true) {
|
||||
for (WorkGateWayNode workGateWayNode : nodeIdList) {
|
||||
Integer nodeWeight = workGateWayNode.getWeight();
|
||||
Long nodeWeight = workGateWayNode.getWeight();
|
||||
if (nodeWeight > 0) {
|
||||
loadNodeList.add(
|
||||
workGateWayNode.getNodeId()
|
||||
);
|
||||
workGateWayNode.setWeight(nodeWeight - 1);
|
||||
}
|
||||
int intStream = nodeIdList.stream()
|
||||
.mapToInt(WorkGateWayNode::getWeight)
|
||||
Long intStream = nodeIdList.stream()
|
||||
.mapToLong(WorkGateWayNode::getWeight)
|
||||
.sum();
|
||||
if (intStream <= 0) {
|
||||
break work;
|
||||
|
@ -124,17 +120,26 @@ public class LoadServiceImpl implements LoadService {
|
|||
return key;
|
||||
}
|
||||
|
||||
public ArrayList<WorkGateWayNode> carWorkGatewayNode() {
|
||||
Set<String> ip = gatewayVehicleLineNodeCache.getAddress();
|
||||
ArrayList<WorkGateWayNode> list = new ArrayList<>();
|
||||
for (String s : ip) {
|
||||
Integer connectSize = mqttConnectService.connectMqtt(s);
|
||||
WorkGateWayNode workGateWayNode = new WorkGateWayNode();
|
||||
workGateWayNode.setWeight(connectSize);
|
||||
workGateWayNode.setNodeId(s);
|
||||
list.add(workGateWayNode);
|
||||
}
|
||||
return list;
|
||||
public List<WorkGateWayNode> carWorkGatewayNode() {
|
||||
|
||||
List<GateWayNodeScore> infos = gatewayNodeScoreCache.getCacheZSet();
|
||||
//上线最大数量
|
||||
Long vehicleMaxOnlineNum = infos.size() * Constants.nodeMaxNum;
|
||||
|
||||
//目前连接数
|
||||
Long vehicleOnlineNowNum = (long) infos.stream().mapToDouble(GateWayNodeScore::getScore).sum();
|
||||
//空余连接数
|
||||
Long vehicleOnlineNum = vehicleMaxOnlineNum - vehicleOnlineNowNum;
|
||||
List<WorkGateWayNode> workGateWayNodes = infos.stream()
|
||||
.map(workGateWay -> {
|
||||
return WorkGateWayNode
|
||||
.builder()
|
||||
.nodeId(workGateWay.getNodeId())
|
||||
.weight((long) (vehicleOnlineNum / (Constants.nodeMaxNum - workGateWay.getScore())))
|
||||
.build();
|
||||
}).toList();
|
||||
// Long sum = workGateWayNodes.stream().mapToLong(WorkGateWayNode::getWeight).sum();
|
||||
return workGateWayNodes;
|
||||
|
||||
}
|
||||
|
||||
|
@ -169,7 +174,6 @@ class SitNode {
|
|||
map.put(key, val);
|
||||
log.info(key + "-------" + val);
|
||||
});
|
||||
|
||||
return map;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,20 +1,21 @@
|
|||
package com.yao.server.timer;
|
||||
|
||||
import com.yao.common.aliy.AliYunEcsService;
|
||||
import com.yao.common.domain.GateWayNodeInfo;
|
||||
import com.yao.common.aliy.model.EcsSelectModel;
|
||||
import com.yao.common.config.Constants;
|
||||
import com.yao.common.domain.GateWayNodeScore;
|
||||
import com.yao.common.domain.aliy.InstanceInfo;
|
||||
import com.yao.common.mqtt.MqttConnectService;
|
||||
import com.yao.gateway.cache.GatewayNodeScoreCache;
|
||||
import com.yao.gateway.cache.GatewayNodeSetVinCache;
|
||||
import com.yao.gateway.cache.GatewayVehicleLineNodeCache;
|
||||
import com.yao.gateway.cache.*;
|
||||
import lombok.SneakyThrows;
|
||||
import lombok.extern.log4j.Log4j2;
|
||||
import org.springframework.scheduling.annotation.Async;
|
||||
import org.springframework.scheduling.annotation.EnableAsync;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* @Author: LiJiaYao
|
||||
|
@ -31,15 +32,19 @@ public class Timer {
|
|||
private final GatewayNodeSetVinCache gatewayNodeSetVinCache;
|
||||
private final GatewayVehicleLineNodeCache gatewayVehicleLineNodeCache;
|
||||
private final GatewayNodeScoreCache gatewayNodeScoreCache;
|
||||
//网关节点缓存
|
||||
private final GatewayNodeCache gatewayNodeCache;
|
||||
|
||||
public Timer(AliYunEcsService aliYunEcsService, MqttConnectService mqttConnectService, GatewayNodeSetVinCache gatewayAliYunCache, GatewayVehicleLineNodeCache gatewayVehicleLineNodeCache, GatewayNodeScoreCache gatewayNodeScoreCache) {
|
||||
public Timer(AliYunEcsService aliYunEcsService, MqttConnectService mqttConnectService, GatewayNodeSetVinCache gatewayAliYunCache, GatewayVehicleLineNodeCache gatewayVehicleLineNodeCache, GatewayNodeScoreCache gatewayNodeScoreCache, GatewayNodeCache gatewayNodeCache) {
|
||||
this.aliYunEcsService = aliYunEcsService;
|
||||
this.mqttConnectService = mqttConnectService;
|
||||
this.gatewayNodeSetVinCache = gatewayAliYunCache;
|
||||
this.gatewayVehicleLineNodeCache = gatewayVehicleLineNodeCache;
|
||||
this.gatewayNodeScoreCache = gatewayNodeScoreCache;
|
||||
this.gatewayNodeCache = gatewayNodeCache;
|
||||
}
|
||||
|
||||
@SneakyThrows
|
||||
@Async
|
||||
@Scheduled(cron = "0/20 * * * * ?")
|
||||
public void timer() {
|
||||
|
@ -47,7 +52,8 @@ public class Timer {
|
|||
long startTime = System.currentTimeMillis();
|
||||
// 设置结束时间为10分钟后
|
||||
long endTime = startTime + 10 * 60 * 1000;
|
||||
Set<InstanceInfo> instance = gatewayNodeSetVinCache.get();
|
||||
List<GateWayNodeScore> instance = gatewayNodeScoreCache.getCacheZSet();
|
||||
//判空,如果为空就创建实例
|
||||
if (instance.isEmpty()) {
|
||||
try {
|
||||
aliYunEcsService.startCreate();
|
||||
|
@ -55,31 +61,40 @@ public class Timer {
|
|||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
for (InstanceInfo s : instance) {
|
||||
Integer connectSize = mqttConnectService.connectMqtt(s.getPublicIpAddress());
|
||||
for (GateWayNodeScore gateWayNodeInfo : instance) {
|
||||
List<String> instanceId = null;
|
||||
String deleteInstanceId = null;
|
||||
if (connectSize >= 79) {
|
||||
Long gatewayNodeNum = gatewayNodeScoreCache.getGatewayNodeNum();
|
||||
//根据id查询出来他的ip地址
|
||||
List<InstanceInfo> list = aliYunEcsService.selectList(addNodeId(gateWayNodeInfo.getNodeId()));
|
||||
//查询ip地址后连接mqtt,查询连接的个数.改变他的值 Cycle 循环处理单个的ip值
|
||||
mqttConnectService.cycle(list);
|
||||
if (gatewayNodeNum >= Constants.EXPAND_CAPACITY) {
|
||||
//执行节点扩容
|
||||
//返回实例的ID
|
||||
if (!instanceId.isEmpty()) {
|
||||
try {
|
||||
instanceId = aliYunEcsService.startCreate();
|
||||
log.info("扩容成功!");
|
||||
log.info("扩容的节点id为:" + instanceId);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
instanceId = aliYunEcsService.startCreate();
|
||||
log.info("扩容成功!");
|
||||
log.info("扩容的节点id为:" + instanceId);
|
||||
Thread.sleep(3000);
|
||||
}
|
||||
}
|
||||
if (connectSize <= 20 && System.currentTimeMillis() < endTime) {
|
||||
aliYunEcsService.delete(s.getInstanceId());
|
||||
} else if (gatewayNodeNum <= Constants.REDUCE_CAPACITY && System.currentTimeMillis() < endTime) {
|
||||
aliYunEcsService.delete(gateWayNodeInfo.getNodeId());
|
||||
//删除实列以后再去把redis的值删除 再去通知重新上线
|
||||
gatewayNodeSetVinCache.remote(s);
|
||||
gatewayVehicleLineNodeCache.save(s.getPublicIpAddress());
|
||||
gatewayNodeScoreCache.remote(gateWayNodeInfo);
|
||||
// gatewayVehicleLineNodeCache.save(s.getPublicIpAddress());
|
||||
log.info("缩容成功!");
|
||||
log.info("锁容的节点id为:" + deleteInstanceId);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
//让他查询出来他的值
|
||||
public EcsSelectModel addNodeId(String nodeId) {
|
||||
ArrayList<String> nodeIdList = new ArrayList();
|
||||
EcsSelectModel ecsSelectModel = new EcsSelectModel();
|
||||
nodeIdList.add(nodeId);
|
||||
ecsSelectModel.setInstanceIdList(nodeIdList);
|
||||
return ecsSelectModel;
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue