4.21
master
JangCan 2024-04-21 16:49:35 +08:00
parent 8c5049762a
commit c080668c5e
17 changed files with 485 additions and 197 deletions

View File

@ -31,6 +31,12 @@
</dependencyManagement>
<dependencies>
<!--MQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- SpringBoot Web-->
<dependency>
<groupId>org.springframework.boot</groupId>
@ -102,6 +108,7 @@
<version>0.0.3</version>
</dependency>
</dependencies>
<build>
<plugins>

View File

@ -0,0 +1,86 @@
package com.loadcenter.common.aliyun.config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
@Configuration
public class RabbitMqConfig implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback {
public static final Logger logger= LoggerFactory.getLogger(RabbitMqConfig.class);
private RabbitTemplate rabbitTemplate;
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
public static final String QUEUE="queue";
public static final String EXCHANGE="exchange";
public static final String KEY="Key";
@Primary
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
this.rabbitTemplate=rabbitTemplate;
rabbitTemplate.setMessageConverter(messageConverter());
rabbitTemplate();
return rabbitTemplate;
}
public void rabbitTemplate(){
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnsCallback(this);
}
@Bean
public Queue queue(){
return new Queue(QUEUE,true);
}
@Bean
public DirectExchange directExchange(){
return new DirectExchange(EXCHANGE);
}
@Bean
public Binding binding(){
return BindingBuilder.bind(queue()).to(directExchange()).with(KEY);
}
@Override
public void confirm(CorrelationData correlationData, boolean ack, String s) {
if (ack){
logger.info("{}消息到达交换机",correlationData.getId());
}else{
logger.error("{}消息丢失",correlationData.getId());
}
}
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
logger.error("{}消息未到达队列",returnedMessage.getMessage().getMessageProperties().getMessageId());
}
}

View File

@ -0,0 +1,18 @@
package com.loadcenter.common.aliyun.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.client.RestTemplate;
/**
* @ClassName RestClientConfig
* @Description RestClientConfig
* @Author Can.J
* @Date 2024/4/21 16:18
*/
@Configuration
public class RestClientConfig {
@Bean
public RestTemplate restTemplate(){
return new RestTemplate();
}
}

View File

@ -0,0 +1,30 @@
package com.loadcenter.common.domain;
import lombok.Data;
/**
* CPU使
*/
@Data
public class CPUInfo {
/**
* CPU
*/
private long cpuNum;
/**
* 使
*/
private String cSys;
/**
*
*/
private String idle;
/**
* I/O
*/
private String iowait;
/**
* 使
*/
private String user;
}

View File

@ -0,0 +1,28 @@
package com.loadcenter.common.domain;
import lombok.Data; /**
*
*/
@Data
public class FlowInfo {
/**
*
*/
private String lastReadThroughput;
/**
*
*/
private String lastWriteThroughput;
/**
*
*/
private String readBytesHistory;
/**
*
*/
private String realWriteBytes;
/**
*
*/
private String writeBytesHistory;
}

View File

@ -14,17 +14,6 @@ import lombok.NoArgsConstructor;
@NoArgsConstructor
@Data
public class InstancesInformation {
/*
* log.info("查询第{" + count + "}个实例的ID:" + item.getInstanceId());
log.info("名称:" + item.getInstanceName());
log.info("状态:" + item.getStatus());
log.info("公网IP:" + UserUtil.removeBrackets(item.getPublicIpAddress().getIpAddress().toString()));
log.info("私网IP:" + UserUtil.removeBrackets(item.getVpcAttributes().getPrivateIpAddress().ipAddress.toString()));
log.info("创建时间:" + item.getCreationTime());
log.info("到期时间:" + item.getExpiredTime());
log.info("是否可以回收:" + (item.getRecyclable() ? "是" : "否") + "\n\n");
* */
/*
* ID
* */

View File

@ -0,0 +1,58 @@
package com.loadcenter.common.domain;
import lombok.Data;
/**
* JVM使
*/
@Data
public class JVMInfo {
/**
* ()
*/
private String fileDescriptors;
/**
*
*/
private String heapCommit;
/**
*
*/
private String heapInit;
/**
*
*/
private String heapMax;
/**
* 使
*/
private String heapUsed;
/**
* JAVA
*/
private String jdkHome;
/**
* JDK
*/
private String jdkVersion;
/**
*
*/
private String noHeapCommit;
/**
*
*/
private String noHeapInit;
/**
*
*/
private String noHeapMax;
/**
* 使
*/
private String noHeapUsed;
/**
* 线
*/
private long threadCount;
}

View File

@ -187,9 +187,9 @@ public class RedisService {
return setOperation;
}
public <T> BoundZSetOperations<String ,T> setCacheZSet(final String key,final T setValue,double score){
public <T> BoundZSetOperations<String, T> setCacheZSet(final String key, final T setValue, double score) {
BoundZSetOperations boundZSetOperations = redisTemplate.boundZSetOps(key);
boundZSetOperations.add(setValue,score);
boundZSetOperations.add(setValue, score);
return boundZSetOperations;
}
@ -218,6 +218,10 @@ public class RedisService {
return memberScores;
}
public <T> Double incrementScore(final String key, final T value, final Double score) {
return redisTemplate.opsForZSet().incrementScore(key, value, score);
}
/**
* Set
*
@ -231,8 +235,8 @@ public class RedisService {
setOperations.remove(setValue);
}
public <T> void deleteCacheSet(String key,T setValue){
BoundSetOperations<String,T> setOperation = redisTemplate.boundSetOps(key);
public <T> void deleteCacheSet(String key, T setValue) {
BoundSetOperations<String, T> setOperation = redisTemplate.boundSetOps(key);
setOperation.remove(setValue);
}
@ -248,12 +252,13 @@ public class RedisService {
/**
* zset
* zset
*
* @param key
* @param value
*/
public void deleteCacheZset(final String key ,final String value){
redisTemplate.opsForZSet().remove(key,value);
public void deleteCacheZset(final String key, final String value) {
redisTemplate.opsForZSet().remove(key, value);
}
/**

View File

@ -1,50 +0,0 @@
package com.loadcenter.gateway.cache;
import com.loadcenter.gateway.cache.abs.GatewayCacheAbs;
import com.loadcenter.gateway.model.WorkGatewayNode;
import org.springframework.data.redis.core.ZSetOperations;
import org.springframework.stereotype.Service;
import java.util.List;
import java.util.Set;
/**
* @ClassName GatewayNodeScoreCache
* @Description
* @Author Can.J
* @Date 2024/4/19 14:47
*/
@Service
public class GatewayNodeScoreCache extends GatewayCacheAbs<String> {
private final static String gatewayNodeScoreCacheKey = "score";
@Override
public String getPre() {
return "gateway:node:";
}
public List<WorkGatewayNode> get() {
Set<ZSetOperations.TypedTuple<String>> range = redisService.redisTemplate.opsForZSet().rangeWithScores(encode(gatewayNodeScoreCacheKey), 0, -1);
return range.stream()
.map(zSet -> WorkGatewayNode.builder()
.nodeId(zSet.getValue()).source(zSet.getScore()).build())
.toList();
}
public Long getNodeNowNum() {
List<WorkGatewayNode> workGatewayNodes = get();
Long vehicleNowOnlineNum = Long.valueOf(String.valueOf(workGatewayNodes.stream().mapToDouble(WorkGatewayNode::getSource).sum()));
return vehicleNowOnlineNum;
}
/**
* 线
* @return
*/
public Long getNodeMaxOnlineNum() {
List<WorkGatewayNode> workGatewayNodes = get();
return workGatewayNodes.size() * 80L;
}
}

View File

@ -5,18 +5,19 @@ import com.loadcenter.gateway.model.WorkGatewayNode;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
* @ClassName GatewayZSetNodeCache
* @Description
* @Author Can.J
* @Date 2024/4/19 9:00
* @ClassName GatewayZSetNodeCache
* @Description
* @Author Can.J
* @Date 2024/4/19 9:00
*/
@Component
public class GatewayZSetNodeCache extends GatewayCacheAbs<String> {
private final static String gatewayZSetCount ="count";
private final static String gatewayZSetCount = "count";
@Override
public String getPre() {
@ -24,44 +25,75 @@ public class GatewayZSetNodeCache extends GatewayCacheAbs<String> {
}
/**
* zset
* zset
*
* @return
*/
public Map<Object,Double> getzSet(){
public Map<Object, Double> getzSet() {
return redisService.getCacheZSetScore(encode(gatewayZSetCount));
}
/**
* zset
* zset
*
* @return
*/
public ArrayList<WorkGatewayNode> get(){
ArrayList<WorkGatewayNode> nodeList = new ArrayList<>();
public ArrayList<WorkGatewayNode> get() {
ArrayList<WorkGatewayNode> nodeIdList = new ArrayList<>();
Map<Object, Double> map = redisService.getCacheZSetScore(encode(gatewayZSetCount));
for (Map.Entry<Object, Double> entry : map.entrySet()) {
WorkGatewayNode workGatewayNode = new WorkGatewayNode();
workGatewayNode.setNodeId(entry.getKey().toString());
workGatewayNode.setSource(entry.getValue());
nodeList.add(workGatewayNode);
workGatewayNode.setWeight(entry.getValue().intValue());
nodeIdList.add(workGatewayNode);
}
return nodeList;
return nodeIdList;
}
/**
*
*/
public Long getNodeNowNum() {
List<WorkGatewayNode> workGatewayNodes = get();
Long vehicleNowOnlineNum = Long.valueOf(String.valueOf(workGatewayNodes.stream().mapToDouble(WorkGatewayNode::getWeight).sum()));
return vehicleNowOnlineNum;
}
/**
* 线
* 线
* @return
*/
public Long getNodeMaxOnlineNum() {
List<WorkGatewayNode> nodeIdList = get();
return nodeIdList.size() * 80L;
}
/**
* 线
*
* @param nodeId
* @param onlineVehicle
*/
public void put(String nodeId,Integer onlineVehicle){
redisService.setCacheZSet(encode(gatewayZSetCount),nodeId,onlineVehicle);
public void put(String nodeId, Integer onlineVehicle) {
redisService.setCacheZSet(encode(gatewayZSetCount), nodeId, onlineVehicle);
}
/**
* zset
* zset
*
* @param nodeId
*/
public void remove(String nodeId){
redisService.deleteCacheZset(encode(gatewayZSetCount),nodeId);
public void remove(String nodeId) {
redisService.deleteCacheZset(encode(gatewayZSetCount), nodeId);
}
/**
*
* @param ip
* @param score
*/
public void increment(String ip, Double score) {
redisService.incrementScore(encode(gatewayZSetCount), ip, score);
}
}

View File

@ -1,47 +0,0 @@
package com.loadcenter.gateway.cache;
import com.loadcenter.gateway.cache.abs.GatewayCacheAbs;
import org.springframework.stereotype.Component;
/**
* @ClassName VehicleOnlineCache
* @Description
* @Author Can.J
* @Date 2024/4/18 22:02
*/
@Component
public class VehicleOnlineCache extends GatewayCacheAbs<String> {
private final static String vehicleOnlineKey ="vehicle_online";
@Override
public String getPre() {
return "vehicle:";
}
/**
* 线
* @param vin VIN
* @param nodeId id
*/
public void recordVehicleOnline(String vin,String nodeId){
redisService.setCacheObject(encode(vehicleOnlineKey+vin),nodeId);
}
/**
* 线id
* @param vin VIN
* @return id
*/
public String getOnlineGatewayNode(String vin){
return redisService.getCacheObject(encode(vehicleOnlineKey+vin));
}
/**
* 线
* @param vin VIN
*/
public void removeVehicleOnline(String vin){
redisService.deleteObject(encode(vehicleOnlineKey +vin));
}
}

View File

@ -16,7 +16,7 @@ public class WorkGatewayNode {
private String nodeId;
/**
*
*
*/
private int weight;

View File

@ -1,22 +0,0 @@
package com.loadcenter.gateway.model;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class WorkGatewayNodeSource {
/**
* id
*/
private String nodeId;
/**
*
*/
private int weight;
}

View File

@ -0,0 +1,117 @@
package com.loadcenter.rabbitMq;
import com.loadcenter.common.aliyun.service.AliYunEcsService;
import com.loadcenter.common.domain.resp.Result;
import com.loadcenter.common.redis.service.RedisService;
import com.loadcenter.gateway.cache.GatewayNodeCache;
import com.loadcenter.gateway.cache.GatewayNodeSetVinCache;
import com.loadcenter.gateway.model.GatewayNodeInfo;
import com.loadcenter.service.GatewayLoadService;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.web.client.RestTemplate;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Set;
/**
* @ClassName Consumer
* @Description
* @Author Can.J
* @Date 2024/4/21 16:14
*/
@RabbitListener(queues = "queue")
@Component
@Slf4j
public class Consumer {
//redisService
@Autowired
private RedisService redisService;
@Autowired
private RestTemplate restTemplate;
@Autowired
private GatewayLoadService gatewayLoadService;
@Autowired
private GatewayNodeCache gatewayNodeCache;
@Autowired
private GatewayNodeSetVinCache gatewayNodeSetVinCache;
@Autowired
private AliYunEcsService aliYunEcsService;
@RabbitHandler
public void Handler(GatewayNodeInfo gatewayNodeInfo, Channel channel, Message message) throws IOException {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
String messageId = message.getMessageProperties().getMessageId();
log.info("消息ID:[{}]", messageId);
log.info("标签:[{}]", deliveryTag);
//检查消息是否首次处理
if (!redisService.hasKey("idempotent:" + messageId)) {
//首次处理,存储传递标签
redisService.setCacheObject("idempotent:" + messageId, deliveryTag + "");
}
//1 添加成功新数据 0已有重复值不允许再添加
Long add = redisService.redisTemplate.opsForSet().add("set" + messageId, "set" + messageId);
try {
//消费的业务逻辑
//检查是否重复消费 并在redis中做标记
if (add == 1) {
log.info("确认消费");
Set<String> set = redisService.getCacheSet("gateway:node:cars" + gatewayNodeInfo.getPublicAddress());
ArrayList<String> allVin = new ArrayList<>(set);
String url = "http://127.0.0.1:81/vehicle/instance/client/inits";
Result result = restTemplate.postForObject(url, allVin, Result.class);
Integer i = (Integer) result.getData();
if (i > 0) {
log.info("重新上线成功");
gatewayNodeCache.remove(gatewayNodeInfo.getPublicAddress());
}
//释放ECS实例
aliYunEcsService.releaseInstances(gatewayNodeInfo.getNodeId());
redisService.setCacheObject("idempotent:" + messageId, messageId);
channel.basicAck(deliveryTag, false);
} else {
//重复消费,拒绝处理
log.info("重复消费");
channel.basicReject(deliveryTag, false);
}
} catch (Exception e) {
//异常处理逻辑,根据重试次数决定消息处理方式
String s = redisService.getCacheObject("idempotent:" + messageId);
long l = Long.parseLong(s);
if (deliveryTag == (l + 2)) {
//重试三次后,任然失败,退回消息
log.info("已重试三次,无法消费");
channel.basicNack(deliveryTag,false,false);
}
throw new RuntimeException(e.getMessage());
}
}
}

View File

@ -2,26 +2,30 @@ package com.loadcenter.service.impl;
import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import com.loadcenter.common.aliyun.config.RabbitMqConfig;
import com.loadcenter.common.aliyun.service.AliYunEcsService;
import com.loadcenter.common.domain.InstancesInformation;
import com.loadcenter.gateway.cache.*;
import com.loadcenter.gateway.model.GatewayNodeInfo;
import com.loadcenter.gateway.model.WorkGatewayNode;
import com.loadcenter.gateway.model.WorkGatewayNodeSource;
import com.loadcenter.service.GatewayLoadService;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
/**
* @ClassName GatewayLoadServiceImpl
@ -55,11 +59,15 @@ public class GatewayLoadServiceImpl implements GatewayLoadService {
private final GatewayZSetNodeCache gatewayZSetNodeCache;
private final GatewayNodeScoreCache gatewayNodeScoreCache;
@Autowired
private AliYunEcsService aliYunEcsService;
/**
* mq
*/
@Autowired
private RabbitTemplate rabbitTemplate;
/**
*
*
@ -131,29 +139,31 @@ public class GatewayLoadServiceImpl implements GatewayLoadService {
@Override
public void refresh() {
Long nodeMaxNum = 80L;
List<WorkGatewayNode> workGatewayNodes = gatewayNodeScoreCache.get();
//上线最大数量
Long vehicleMaxOnlineNum = gatewayNodeScoreCache.getNodeMaxOnlineNum();
//当前连接数
Long vehicleNowOnlineNum = gatewayNodeScoreCache.getNodeNowNum();
//空余连接数
Long vehicleOnlineNum = vehicleMaxOnlineNum - vehicleNowOnlineNum;
List<String> loadNodeList = new ArrayList<>();
List<WorkGatewayNodeSource> workGatewayNodeSources = workGatewayNodes.stream()
.map(workGatewayNode ->
WorkGatewayNodeSource.builder()
.nodeId(workGatewayNode.getNodeId())
.weight(Integer.parseInt(String.valueOf(vehicleOnlineNum / (nodeMaxNum - workGatewayNode.getWeight()))))
.build())
.toList();
// 计算节点列表中所有节点的权重之和
long count = workGatewayNodeSources.stream().mapToInt(WorkGatewayNodeSource::getWeight).sum();
List<WorkGatewayNode> nodeIdList = gatewayZSetNodeCache.get();
// //上线最大数量
// Long vehicleMaxOnlineNum = gatewayNodeScoreCache.getNodeMaxOnlineNum();
// //当前连接数
// Long vehicleNowOnlineNum = gatewayNodeScoreCache.getNodeNowNum();
// //空余连接数
// Long vehicleOnlineNum = vehicleMaxOnlineNum - vehicleNowOnlineNum;
//
//
// List<WorkGatewayNodeSource> workGatewayNodeSources = nodeIdList.stream()
// .map(workGatewayNode ->
// WorkGatewayNodeSource.builder()
// .nodeId(workGatewayNode.getNodeId())
// .weight(Integer.parseInt(String.valueOf(vehicleOnlineNum / (vehicleMaxOnlineNum - workGatewayNode.getWeight()))))
// .build())
// .toList();
// // 计算节点列表中所有节点的权重之和
// long count = workGatewayNodeSources.stream().mapToInt(WorkGatewayNodeSource::getWeight).sum();
// 如果权重之和小于100则对节点列表按照权重进行降序排序并将剩余权重平均分配给前几个节点直到权重之和等于100
long count =nodeIdList.stream().mapToInt(WorkGatewayNode::getWeight).sum();
if (count < 100) {
// 对节点按权重降序排序
List<WorkGatewayNodeSource> list = workGatewayNodeSources.stream()
List<WorkGatewayNode> list = nodeIdList.stream()
.sorted((o1, o2) -> o2.getWeight() - o1.getWeight())
.toList();
// 打印排序后的节点列表
@ -162,24 +172,24 @@ public class GatewayLoadServiceImpl implements GatewayLoadService {
// 将剩余权重分配给最低权重的节点直到总和达到100
int countWeight = 0;
for (long i = count; i < 100; i++) {
WorkGatewayNodeSource workGatewayNodeSource = list.get(countWeight++ % list.size());
workGatewayNodeSource.setWeight(workGatewayNodeSource.getWeight() + 1);
WorkGatewayNode workGatewayNode = list.get(countWeight++ % list.size());
workGatewayNode.setWeight(workGatewayNode.getWeight() + 1);
}
}
// 当所有节点权重为0时跳出循环
whFor:
while (true) {
// 遍历节点列表将权重大于0的节点ID添加到loadNodeList中并将节点权重减1
for (WorkGatewayNodeSource workGatewayNodeSource : workGatewayNodeSources) {
int weight = workGatewayNodeSource.getWeight();
for (WorkGatewayNode workGatewayNode : nodeIdList) {
int weight = workGatewayNode.getWeight();
if (weight > 0) {
loadNodeList.add(
workGatewayNodeSource.getNodeId()
workGatewayNode.getNodeId()
);
workGatewayNodeSource.setWeight(weight - 1);
workGatewayNode.setWeight(weight - 1);
}
}
int sum = workGatewayNodeSources.stream().mapToInt(WorkGatewayNodeSource::getWeight).sum();
int sum = nodeIdList.stream().mapToInt(WorkGatewayNode::getWeight).sum();
// 如果权重之和小于等于0跳出循环
if (sum <= 0) {
break whFor;
@ -196,11 +206,11 @@ public class GatewayLoadServiceImpl implements GatewayLoadService {
@Override
public void dynamicEcs() throws Exception {
//上线最大数量
Long vehicleMaxOnlineNum = gatewayNodeScoreCache.getNodeMaxOnlineNum();
Long vehicleMaxOnlineNum = gatewayZSetNodeCache.getNodeMaxOnlineNum();
//当前连接数
Long vehicleOnlineNowNum = gatewayNodeScoreCache.getNodeNowNum();
Long vehicleOnlineNowNum = gatewayZSetNodeCache.getNodeNowNum();
BigDecimal loadRate = new BigDecimal(vehicleMaxOnlineNum).divide(new BigDecimal(vehicleOnlineNowNum), 0, BigDecimal.ROUND_HALF_UP);
BigDecimal loadRate = new BigDecimal(vehicleOnlineNowNum).divide(new BigDecimal(vehicleMaxOnlineNum), 2, RoundingMode.HALF_UP).multiply(BigDecimal.valueOf(100));
log.info("负载率:[{}]", loadRate);
ArrayList<WorkGatewayNode> nodeList = gatewayZSetNodeCache.get();
@ -225,9 +235,11 @@ public class GatewayLoadServiceImpl implements GatewayLoadService {
gatewayNodeCache.put(gatewayNodeInfo);
gatewayZSetNodeCache.put(instancesInformation.getPublicIpAddress(), 0);
this.refresh();
log.info("实例id和公网ip存入redis");
} else if (loadRate.longValue() < 20L) {
} else if (loadRate.longValue() < 15L) {
if (nodeList.size() > 2) {
log.info("负载率:[{}]", loadRate);
@ -253,7 +265,13 @@ public class GatewayLoadServiceImpl implements GatewayLoadService {
GatewayNodeInfo gatewayNodeInfo = gatewayNodeCache.get(minConnectionNode.getNodeId());
log.info("删除节点:[{}]",gatewayNodeInfo.getNodeId());
rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE,RabbitMqConfig.KEY,gatewayNodeInfo,message -> {
message.getMessageProperties().setMessageId(UUID.randomUUID().toString());
return message;
},new CorrelationData(UUID.randomUUID().toString()));
}else{
log.info("负载过低,节点数小于2,不进行缩容");
}
}
}

View File

@ -3,6 +3,8 @@ package com.loadcenter.service.impl;
import com.loadcenter.common.domain.resp.Result;
import com.loadcenter.gateway.cache.GatewayNodeSetVinCache;
import com.loadcenter.gateway.cache.GatewayVehicleNode;
import com.loadcenter.gateway.cache.GatewayZSetNodeCache;
import com.loadcenter.service.GatewayLoadService;
import com.loadcenter.service.GatewayVehicleService;
import lombok.AllArgsConstructor;
import lombok.extern.log4j.Log4j2;
@ -20,8 +22,10 @@ public class GatewayVehicleServiceImpl implements GatewayVehicleService {
private final GatewayVehicleNode gatewayVehicleNode;
@Autowired
private RestTemplate restTemplate;
private final GatewayLoadService gatewayLoadService;
private final GatewayZSetNodeCache gatewayZSetNodeCache;
/**
* 线
@ -30,14 +34,15 @@ public class GatewayVehicleServiceImpl implements GatewayVehicleService {
*/
@Override
public Result<String> onLine(String vin) {
String URL ="http://127.0.0.1:9010/gateway/load/node";
String ip = gatewayLoadService.loadNode();
Result data = restTemplate.getForObject(URL, Result.class);
gatewayVehicleNode.put(vin,ip);
gatewayVehicleNode.put(vin,data.getData().toString());
gatewayNodeSetVinCache.put(data.getData().toString(),vin);
gatewayNodeSetVinCache.put(ip,vin);
return Result.success(data.getData().toString());
gatewayZSetNodeCache.increment(ip,1.0);
return Result.success(ip);
}
/**
@ -47,7 +52,9 @@ public class GatewayVehicleServiceImpl implements GatewayVehicleService {
@Override
public void downLine(String vin) {
String nodeId = gatewayVehicleNode.get(vin);
gatewayNodeSetVinCache.remove(nodeId,vin);
gatewayVehicleNode.remove(vin);
}
}

View File

@ -6,6 +6,18 @@ spring:
port: 6379
password: jc@123
rabbitmq:
username: guest
password: guest
virtualHost: /
port: 5672
host: 111.229.33.194
listener:
simple:
prefetch: 1 # 每次只能获取一条,处理完成才能获取下一条
publisher-confirm-type: correlated #确认消息已发送到交换机(Exchange)
publisher-returns: true #确认消息已发送到队列(Queue)
config:
aliyun:
access-key-id: LTAI5tK42WuqUEhmHLitVHt1