diff --git a/pom.xml b/pom.xml index 6edd979..deda98c 100644 --- a/pom.xml +++ b/pom.xml @@ -31,6 +31,12 @@ + + + + org.springframework.boot + spring-boot-starter-amqp + org.springframework.boot @@ -102,6 +108,7 @@ 0.0.3 + diff --git a/src/main/java/com/loadcenter/common/aliyun/config/RabbitMqConfig.java b/src/main/java/com/loadcenter/common/aliyun/config/RabbitMqConfig.java new file mode 100644 index 0000000..a0d8f3d --- /dev/null +++ b/src/main/java/com/loadcenter/common/aliyun/config/RabbitMqConfig.java @@ -0,0 +1,86 @@ +package com.loadcenter.common.aliyun.config; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.amqp.core.*; +import org.springframework.amqp.rabbit.connection.ConnectionFactory; +import org.springframework.amqp.rabbit.connection.CorrelationData; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; +import org.springframework.amqp.support.converter.MessageConverter; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Primary; + + +@Configuration +public class RabbitMqConfig implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback { + public static final Logger logger= LoggerFactory.getLogger(RabbitMqConfig.class); + + private RabbitTemplate rabbitTemplate; + + + @Bean + public MessageConverter messageConverter(){ + return new Jackson2JsonMessageConverter(); + } + + public static final String QUEUE="queue"; + public static final String EXCHANGE="exchange"; + public static final String KEY="Key"; + + + @Primary + @Bean + public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){ + + RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); + + this.rabbitTemplate=rabbitTemplate; + + rabbitTemplate.setMessageConverter(messageConverter()); + rabbitTemplate(); + return rabbitTemplate; + } + + public void rabbitTemplate(){ + rabbitTemplate.setConfirmCallback(this); + rabbitTemplate.setReturnsCallback(this); + } + + + @Bean + public Queue queue(){ + return new Queue(QUEUE,true); + } + + + @Bean + public DirectExchange directExchange(){ + return new DirectExchange(EXCHANGE); + } + + + + @Bean + public Binding binding(){ + return BindingBuilder.bind(queue()).to(directExchange()).with(KEY); + } + + + + @Override + public void confirm(CorrelationData correlationData, boolean ack, String s) { + + if (ack){ + logger.info("{}消息到达交换机",correlationData.getId()); + }else{ + logger.error("{}消息丢失",correlationData.getId()); + } + } + + @Override + public void returnedMessage(ReturnedMessage returnedMessage) { + logger.error("{}消息未到达队列",returnedMessage.getMessage().getMessageProperties().getMessageId()); + } +} diff --git a/src/main/java/com/loadcenter/common/aliyun/config/RestClientConfig.java b/src/main/java/com/loadcenter/common/aliyun/config/RestClientConfig.java new file mode 100644 index 0000000..3e51c08 --- /dev/null +++ b/src/main/java/com/loadcenter/common/aliyun/config/RestClientConfig.java @@ -0,0 +1,18 @@ +package com.loadcenter.common.aliyun.config; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.web.client.RestTemplate; +/** + * @ClassName RestClientConfig + * @Description RestClientConfig + * @Author Can.J + * @Date 2024/4/21 16:18 + */ +@Configuration +public class RestClientConfig { + @Bean + public RestTemplate restTemplate(){ + return new RestTemplate(); + } +} diff --git a/src/main/java/com/loadcenter/common/domain/CPUInfo.java b/src/main/java/com/loadcenter/common/domain/CPUInfo.java new file mode 100644 index 0000000..b9b200e --- /dev/null +++ b/src/main/java/com/loadcenter/common/domain/CPUInfo.java @@ -0,0 +1,30 @@ +package com.loadcenter.common.domain; + +import lombok.Data; + +/** + * CPU使用信息 + */ +@Data +public class CPUInfo { + /** + * CPU核数 + */ + private long cpuNum; + /** + * 内核态使用率 + */ + private String cSys; + /** + * 空闲率 + */ + private String idle; + /** + * I/O等待 + */ + private String iowait; + /** + * 用户态使用率 + */ + private String user; +} diff --git a/src/main/java/com/loadcenter/common/domain/FlowInfo.java b/src/main/java/com/loadcenter/common/domain/FlowInfo.java new file mode 100644 index 0000000..1f339cf --- /dev/null +++ b/src/main/java/com/loadcenter/common/domain/FlowInfo.java @@ -0,0 +1,28 @@ +package com.loadcenter.common.domain; + +import lombok.Data; /** + * 节点状态 + */ +@Data +public class FlowInfo { + /** + * 上次读取吞吐量 + */ + private String lastReadThroughput; + /** + * 上次写入吞吐量 + */ + private String lastWriteThroughput; + /** + * 读取总吞吐量 + */ + private String readBytesHistory; + /** + * 实写字节 + */ + private String realWriteBytes; + /** + * 写入总吞吐量 + */ + private String writeBytesHistory; +} diff --git a/src/main/java/com/loadcenter/common/domain/InstancesInformation.java b/src/main/java/com/loadcenter/common/domain/InstancesInformation.java index 2009d14..008f795 100644 --- a/src/main/java/com/loadcenter/common/domain/InstancesInformation.java +++ b/src/main/java/com/loadcenter/common/domain/InstancesInformation.java @@ -14,17 +14,6 @@ import lombok.NoArgsConstructor; @NoArgsConstructor @Data public class InstancesInformation { - /* - * log.info("查询第{" + count + "}个实例的ID:" + item.getInstanceId()); - log.info("名称:" + item.getInstanceName()); - log.info("状态:" + item.getStatus()); - log.info("公网IP:" + UserUtil.removeBrackets(item.getPublicIpAddress().getIpAddress().toString())); - log.info("私网IP:" + UserUtil.removeBrackets(item.getVpcAttributes().getPrivateIpAddress().ipAddress.toString())); - log.info("创建时间:" + item.getCreationTime()); - log.info("到期时间:" + item.getExpiredTime()); - log.info("是否可以回收:" + (item.getRecyclable() ? "是" : "否") + "\n\n"); - * */ - /* * 实例的ID * */ diff --git a/src/main/java/com/loadcenter/common/domain/JVMInfo.java b/src/main/java/com/loadcenter/common/domain/JVMInfo.java new file mode 100644 index 0000000..5952cca --- /dev/null +++ b/src/main/java/com/loadcenter/common/domain/JVMInfo.java @@ -0,0 +1,58 @@ +package com.loadcenter.common.domain; + +import lombok.Data; + +/** + * JVM使用信息 + */ +@Data +public class JVMInfo { + /** + * 文件描述(句柄) + */ + private String fileDescriptors; + /** + * 堆内存 + */ + private String heapCommit; + /** + * 堆初始化空间 + */ + private String heapInit; + /** + * 堆最大内存 + */ + private String heapMax; + /** + * 堆使用空间 + */ + private String heapUsed; + /** + * JAVA目录 + */ + private String jdkHome; + /** + * JDK版本 + */ + private String jdkVersion; + /** + * 非堆空间 + */ + private String noHeapCommit; + /** + * 非堆初始化空间 + */ + private String noHeapInit; + /** + * 非堆最大空间 + */ + private String noHeapMax; + /** + * 非堆使用空间 + */ + private String noHeapUsed; + /** + * 线程数量 + */ + private long threadCount; +} diff --git a/src/main/java/com/loadcenter/common/redis/service/RedisService.java b/src/main/java/com/loadcenter/common/redis/service/RedisService.java index 9f86582..7137fe7 100644 --- a/src/main/java/com/loadcenter/common/redis/service/RedisService.java +++ b/src/main/java/com/loadcenter/common/redis/service/RedisService.java @@ -187,9 +187,9 @@ public class RedisService { return setOperation; } - public BoundZSetOperations setCacheZSet(final String key,final T setValue,double score){ + public BoundZSetOperations setCacheZSet(final String key, final T setValue, double score) { BoundZSetOperations boundZSetOperations = redisTemplate.boundZSetOps(key); - boundZSetOperations.add(setValue,score); + boundZSetOperations.add(setValue, score); return boundZSetOperations; } @@ -218,6 +218,10 @@ public class RedisService { return memberScores; } + public Double incrementScore(final String key, final T value, final Double score) { + return redisTemplate.opsForZSet().incrementScore(key, value, score); + } + /** * 缓存Set * @@ -231,8 +235,8 @@ public class RedisService { setOperations.remove(setValue); } - public void deleteCacheSet(String key,T setValue){ - BoundSetOperations setOperation = redisTemplate.boundSetOps(key); + public void deleteCacheSet(String key, T setValue) { + BoundSetOperations setOperation = redisTemplate.boundSetOps(key); setOperation.remove(setValue); } @@ -248,12 +252,13 @@ public class RedisService { /** - * 删除缓存zset的元素 + * 删除缓存zset的元素 + * * @param key * @param value */ - public void deleteCacheZset(final String key ,final String value){ - redisTemplate.opsForZSet().remove(key,value); + public void deleteCacheZset(final String key, final String value) { + redisTemplate.opsForZSet().remove(key, value); } /** diff --git a/src/main/java/com/loadcenter/gateway/cache/GatewayNodeScoreCache.java b/src/main/java/com/loadcenter/gateway/cache/GatewayNodeScoreCache.java deleted file mode 100644 index e5803fb..0000000 --- a/src/main/java/com/loadcenter/gateway/cache/GatewayNodeScoreCache.java +++ /dev/null @@ -1,50 +0,0 @@ -package com.loadcenter.gateway.cache; - -import com.loadcenter.gateway.cache.abs.GatewayCacheAbs; -import com.loadcenter.gateway.model.WorkGatewayNode; -import org.springframework.data.redis.core.ZSetOperations; -import org.springframework.stereotype.Service; - -import java.util.List; -import java.util.Set; - -/** - * @ClassName GatewayNodeScoreCache - * @Description 网关节点分数 - * @Author Can.J - * @Date 2024/4/19 14:47 - */ -@Service -public class GatewayNodeScoreCache extends GatewayCacheAbs { - private final static String gatewayNodeScoreCacheKey = "score"; - - @Override - public String getPre() { - return "gateway:node:"; - } - - public List get() { - Set> range = redisService.redisTemplate.opsForZSet().rangeWithScores(encode(gatewayNodeScoreCacheKey), 0, -1); - return range.stream() - .map(zSet -> WorkGatewayNode.builder() - .nodeId(zSet.getValue()).source(zSet.getScore()).build()) - .toList(); - } - - public Long getNodeNowNum() { - List workGatewayNodes = get(); - Long vehicleNowOnlineNum = Long.valueOf(String.valueOf(workGatewayNodes.stream().mapToDouble(WorkGatewayNode::getSource).sum())); - - return vehicleNowOnlineNum; - } - - /** - * 获取节点最大上线数量 - * @return - */ - public Long getNodeMaxOnlineNum() { - List workGatewayNodes = get(); - return workGatewayNodes.size() * 80L; - } - -} diff --git a/src/main/java/com/loadcenter/gateway/cache/GatewayZSetNodeCache.java b/src/main/java/com/loadcenter/gateway/cache/GatewayZSetNodeCache.java index 1661072..1d853ad 100644 --- a/src/main/java/com/loadcenter/gateway/cache/GatewayZSetNodeCache.java +++ b/src/main/java/com/loadcenter/gateway/cache/GatewayZSetNodeCache.java @@ -5,18 +5,19 @@ import com.loadcenter.gateway.model.WorkGatewayNode; import org.springframework.stereotype.Component; import java.util.ArrayList; +import java.util.List; import java.util.Map; /** - * @ClassName GatewayZSetNodeCache - * @Description 服务器节点和车辆连接数 - * @Author Can.J - * @Date 2024/4/19 9:00 + * @ClassName GatewayZSetNodeCache + * @Description 服务器节点和车辆连接数 + * @Author Can.J + * @Date 2024/4/19 9:00 */ @Component public class GatewayZSetNodeCache extends GatewayCacheAbs { - private final static String gatewayZSetCount ="count"; + private final static String gatewayZSetCount = "count"; @Override public String getPre() { @@ -24,44 +25,75 @@ public class GatewayZSetNodeCache extends GatewayCacheAbs { } /** - * 获取所有zset数据 + * 获取所有zset数据 + * * @return 负载节点集合 */ - public Map getzSet(){ + public Map getzSet() { return redisService.getCacheZSetScore(encode(gatewayZSetCount)); } /** - * 获取所有zset数据 + * 获取所有zset数据 + * * @return 负载节点集合 */ - public ArrayList get(){ - ArrayList nodeList = new ArrayList<>(); + public ArrayList get() { + ArrayList nodeIdList = new ArrayList<>(); Map map = redisService.getCacheZSetScore(encode(gatewayZSetCount)); for (Map.Entry entry : map.entrySet()) { WorkGatewayNode workGatewayNode = new WorkGatewayNode(); workGatewayNode.setNodeId(entry.getKey().toString()); - workGatewayNode.setSource(entry.getValue()); - nodeList.add(workGatewayNode); + workGatewayNode.setWeight(entry.getValue().intValue()); + nodeIdList.add(workGatewayNode); } - return nodeList; + return nodeIdList; } + /** + * 当前连接数 + */ + public Long getNodeNowNum() { + List workGatewayNodes = get(); + Long vehicleNowOnlineNum = Long.valueOf(String.valueOf(workGatewayNodes.stream().mapToDouble(WorkGatewayNode::getWeight).sum())); + + return vehicleNowOnlineNum; + } /** - * 修改服务器的在线车辆 + * 获取节点最大上线数量 + * @return + */ + public Long getNodeMaxOnlineNum() { + List nodeIdList = get(); + return nodeIdList.size() * 80L; + } + + /** + * 修改服务器的在线车辆 + * * @param nodeId * @param onlineVehicle */ - public void put(String nodeId,Integer onlineVehicle){ - redisService.setCacheZSet(encode(gatewayZSetCount),nodeId,onlineVehicle); + public void put(String nodeId, Integer onlineVehicle) { + redisService.setCacheZSet(encode(gatewayZSetCount), nodeId, onlineVehicle); } /** - * 删除服务器的时候删除zset服务器列表,防止重新添加车辆 + * 删除服务器的时候删除zset服务器列表,防止重新添加车辆 + * * @param nodeId */ - public void remove(String nodeId){ - redisService.deleteCacheZset(encode(gatewayZSetCount),nodeId); + public void remove(String nodeId) { + redisService.deleteCacheZset(encode(gatewayZSetCount), nodeId); + } + + /** + * 序列自增 + * @param ip + * @param score + */ + public void increment(String ip, Double score) { + redisService.incrementScore(encode(gatewayZSetCount), ip, score); } } diff --git a/src/main/java/com/loadcenter/gateway/cache/VehicleOnlineCache.java b/src/main/java/com/loadcenter/gateway/cache/VehicleOnlineCache.java deleted file mode 100644 index 9a18e47..0000000 --- a/src/main/java/com/loadcenter/gateway/cache/VehicleOnlineCache.java +++ /dev/null @@ -1,47 +0,0 @@ -package com.loadcenter.gateway.cache; - -import com.loadcenter.gateway.cache.abs.GatewayCacheAbs; -import org.springframework.stereotype.Component; - -/** - * @ClassName VehicleOnlineCache - * @Description 描述 - * @Author Can.J - * @Date 2024/4/18 22:02 - */ -@Component -public class VehicleOnlineCache extends GatewayCacheAbs { - - private final static String vehicleOnlineKey ="vehicle_online"; - @Override - public String getPre() { - return "vehicle:"; - } - - /** - * 记录车辆上线业务 - * @param vin 车辆VIN - * @param nodeId 网关节点id - */ - public void recordVehicleOnline(String vin,String nodeId){ - redisService.setCacheObject(encode(vehicleOnlineKey+vin),nodeId); - } - - /** - * 获取车辆上线的网关节点id - * @param vin 车辆VIN - * @return 网关节点id - */ - public String getOnlineGatewayNode(String vin){ - return redisService.getCacheObject(encode(vehicleOnlineKey+vin)); - } - - /** - * 移除车辆上线业务信息 - * @param vin 车辆VIN - */ - public void removeVehicleOnline(String vin){ - redisService.deleteObject(encode(vehicleOnlineKey +vin)); - } - -} diff --git a/src/main/java/com/loadcenter/gateway/model/WorkGatewayNode.java b/src/main/java/com/loadcenter/gateway/model/WorkGatewayNode.java index 19614da..76e4ced 100644 --- a/src/main/java/com/loadcenter/gateway/model/WorkGatewayNode.java +++ b/src/main/java/com/loadcenter/gateway/model/WorkGatewayNode.java @@ -16,7 +16,7 @@ public class WorkGatewayNode { private String nodeId; /** - * 分数 + * 权重 */ private int weight; diff --git a/src/main/java/com/loadcenter/gateway/model/WorkGatewayNodeSource.java b/src/main/java/com/loadcenter/gateway/model/WorkGatewayNodeSource.java deleted file mode 100644 index e6ecf89..0000000 --- a/src/main/java/com/loadcenter/gateway/model/WorkGatewayNodeSource.java +++ /dev/null @@ -1,22 +0,0 @@ -package com.loadcenter.gateway.model; - -import lombok.AllArgsConstructor; -import lombok.Builder; -import lombok.Data; -import lombok.NoArgsConstructor; - -@Data -@NoArgsConstructor -@AllArgsConstructor -@Builder -public class WorkGatewayNodeSource { - /** - * 节点id - */ - private String nodeId; - - /** - * 权重 - */ - private int weight; -} diff --git a/src/main/java/com/loadcenter/rabbitMq/Consumer.java b/src/main/java/com/loadcenter/rabbitMq/Consumer.java new file mode 100644 index 0000000..0e59b30 --- /dev/null +++ b/src/main/java/com/loadcenter/rabbitMq/Consumer.java @@ -0,0 +1,117 @@ +package com.loadcenter.rabbitMq; + +import com.loadcenter.common.aliyun.service.AliYunEcsService; +import com.loadcenter.common.domain.resp.Result; +import com.loadcenter.common.redis.service.RedisService; +import com.loadcenter.gateway.cache.GatewayNodeCache; +import com.loadcenter.gateway.cache.GatewayNodeSetVinCache; +import com.loadcenter.gateway.model.GatewayNodeInfo; +import com.loadcenter.service.GatewayLoadService; +import com.rabbitmq.client.Channel; +import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.core.Message; +import org.springframework.amqp.rabbit.annotation.RabbitHandler; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; +import org.springframework.web.client.RestTemplate; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Set; + +/** + * @ClassName Consumer + * @Description 消费消息, + * @Author Can.J + * @Date 2024/4/21 16:14 + */ +@RabbitListener(queues = "queue") +@Component +@Slf4j +public class Consumer { + + //redisService + @Autowired + private RedisService redisService; + + @Autowired + private RestTemplate restTemplate; + + @Autowired + private GatewayLoadService gatewayLoadService; + + @Autowired + private GatewayNodeCache gatewayNodeCache; + + @Autowired + private GatewayNodeSetVinCache gatewayNodeSetVinCache; + + @Autowired + private AliYunEcsService aliYunEcsService; + + @RabbitHandler + public void Handler(GatewayNodeInfo gatewayNodeInfo, Channel channel, Message message) throws IOException { + long deliveryTag = message.getMessageProperties().getDeliveryTag(); + + String messageId = message.getMessageProperties().getMessageId(); + log.info("消息ID:[{}]", messageId); + log.info("标签:[{}]", deliveryTag); + + //检查消息是否首次处理 + if (!redisService.hasKey("idempotent:" + messageId)) { + //首次处理,存储传递标签 + redisService.setCacheObject("idempotent:" + messageId, deliveryTag + ""); + } + + //1 添加成功新数据 0已有重复值,不允许再添加 + Long add = redisService.redisTemplate.opsForSet().add("set" + messageId, "set" + messageId); + + try { + //消费的业务逻辑 + //检查是否重复消费 并在redis中做标记 + if (add == 1) { + log.info("确认消费"); + + Set set = redisService.getCacheSet("gateway:node:cars" + gatewayNodeInfo.getPublicAddress()); + + ArrayList allVin = new ArrayList<>(set); + + String url = "http://127.0.0.1:81/vehicle/instance/client/inits"; + + Result result = restTemplate.postForObject(url, allVin, Result.class); + + Integer i = (Integer) result.getData(); + + if (i > 0) { + log.info("重新上线成功"); + gatewayNodeCache.remove(gatewayNodeInfo.getPublicAddress()); + } + + //释放ECS实例 + aliYunEcsService.releaseInstances(gatewayNodeInfo.getNodeId()); + + redisService.setCacheObject("idempotent:" + messageId, messageId); + + channel.basicAck(deliveryTag, false); + } else { + //重复消费,拒绝处理 + log.info("重复消费"); + channel.basicReject(deliveryTag, false); + } + } catch (Exception e) { + //异常处理逻辑,根据重试次数决定消息处理方式 + String s = redisService.getCacheObject("idempotent:" + messageId); + + long l = Long.parseLong(s); + + if (deliveryTag == (l + 2)) { + //重试三次后,任然失败,退回消息 + log.info("已重试三次,无法消费"); + channel.basicNack(deliveryTag,false,false); + } + throw new RuntimeException(e.getMessage()); + } + } +} + diff --git a/src/main/java/com/loadcenter/service/impl/GatewayLoadServiceImpl.java b/src/main/java/com/loadcenter/service/impl/GatewayLoadServiceImpl.java index 0848253..100a210 100644 --- a/src/main/java/com/loadcenter/service/impl/GatewayLoadServiceImpl.java +++ b/src/main/java/com/loadcenter/service/impl/GatewayLoadServiceImpl.java @@ -2,26 +2,30 @@ package com.loadcenter.service.impl; import com.alibaba.fastjson2.JSONArray; import com.alibaba.fastjson2.JSONObject; +import com.loadcenter.common.aliyun.config.RabbitMqConfig; import com.loadcenter.common.aliyun.service.AliYunEcsService; import com.loadcenter.common.domain.InstancesInformation; import com.loadcenter.gateway.cache.*; import com.loadcenter.gateway.model.GatewayNodeInfo; import com.loadcenter.gateway.model.WorkGatewayNode; -import com.loadcenter.gateway.model.WorkGatewayNodeSource; import com.loadcenter.service.GatewayLoadService; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import okhttp3.OkHttpClient; import okhttp3.Request; import okhttp3.Response; +import org.springframework.amqp.rabbit.connection.CorrelationData; +import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.io.IOException; import java.math.BigDecimal; +import java.math.RoundingMode; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.UUID; /** * @ClassName GatewayLoadServiceImpl @@ -55,11 +59,15 @@ public class GatewayLoadServiceImpl implements GatewayLoadService { private final GatewayZSetNodeCache gatewayZSetNodeCache; - private final GatewayNodeScoreCache gatewayNodeScoreCache; - @Autowired private AliYunEcsService aliYunEcsService; + /** + * mq模板 + */ + @Autowired + private RabbitTemplate rabbitTemplate; + /** * 负载节点 * @@ -131,29 +139,31 @@ public class GatewayLoadServiceImpl implements GatewayLoadService { @Override public void refresh() { - Long nodeMaxNum = 80L; - List workGatewayNodes = gatewayNodeScoreCache.get(); - //上线最大数量 - Long vehicleMaxOnlineNum = gatewayNodeScoreCache.getNodeMaxOnlineNum(); - //当前连接数 - Long vehicleNowOnlineNum = gatewayNodeScoreCache.getNodeNowNum(); - //空余连接数 - Long vehicleOnlineNum = vehicleMaxOnlineNum - vehicleNowOnlineNum; - List loadNodeList = new ArrayList<>(); - List workGatewayNodeSources = workGatewayNodes.stream() - .map(workGatewayNode -> - WorkGatewayNodeSource.builder() - .nodeId(workGatewayNode.getNodeId()) - .weight(Integer.parseInt(String.valueOf(vehicleOnlineNum / (nodeMaxNum - workGatewayNode.getWeight())))) - .build()) - .toList(); - // 计算节点列表中所有节点的权重之和 - long count = workGatewayNodeSources.stream().mapToInt(WorkGatewayNodeSource::getWeight).sum(); + List nodeIdList = gatewayZSetNodeCache.get(); +// //上线最大数量 +// Long vehicleMaxOnlineNum = gatewayNodeScoreCache.getNodeMaxOnlineNum(); +// //当前连接数 +// Long vehicleNowOnlineNum = gatewayNodeScoreCache.getNodeNowNum(); +// //空余连接数 +// Long vehicleOnlineNum = vehicleMaxOnlineNum - vehicleNowOnlineNum; +// +// +// List workGatewayNodeSources = nodeIdList.stream() +// .map(workGatewayNode -> +// WorkGatewayNodeSource.builder() +// .nodeId(workGatewayNode.getNodeId()) +// .weight(Integer.parseInt(String.valueOf(vehicleOnlineNum / (vehicleMaxOnlineNum - workGatewayNode.getWeight())))) +// .build()) +// .toList(); +// // 计算节点列表中所有节点的权重之和 +// long count = workGatewayNodeSources.stream().mapToInt(WorkGatewayNodeSource::getWeight).sum(); // 如果权重之和小于100,则对节点列表按照权重进行降序排序,并将剩余权重平均分配给前几个节点,直到权重之和等于100 + + long count =nodeIdList.stream().mapToInt(WorkGatewayNode::getWeight).sum(); if (count < 100) { // 对节点按权重降序排序 - List list = workGatewayNodeSources.stream() + List list = nodeIdList.stream() .sorted((o1, o2) -> o2.getWeight() - o1.getWeight()) .toList(); // 打印排序后的节点列表 @@ -162,24 +172,24 @@ public class GatewayLoadServiceImpl implements GatewayLoadService { // 将剩余权重分配给最低权重的节点,直到总和达到100 int countWeight = 0; for (long i = count; i < 100; i++) { - WorkGatewayNodeSource workGatewayNodeSource = list.get(countWeight++ % list.size()); - workGatewayNodeSource.setWeight(workGatewayNodeSource.getWeight() + 1); + WorkGatewayNode workGatewayNode = list.get(countWeight++ % list.size()); + workGatewayNode.setWeight(workGatewayNode.getWeight() + 1); } } // 当所有节点权重为0时,跳出循环 whFor: while (true) { // 遍历节点列表,将权重大于0的节点ID添加到loadNodeList中,并将节点权重减1 - for (WorkGatewayNodeSource workGatewayNodeSource : workGatewayNodeSources) { - int weight = workGatewayNodeSource.getWeight(); + for (WorkGatewayNode workGatewayNode : nodeIdList) { + int weight = workGatewayNode.getWeight(); if (weight > 0) { loadNodeList.add( - workGatewayNodeSource.getNodeId() + workGatewayNode.getNodeId() ); - workGatewayNodeSource.setWeight(weight - 1); + workGatewayNode.setWeight(weight - 1); } } - int sum = workGatewayNodeSources.stream().mapToInt(WorkGatewayNodeSource::getWeight).sum(); + int sum = nodeIdList.stream().mapToInt(WorkGatewayNode::getWeight).sum(); // 如果权重之和小于等于0,跳出循环 if (sum <= 0) { break whFor; @@ -196,11 +206,11 @@ public class GatewayLoadServiceImpl implements GatewayLoadService { @Override public void dynamicEcs() throws Exception { //上线最大数量 - Long vehicleMaxOnlineNum = gatewayNodeScoreCache.getNodeMaxOnlineNum(); + Long vehicleMaxOnlineNum = gatewayZSetNodeCache.getNodeMaxOnlineNum(); //当前连接数 - Long vehicleOnlineNowNum = gatewayNodeScoreCache.getNodeNowNum(); + Long vehicleOnlineNowNum = gatewayZSetNodeCache.getNodeNowNum(); - BigDecimal loadRate = new BigDecimal(vehicleMaxOnlineNum).divide(new BigDecimal(vehicleOnlineNowNum), 0, BigDecimal.ROUND_HALF_UP); + BigDecimal loadRate = new BigDecimal(vehicleOnlineNowNum).divide(new BigDecimal(vehicleMaxOnlineNum), 2, RoundingMode.HALF_UP).multiply(BigDecimal.valueOf(100)); log.info("负载率:[{}]", loadRate); ArrayList nodeList = gatewayZSetNodeCache.get(); @@ -225,9 +235,11 @@ public class GatewayLoadServiceImpl implements GatewayLoadService { gatewayNodeCache.put(gatewayNodeInfo); gatewayZSetNodeCache.put(instancesInformation.getPublicIpAddress(), 0); + + this.refresh(); log.info("实例id和公网ip存入redis"); - } else if (loadRate.longValue() < 20L) { + } else if (loadRate.longValue() < 15L) { if (nodeList.size() > 2) { log.info("负载率:[{}]", loadRate); @@ -253,7 +265,13 @@ public class GatewayLoadServiceImpl implements GatewayLoadService { GatewayNodeInfo gatewayNodeInfo = gatewayNodeCache.get(minConnectionNode.getNodeId()); log.info("删除节点:[{}]",gatewayNodeInfo.getNodeId()); + rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE,RabbitMqConfig.KEY,gatewayNodeInfo,message -> { + message.getMessageProperties().setMessageId(UUID.randomUUID().toString()); + return message; + },new CorrelationData(UUID.randomUUID().toString())); + }else{ + log.info("负载过低,节点数小于2,不进行缩容"); } } } diff --git a/src/main/java/com/loadcenter/service/impl/GatewayVehicleServiceImpl.java b/src/main/java/com/loadcenter/service/impl/GatewayVehicleServiceImpl.java index d930af7..037b0fe 100644 --- a/src/main/java/com/loadcenter/service/impl/GatewayVehicleServiceImpl.java +++ b/src/main/java/com/loadcenter/service/impl/GatewayVehicleServiceImpl.java @@ -3,6 +3,8 @@ package com.loadcenter.service.impl; import com.loadcenter.common.domain.resp.Result; import com.loadcenter.gateway.cache.GatewayNodeSetVinCache; import com.loadcenter.gateway.cache.GatewayVehicleNode; +import com.loadcenter.gateway.cache.GatewayZSetNodeCache; +import com.loadcenter.service.GatewayLoadService; import com.loadcenter.service.GatewayVehicleService; import lombok.AllArgsConstructor; import lombok.extern.log4j.Log4j2; @@ -20,8 +22,10 @@ public class GatewayVehicleServiceImpl implements GatewayVehicleService { private final GatewayVehicleNode gatewayVehicleNode; - @Autowired - private RestTemplate restTemplate; + private final GatewayLoadService gatewayLoadService; + + private final GatewayZSetNodeCache gatewayZSetNodeCache; + /** * 车辆上线 @@ -30,14 +34,15 @@ public class GatewayVehicleServiceImpl implements GatewayVehicleService { */ @Override public Result onLine(String vin) { - String URL ="http://127.0.0.1:9010/gateway/load/node"; + String ip = gatewayLoadService.loadNode(); - Result data = restTemplate.getForObject(URL, Result.class); + gatewayVehicleNode.put(vin,ip); - gatewayVehicleNode.put(vin,data.getData().toString()); - gatewayNodeSetVinCache.put(data.getData().toString(),vin); + gatewayNodeSetVinCache.put(ip,vin); - return Result.success(data.getData().toString()); + gatewayZSetNodeCache.increment(ip,1.0); + + return Result.success(ip); } /** @@ -47,7 +52,9 @@ public class GatewayVehicleServiceImpl implements GatewayVehicleService { @Override public void downLine(String vin) { String nodeId = gatewayVehicleNode.get(vin); + gatewayNodeSetVinCache.remove(nodeId,vin); + gatewayVehicleNode.remove(vin); } } diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 8d71811..ce0cc55 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -6,6 +6,18 @@ spring: port: 6379 password: jc@123 + rabbitmq: + username: guest + password: guest + virtualHost: / + port: 5672 + host: 111.229.33.194 + listener: + simple: + prefetch: 1 # 每次只能获取一条,处理完成才能获取下一条 + publisher-confirm-type: correlated #确认消息已发送到交换机(Exchange) + publisher-returns: true #确认消息已发送到队列(Queue) + config: aliyun: access-key-id: LTAI5tK42WuqUEhmHLitVHt1