From 85c99c55bd0f9277ee6787a0471681c8c70d4139 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=BB=84=E5=A4=A7=E4=B8=BE?= <13970129+huandgaju@user.noreply.gitee.com> Date: Sat, 20 Apr 2024 10:36:53 +0800 Subject: [PATCH] 4.20 --- .../gateway/cache/GatewayNodeCache.java | 2 +- .../gateway/cache/GatewayZSetNodeCache.java | 4 ++ .../center/controller/GatewayController.java | 4 -- .../java/com/muyu/center/mq/Consumer.java | 27 ++++++------- .../service/impl/GatewayLoadServiceImpl.java | 9 +++-- .../impl/GatewayVehicleServiceImpl.java | 39 +++++++++++++++++++ src/test/java/LoadTest.java | 2 +- 7 files changed, 65 insertions(+), 22 deletions(-) diff --git a/src/main/java/com/muyu/center/common/gateway/cache/GatewayNodeCache.java b/src/main/java/com/muyu/center/common/gateway/cache/GatewayNodeCache.java index d62b6d4..727a95b 100644 --- a/src/main/java/com/muyu/center/common/gateway/cache/GatewayNodeCache.java +++ b/src/main/java/com/muyu/center/common/gateway/cache/GatewayNodeCache.java @@ -48,7 +48,7 @@ public class GatewayNodeCache extends GatewayCacheAbs { */ public void remove(String nodeId){ - redisService.deleteObject(encode(nodeId)); + redisService.deleteObject(encode(gatewayNodeCache)+nodeId); } diff --git a/src/main/java/com/muyu/center/common/gateway/cache/GatewayZSetNodeCache.java b/src/main/java/com/muyu/center/common/gateway/cache/GatewayZSetNodeCache.java index 04c450c..926c208 100644 --- a/src/main/java/com/muyu/center/common/gateway/cache/GatewayZSetNodeCache.java +++ b/src/main/java/com/muyu/center/common/gateway/cache/GatewayZSetNodeCache.java @@ -91,6 +91,10 @@ public class GatewayZSetNodeCache extends GatewayCacheAbs { } + public Map get2(String ip) { + return redisService.getCacheZSetScore(encode(gatewayZSetCount) + ip); + + } } diff --git a/src/main/java/com/muyu/center/controller/GatewayController.java b/src/main/java/com/muyu/center/controller/GatewayController.java index 1d52ab3..95a989d 100644 --- a/src/main/java/com/muyu/center/controller/GatewayController.java +++ b/src/main/java/com/muyu/center/controller/GatewayController.java @@ -47,10 +47,6 @@ public class GatewayController { gatewayLoadService.dynamic(); } - @Scheduled(cron = "0/30 * * * * ?") - public void refreshLoad() throws Exception { - gatewayLoadService.refreshLoad(); - } diff --git a/src/main/java/com/muyu/center/mq/Consumer.java b/src/main/java/com/muyu/center/mq/Consumer.java index e72abe8..483b03a 100644 --- a/src/main/java/com/muyu/center/mq/Consumer.java +++ b/src/main/java/com/muyu/center/mq/Consumer.java @@ -7,6 +7,7 @@ import com.muyu.center.common.gateway.cache.GatewayNodeSetVinCache; import com.muyu.center.common.gateway.cache.GatewayVehicleNode; import com.muyu.center.common.gateway.model.GatewayNodeInfo; import com.muyu.center.common.redis.service.RedisService; +import com.muyu.center.service.GatewayLoadService; import com.rabbitmq.client.Channel; import lombok.extern.log4j.Log4j2; import org.springframework.amqp.core.Message; @@ -17,6 +18,7 @@ import org.springframework.stereotype.Component; import org.springframework.web.client.RestTemplate; import java.io.IOException; import java.util.ArrayList; +import java.util.List; import java.util.Set; /** @@ -37,14 +39,14 @@ public class Consumer { @Autowired - private GatewayNodeSetVinCache gatewayNodeSetVinCache; + private GatewayLoadService gatewayLoadService; @Autowired private GatewayNodeCache gatewayNodeCache; @Autowired - private GatewayVehicleNode gatewayVehicleNode; + private GatewayNodeSetVinCache gatewayNodeSetVinCache; @Autowired private AliYunEcsService aliYunEcsService; @@ -105,24 +107,21 @@ public class Consumer { Integer i = (Integer) result.getData(); if (i>0){ - - for (String string : allVin) { - gatewayNodeSetVinCache.remove(gatewayNodeInfo.getPublicIdAddress(),string); - - gatewayVehicleNode.remove(string); - } - - gatewayNodeCache.remove(gatewayNodeInfo.getNodeId()); - log.info("删除实例的对象"); + log.info("重新上线成功"); + gatewayNodeCache.remove(gatewayNodeInfo.getPublicIdAddress()); } - aliYunEcsService.releaseECS(gatewayNodeInfo.getNodeId()); // 释放ECS实例 + // 释放ECS实例 + aliYunEcsService.releaseECS(gatewayNodeInfo.getNodeId()); log.info("释放实例"); + Thread.sleep(5000); + //刷新负载均衡 + gatewayLoadService.refreshLoad(); - redisService.setCacheObject("repeat:"+messageId,messageId); + redisService.setCacheObject("idempotent:"+messageId,messageId); channel.basicAck(deliveryTag,false); }else{ @@ -146,10 +145,12 @@ public class Consumer { if (deliveryTag==(old+2)){ // 重试三次后,仍然失败,退回消息 log.info("已重试三次,实在消费不了"); + channel.basicNack(deliveryTag,false,false); }else { // 尝试再次重试 log.info("继续重试"); + channel.basicNack(deliveryTag,false,true); } } diff --git a/src/main/java/com/muyu/center/service/impl/GatewayLoadServiceImpl.java b/src/main/java/com/muyu/center/service/impl/GatewayLoadServiceImpl.java index f05e3f9..19246ac 100644 --- a/src/main/java/com/muyu/center/service/impl/GatewayLoadServiceImpl.java +++ b/src/main/java/com/muyu/center/service/impl/GatewayLoadServiceImpl.java @@ -87,7 +87,6 @@ public class GatewayLoadServiceImpl implements GatewayLoadService { GatewayNodeInfo gatewayNodeInfo = gatewayNodeCache.get(loadNodeId); return gatewayNodeInfo.getPublicIdAddress(); - } /** @@ -181,6 +180,9 @@ public class GatewayLoadServiceImpl implements GatewayLoadService { //这里模拟(也可以在别的类里完成) ECS创建成功后,服务器发送一条消息服务器正常启动,mq可以正常使用,存入redis gatewayZSetNodeCache.put(ecsInstanceInfo.getPublicIpAddress(), 0); + + this.refreshLoad(); + log.info("实例id和公网ip存入redis"); } else if (loadRate.longValue()<15L) { @@ -220,11 +222,12 @@ public class GatewayLoadServiceImpl implements GatewayLoadService { }else { log.info("负载过低,但是节点数小于2,不缩容"); } - - } } + + + /** * 查询ECS车辆连接数 */ diff --git a/src/main/java/com/muyu/center/service/impl/GatewayVehicleServiceImpl.java b/src/main/java/com/muyu/center/service/impl/GatewayVehicleServiceImpl.java index d573be2..b2cbabf 100644 --- a/src/main/java/com/muyu/center/service/impl/GatewayVehicleServiceImpl.java +++ b/src/main/java/com/muyu/center/service/impl/GatewayVehicleServiceImpl.java @@ -1,5 +1,6 @@ package com.muyu.center.service.impl; +import com.muyu.center.common.domain.WorkGatewayNode; import com.muyu.center.common.gateway.cache.GatewayVehicleNode; import com.muyu.center.service.GatewayVehicleService; import com.muyu.center.common.domain.Result; @@ -8,9 +9,14 @@ import com.muyu.center.common.gateway.cache.GatewayZSetNodeCache; import lombok.AllArgsConstructor; import lombok.extern.log4j.Log4j2; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.core.ZSetOperations; import org.springframework.stereotype.Service; import org.springframework.web.client.RestTemplate; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + /** * @ProjectName: load_center * @PackageName: com.muyu.loadCenter.service.impl @@ -29,6 +35,9 @@ public class GatewayVehicleServiceImpl implements GatewayVehicleService { private final GatewayNodeSetVinCache gatewayNodeSetVinCache; + private final GatewayZSetNodeCache gatewayZSetNodeCache; + + private final GatewayVehicleNode gatewayVehicleNode; @Autowired @@ -52,9 +61,39 @@ public class GatewayVehicleServiceImpl implements GatewayVehicleService { gatewayNodeSetVinCache.put(data.getData().toString(),vin); +//// Map map = gatewayZSetNodeCache.get2(data.getData().toString()); +// Set zSet = gatewayZSetNodeCache.redisService.getCacheZSet("gateway:zSet:count" + data.getData()); +// Map map = gatewayZSetNodeCache.redisService.getCacheZSetScore("gateway:zSet:count" + data.getData()); + + + +// Map map = new HashMap<>(); +// ZSetOperations zSetOperations =gatewayZSetNodeCache.redisService. redisTemplate.opsForZSet(); +// // 构建一个 Map 用于存储成员和分数的对应关系 +// zSetOperations.rangeWithScores("gateway:zSet:count" + data.getData(), 0, -1).forEach(tuple -> { +// map.put(tuple.getValue(), tuple.getScore()); +// }); +// +// System.out.println("------------------------------------------"); +// Double aDouble = map.get(data.getData()); +// System.out.println(map); +// System.out.println(aDouble); +// System.out.println("------------------------------------------"); +// +// +// gatewayZSetNodeCache.put(data.getData().toString(),1); + return Result.success(data.getData().toString()); } + + + + /** + * 车辆下线 + * @param vin; + + */ @Override public void loadDownLine(String vin) { diff --git a/src/test/java/LoadTest.java b/src/test/java/LoadTest.java index 978475c..d827518 100644 --- a/src/test/java/LoadTest.java +++ b/src/test/java/LoadTest.java @@ -42,7 +42,7 @@ public class LoadTest { // System.out.println("------------=========----------"+set); redisService.setCacheZSet("gateway:zSet:count","39.103.130.89",0); redisService.setCacheZSet("gateway:zSet:count","121.89.221.195",0); - redisService.setCacheZSet("gateway:zSet:count","39.101.192.67",0); + redisService.setCacheZSet("gateway:zSet:count","39.99.236.232",0); // GatewayNodeInfo gatewayNodeInfo = new GatewayNodeInfo(); // gatewayNodeInfo.setPublicIdAddress("39.103.130.89");