master
黄大举 2024-04-20 10:36:53 +08:00
parent 6776e50f8a
commit 85c99c55bd
7 changed files with 65 additions and 22 deletions

View File

@ -48,7 +48,7 @@ public class GatewayNodeCache extends GatewayCacheAbs<String> {
*/ */
public void remove(String nodeId){ public void remove(String nodeId){
redisService.deleteObject(encode(nodeId)); redisService.deleteObject(encode(gatewayNodeCache)+nodeId);
} }

View File

@ -91,6 +91,10 @@ public class GatewayZSetNodeCache extends GatewayCacheAbs<String> {
} }
public Map<Object, Double> get2(String ip) {
return redisService.getCacheZSetScore(encode(gatewayZSetCount) + ip);
}
} }

View File

@ -47,10 +47,6 @@ public class GatewayController {
gatewayLoadService.dynamic(); gatewayLoadService.dynamic();
} }
@Scheduled(cron = "0/30 * * * * ?")
public void refreshLoad() throws Exception {
gatewayLoadService.refreshLoad();
}

View File

@ -7,6 +7,7 @@ import com.muyu.center.common.gateway.cache.GatewayNodeSetVinCache;
import com.muyu.center.common.gateway.cache.GatewayVehicleNode; import com.muyu.center.common.gateway.cache.GatewayVehicleNode;
import com.muyu.center.common.gateway.model.GatewayNodeInfo; import com.muyu.center.common.gateway.model.GatewayNodeInfo;
import com.muyu.center.common.redis.service.RedisService; import com.muyu.center.common.redis.service.RedisService;
import com.muyu.center.service.GatewayLoadService;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Channel;
import lombok.extern.log4j.Log4j2; import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.core.Message; import org.springframework.amqp.core.Message;
@ -17,6 +18,7 @@ import org.springframework.stereotype.Component;
import org.springframework.web.client.RestTemplate; import org.springframework.web.client.RestTemplate;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List;
import java.util.Set; import java.util.Set;
/** /**
@ -37,14 +39,14 @@ public class Consumer {
@Autowired @Autowired
private GatewayNodeSetVinCache gatewayNodeSetVinCache; private GatewayLoadService gatewayLoadService;
@Autowired @Autowired
private GatewayNodeCache gatewayNodeCache; private GatewayNodeCache gatewayNodeCache;
@Autowired @Autowired
private GatewayVehicleNode gatewayVehicleNode; private GatewayNodeSetVinCache gatewayNodeSetVinCache;
@Autowired @Autowired
private AliYunEcsService aliYunEcsService; private AliYunEcsService aliYunEcsService;
@ -105,24 +107,21 @@ public class Consumer {
Integer i = (Integer) result.getData(); Integer i = (Integer) result.getData();
if (i>0){ if (i>0){
log.info("重新上线成功");
for (String string : allVin) { gatewayNodeCache.remove(gatewayNodeInfo.getPublicIdAddress());
gatewayNodeSetVinCache.remove(gatewayNodeInfo.getPublicIdAddress(),string);
gatewayVehicleNode.remove(string);
}
gatewayNodeCache.remove(gatewayNodeInfo.getNodeId());
log.info("删除实例的对象");
} }
aliYunEcsService.releaseECS(gatewayNodeInfo.getNodeId()); // 释放ECS实例 // 释放ECS实例
aliYunEcsService.releaseECS(gatewayNodeInfo.getNodeId());
log.info("释放实例"); log.info("释放实例");
Thread.sleep(5000);
//刷新负载均衡
gatewayLoadService.refreshLoad();
redisService.setCacheObject("repeat:"+messageId,messageId); redisService.setCacheObject("idempotent:"+messageId,messageId);
channel.basicAck(deliveryTag,false); channel.basicAck(deliveryTag,false);
}else{ }else{
@ -146,10 +145,12 @@ public class Consumer {
if (deliveryTag==(old+2)){ if (deliveryTag==(old+2)){
// 重试三次后,仍然失败,退回消息 // 重试三次后,仍然失败,退回消息
log.info("已重试三次,实在消费不了"); log.info("已重试三次,实在消费不了");
channel.basicNack(deliveryTag,false,false); channel.basicNack(deliveryTag,false,false);
}else { }else {
// 尝试再次重试 // 尝试再次重试
log.info("继续重试"); log.info("继续重试");
channel.basicNack(deliveryTag,false,true); channel.basicNack(deliveryTag,false,true);
} }
} }

View File

@ -87,7 +87,6 @@ public class GatewayLoadServiceImpl implements GatewayLoadService {
GatewayNodeInfo gatewayNodeInfo = gatewayNodeCache.get(loadNodeId); GatewayNodeInfo gatewayNodeInfo = gatewayNodeCache.get(loadNodeId);
return gatewayNodeInfo.getPublicIdAddress(); return gatewayNodeInfo.getPublicIdAddress();
} }
/** /**
@ -181,6 +180,9 @@ public class GatewayLoadServiceImpl implements GatewayLoadService {
//这里模拟(也可以在别的类里完成) ECS创建成功后服务器发送一条消息服务器正常启动mq可以正常使用存入redis //这里模拟(也可以在别的类里完成) ECS创建成功后服务器发送一条消息服务器正常启动mq可以正常使用存入redis
gatewayZSetNodeCache.put(ecsInstanceInfo.getPublicIpAddress(), 0); gatewayZSetNodeCache.put(ecsInstanceInfo.getPublicIpAddress(), 0);
this.refreshLoad();
log.info("实例id和公网ip存入redis"); log.info("实例id和公网ip存入redis");
} else if (loadRate.longValue()<15L) { } else if (loadRate.longValue()<15L) {
@ -220,11 +222,12 @@ public class GatewayLoadServiceImpl implements GatewayLoadService {
}else { }else {
log.info("负载过低但是节点数小于2不缩容"); log.info("负载过低但是节点数小于2不缩容");
} }
} }
} }
/** /**
* ECS * ECS
*/ */

View File

@ -1,5 +1,6 @@
package com.muyu.center.service.impl; package com.muyu.center.service.impl;
import com.muyu.center.common.domain.WorkGatewayNode;
import com.muyu.center.common.gateway.cache.GatewayVehicleNode; import com.muyu.center.common.gateway.cache.GatewayVehicleNode;
import com.muyu.center.service.GatewayVehicleService; import com.muyu.center.service.GatewayVehicleService;
import com.muyu.center.common.domain.Result; import com.muyu.center.common.domain.Result;
@ -8,9 +9,14 @@ import com.muyu.center.common.gateway.cache.GatewayZSetNodeCache;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.extern.log4j.Log4j2; import lombok.extern.log4j.Log4j2;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.ZSetOperations;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate; import org.springframework.web.client.RestTemplate;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
/** /**
* @ProjectName: load_center * @ProjectName: load_center
* @PackageName: com.muyu.loadCenter.service.impl * @PackageName: com.muyu.loadCenter.service.impl
@ -29,6 +35,9 @@ public class GatewayVehicleServiceImpl implements GatewayVehicleService {
private final GatewayNodeSetVinCache gatewayNodeSetVinCache; private final GatewayNodeSetVinCache gatewayNodeSetVinCache;
private final GatewayZSetNodeCache gatewayZSetNodeCache;
private final GatewayVehicleNode gatewayVehicleNode; private final GatewayVehicleNode gatewayVehicleNode;
@Autowired @Autowired
@ -52,9 +61,39 @@ public class GatewayVehicleServiceImpl implements GatewayVehicleService {
gatewayNodeSetVinCache.put(data.getData().toString(),vin); gatewayNodeSetVinCache.put(data.getData().toString(),vin);
//// Map<Object, Double> map = gatewayZSetNodeCache.get2(data.getData().toString());
// Set<String> zSet = gatewayZSetNodeCache.redisService.getCacheZSet("gateway:zSet:count" + data.getData());
// Map<Object, Double> map = gatewayZSetNodeCache.redisService.getCacheZSetScore("gateway:zSet:count" + data.getData());
// Map<Object, Double> map = new HashMap<>();
// ZSetOperations<String, Object> zSetOperations =gatewayZSetNodeCache.redisService. redisTemplate.opsForZSet();
// // 构建一个 Map 用于存储成员和分数的对应关系
// zSetOperations.rangeWithScores("gateway:zSet:count" + data.getData(), 0, -1).forEach(tuple -> {
// map.put(tuple.getValue(), tuple.getScore());
// });
//
// System.out.println("------------------------------------------");
// Double aDouble = map.get(data.getData());
// System.out.println(map);
// System.out.println(aDouble);
// System.out.println("------------------------------------------");
//
//
// gatewayZSetNodeCache.put(data.getData().toString(),1);
return Result.success(data.getData().toString()); return Result.success(data.getData().toString());
} }
/**
* 线
* @param vin;
*/
@Override @Override
public void loadDownLine(String vin) { public void loadDownLine(String vin) {

View File

@ -42,7 +42,7 @@ public class LoadTest {
// System.out.println("------------=========----------"+set); // System.out.println("------------=========----------"+set);
redisService.setCacheZSet("gateway:zSet:count","39.103.130.89",0); redisService.setCacheZSet("gateway:zSet:count","39.103.130.89",0);
redisService.setCacheZSet("gateway:zSet:count","121.89.221.195",0); redisService.setCacheZSet("gateway:zSet:count","121.89.221.195",0);
redisService.setCacheZSet("gateway:zSet:count","39.101.192.67",0); redisService.setCacheZSet("gateway:zSet:count","39.99.236.232",0);
// GatewayNodeInfo gatewayNodeInfo = new GatewayNodeInfo(); // GatewayNodeInfo gatewayNodeInfo = new GatewayNodeInfo();
// gatewayNodeInfo.setPublicIdAddress("39.103.130.89"); // gatewayNodeInfo.setPublicIdAddress("39.103.130.89");