diff --git a/.gitignore b/.gitignore index 2976bd7..c7c93b9 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ target/ +.idea !.mvn/wrapper/maven-wrapper.jar !**/src/main/**/target/ !**/src/test/**/target/ diff --git a/src/main/java/com/yao/common/aliy/AliYunEcsService.java b/src/main/java/com/yao/common/aliy/AliYunEcsService.java index 867aee6..c9927d2 100644 --- a/src/main/java/com/yao/common/aliy/AliYunEcsService.java +++ b/src/main/java/com/yao/common/aliy/AliYunEcsService.java @@ -8,7 +8,7 @@ import com.aliyun.teautil.models.RuntimeOptions; import com.yao.common.aliy.model.EcsSelectModel; import com.yao.common.config.AlyConfigProperties; import com.yao.common.domain.aliy.InstanceInfo; -import com.yao.gateWay.cache.GatewayAliYunCache; +import com.yao.gateway.cache.GatewayNodeSetVinCache; import lombok.extern.log4j.Log4j2; import org.springframework.stereotype.Component; @@ -27,11 +27,11 @@ public class AliYunEcsService { private final AlyConfigProperties alyConfigProperties; private final Client client; - private final GatewayAliYunCache gatewayAliYunCache; - public AliYunEcsService(AlyConfigProperties alyConfigProperties, Client client, GatewayAliYunCache gatewayAliYunCache) { + private final GatewayNodeSetVinCache gatewayNodeSetVinCache; + public AliYunEcsService(AlyConfigProperties alyConfigProperties, Client client, GatewayNodeSetVinCache gatewayNodeSetVinCache) { this.alyConfigProperties = alyConfigProperties; this.client = client; - this.gatewayAliYunCache = gatewayAliYunCache; + this.gatewayNodeSetVinCache = gatewayNodeSetVinCache; } //todo----------------------------------------------------以下是查询代码-------------------------------------------- @@ -122,7 +122,7 @@ public class AliYunEcsService { String publicIP = item.getPublicIpAddress().substring(1, item.getPublicIpAddress().length() - 1); item.setPublicIpAddress(publicIP); //存入数据 - gatewayAliYunCache.newInstance(item); + gatewayNodeSetVinCache.newInstance(item); log.info("公网IP:" + item.getPublicIpAddress()); } ); diff --git a/src/main/java/com/yao/common/mqtt/MqttConnectService.java b/src/main/java/com/yao/common/mqtt/MqttConnectService.java index 09e7049..8e1a1ca 100644 --- a/src/main/java/com/yao/common/mqtt/MqttConnectService.java +++ b/src/main/java/com/yao/common/mqtt/MqttConnectService.java @@ -2,12 +2,11 @@ package com.yao.common.mqtt; import com.alibaba.fastjson2.JSONArray; import com.alibaba.fastjson2.JSONObject; -import com.yao.common.redis.service.RedisService; +import com.yao.gateway.cache.GatewayVehicleLineNodeCache; import lombok.extern.log4j.Log4j2; import okhttp3.OkHttpClient; import okhttp3.Request; import okhttp3.Response; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.io.IOException; @@ -21,11 +20,11 @@ import java.io.IOException; @Log4j2 public class MqttConnectService { - /** - * 调用redis分装好的方法 - */ - @Autowired - private RedisService redisService; + private GatewayVehicleLineNodeCache gatewayVehicleLineNodeCache; + + public MqttConnectService(GatewayVehicleLineNodeCache gatewayVehicleLineNodeCache) { + this.gatewayVehicleLineNodeCache = gatewayVehicleLineNodeCache; + } //todo-----------------------连接mqtt方法------------------- @@ -37,7 +36,7 @@ public class MqttConnectService { String URL = "http://" + ip + ":8080/public/cluster"; OkHttpClient client = new OkHttpClient(); Request request = new Request.Builder().url(URL).get().addHeader("User-Agent", "Apifox/1.0.0 (https://apifox.com)").addHeader("Accesstoken", "").build(); - redisService.setCacheSet("ECS", ip); + gatewayVehicleLineNodeCache.address(ip); Response response = null; try { response = client.newCall(request).execute(); diff --git a/src/main/java/com/yao/common/redis/service/RedisService.java b/src/main/java/com/yao/common/redis/service/RedisService.java index 69a5a1f..d7f6646 100644 --- a/src/main/java/com/yao/common/redis/service/RedisService.java +++ b/src/main/java/com/yao/common/redis/service/RedisService.java @@ -151,7 +151,6 @@ public class RedisService { } /** * 获得缓存的list对象 - * * @param key 缓存的键值 * @return 缓存键值对应的数据 */ diff --git a/src/main/java/com/yao/gateWay/cache/GatewayNodeSetVinCache.java b/src/main/java/com/yao/gateWay/cache/GatewayNodeSetVinCache.java deleted file mode 100644 index ea9df4f..0000000 --- a/src/main/java/com/yao/gateWay/cache/GatewayNodeSetVinCache.java +++ /dev/null @@ -1,13 +0,0 @@ -package com.yao.gateWay.cache; - -import org.springframework.stereotype.Component; - -/** - * @Author: LiJiaYao - * @Date: 2024/4/18 - * @Description: 网关节点存储vin详情 - */ -@Component -public class GatewayNodeSetVinCache { - -} diff --git a/src/main/java/com/yao/gateWay/cache/GatewayVehicleLineNodeCache.java b/src/main/java/com/yao/gateWay/cache/GatewayVehicleLineNodeCache.java deleted file mode 100644 index 48a07b0..0000000 --- a/src/main/java/com/yao/gateWay/cache/GatewayVehicleLineNodeCache.java +++ /dev/null @@ -1,9 +0,0 @@ -package com.yao.gateWay.cache; - -/** - * @Author: LiJiaYao - * @Date: 2024/4/18 - * @Description: 网关连接车俩 - */ -public class GatewayVehicleLineNodeCache { -} diff --git a/src/main/java/com/yao/gateWay/cache/GateWayNodeInfo.java b/src/main/java/com/yao/gateway/cache/GateWayNodeInfo.java similarity index 93% rename from src/main/java/com/yao/gateWay/cache/GateWayNodeInfo.java rename to src/main/java/com/yao/gateway/cache/GateWayNodeInfo.java index 5f83138..6f696cc 100644 --- a/src/main/java/com/yao/gateWay/cache/GateWayNodeInfo.java +++ b/src/main/java/com/yao/gateway/cache/GateWayNodeInfo.java @@ -1,4 +1,4 @@ -package com.yao.gateWay.cache; +package com.yao.gateway.cache; import lombok.AllArgsConstructor; import lombok.Builder; diff --git a/src/main/java/com/yao/gateway/cache/GatewayArithmeticCache.java b/src/main/java/com/yao/gateway/cache/GatewayArithmeticCache.java new file mode 100644 index 0000000..2ee6bfb --- /dev/null +++ b/src/main/java/com/yao/gateway/cache/GatewayArithmeticCache.java @@ -0,0 +1,58 @@ +package com.yao.gateway.cache; + +import com.yao.gateway.cache.abs.GatewayNodeAbstract; +import lombok.extern.log4j.Log4j2; +import org.springframework.stereotype.Component; + +import java.util.List; + +/** + * @Author: LiJiaYao + * @Date: 2024/4/18 + * @Description: 网关算法缓存 + */ +@Component +@Log4j2 +public class GatewayArithmeticCache extends GatewayNodeAbstract { + + private static final String loadNode ="work:node:gateway"; + private static final String cursor ="cursor"; + /** + * 计数器 + */ + public void count(){ + redisService.setCacheObject(cursor, 0); + } + + /** + * 自增 + * @return 把这个值自增 + */ + public Long increment(){ + return redisService.increment(cursor, 1L); + } + /** + * 删除loadNode的值 + */ + public void remove(){ + redisService.deleteObject(loadNode); + } + + /** + * 存储数据 + * @param loadNodeList 节点数据 + */ + public void loadNode(List loadNodeList){ + redisService.setCacheList(loadNode, loadNodeList); + } + + /** + * 获取list信息 + * @param count 传入的数据 + * @return nodeId + */ + public String cacheList(Long count){ + return redisService.getCacheList(loadNode, count % 100); + } +} + diff --git a/src/main/java/com/yao/gateway/cache/GatewayBusinessCache.java b/src/main/java/com/yao/gateway/cache/GatewayBusinessCache.java new file mode 100644 index 0000000..8631ec6 --- /dev/null +++ b/src/main/java/com/yao/gateway/cache/GatewayBusinessCache.java @@ -0,0 +1,20 @@ +package com.yao.gateway.cache; + +import com.yao.gateway.cache.abs.GatewayNodeAbstract; +import lombok.extern.log4j.Log4j2; +import org.springframework.stereotype.Component; + +/** + * @Author: LiJiaYao + * @Date: 2024/4/18 + * @Description: 网关业务数据存储 + */ +@Component +@Log4j2 +public class GatewayBusinessCache extends GatewayNodeAbstract { + + + + + +} diff --git a/src/main/java/com/yao/gateWay/cache/GatewayLoadNodeCache.java b/src/main/java/com/yao/gateway/cache/GatewayLoadNodeCache.java similarity index 92% rename from src/main/java/com/yao/gateWay/cache/GatewayLoadNodeCache.java rename to src/main/java/com/yao/gateway/cache/GatewayLoadNodeCache.java index 3f733ac..752dec6 100644 --- a/src/main/java/com/yao/gateWay/cache/GatewayLoadNodeCache.java +++ b/src/main/java/com/yao/gateway/cache/GatewayLoadNodeCache.java @@ -1,6 +1,6 @@ -package com.yao.gateWay.cache; +package com.yao.gateway.cache; -import com.yao.gateWay.cache.abs.GatewayNodeAbstract; +import com.yao.gateway.cache.abs.GatewayNodeAbstract; import org.springframework.stereotype.Component; import java.util.List; diff --git a/src/main/java/com/yao/gateWay/cache/GatewayLoadSeriesCache.java b/src/main/java/com/yao/gateway/cache/GatewayLoadSeriesCache.java similarity index 91% rename from src/main/java/com/yao/gateWay/cache/GatewayLoadSeriesCache.java rename to src/main/java/com/yao/gateway/cache/GatewayLoadSeriesCache.java index 0b4fc95..cbe9a59 100644 --- a/src/main/java/com/yao/gateWay/cache/GatewayLoadSeriesCache.java +++ b/src/main/java/com/yao/gateway/cache/GatewayLoadSeriesCache.java @@ -1,6 +1,6 @@ -package com.yao.gateWay.cache; +package com.yao.gateway.cache; -import com.yao.gateWay.cache.abs.GatewayNodeAbstract; +import com.yao.gateway.cache.abs.GatewayNodeAbstract; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; diff --git a/src/main/java/com/yao/gateWay/cache/GatewayNodeCache.java b/src/main/java/com/yao/gateway/cache/GatewayNodeCache.java similarity index 92% rename from src/main/java/com/yao/gateWay/cache/GatewayNodeCache.java rename to src/main/java/com/yao/gateway/cache/GatewayNodeCache.java index 4b6bf5f..f69b14b 100644 --- a/src/main/java/com/yao/gateWay/cache/GatewayNodeCache.java +++ b/src/main/java/com/yao/gateway/cache/GatewayNodeCache.java @@ -1,6 +1,6 @@ -package com.yao.gateWay.cache; +package com.yao.gateway.cache; -import com.yao.gateWay.cache.abs.GatewayNodeAbstract; +import com.yao.gateway.cache.abs.GatewayNodeAbstract; import org.springframework.stereotype.Component; /** diff --git a/src/main/java/com/yao/gateWay/cache/GatewayNodeScoreCache.java b/src/main/java/com/yao/gateway/cache/GatewayNodeScoreCache.java similarity index 70% rename from src/main/java/com/yao/gateWay/cache/GatewayNodeScoreCache.java rename to src/main/java/com/yao/gateway/cache/GatewayNodeScoreCache.java index 35b8170..62d80b4 100644 --- a/src/main/java/com/yao/gateWay/cache/GatewayNodeScoreCache.java +++ b/src/main/java/com/yao/gateway/cache/GatewayNodeScoreCache.java @@ -1,6 +1,6 @@ -package com.yao.gateWay.cache; +package com.yao.gateway.cache; -import com.yao.gateWay.cache.abs.GatewayNodeAbstract; +import com.yao.gateway.cache.abs.GatewayNodeAbstract; import org.springframework.stereotype.Component; /** @@ -12,8 +12,4 @@ import org.springframework.stereotype.Component; public class GatewayNodeScoreCache extends GatewayNodeAbstract { - - - - } diff --git a/src/main/java/com/yao/gateWay/cache/GatewayAliYunCache.java b/src/main/java/com/yao/gateway/cache/GatewayNodeSetVinCache.java similarity index 57% rename from src/main/java/com/yao/gateWay/cache/GatewayAliYunCache.java rename to src/main/java/com/yao/gateway/cache/GatewayNodeSetVinCache.java index fec06b2..853e099 100644 --- a/src/main/java/com/yao/gateWay/cache/GatewayAliYunCache.java +++ b/src/main/java/com/yao/gateway/cache/GatewayNodeSetVinCache.java @@ -1,20 +1,19 @@ -package com.yao.gateWay.cache; +package com.yao.gateway.cache; +import com.aliyun.teautil.Common; import com.yao.common.domain.aliy.InstanceInfo; -import com.yao.gateWay.cache.abs.GatewayNodeAbstract; +import com.yao.gateway.cache.abs.GatewayNodeAbstract; import org.springframework.stereotype.Component; -import java.util.List; import java.util.Set; /** * @Author: LiJiaYao * @Date: 2024/4/18 - * @Description: 阿里云缓存 + * @Description: 网关节点存储vin详情 */ @Component -public class GatewayAliYunCache extends GatewayNodeAbstract { - +public class GatewayNodeSetVinCache extends GatewayNodeAbstract { /** * 阿里云键 */ @@ -37,4 +36,12 @@ public class GatewayAliYunCache extends GatewayNodeAbstract { return redisService.getCacheSet(realKey); } + /** + * 删除这个key其中的一个数据 + * @param realKey new:real:column键 + * @param info 对应的值 + */ + public void remote(String realKey,InstanceInfo info){ + redisService.deleteCacheMapValue(realKey, Common.toJSONString(info)); + } } diff --git a/src/main/java/com/yao/gateway/cache/GatewayVehicleLineNodeCache.java b/src/main/java/com/yao/gateway/cache/GatewayVehicleLineNodeCache.java new file mode 100644 index 0000000..e375845 --- /dev/null +++ b/src/main/java/com/yao/gateway/cache/GatewayVehicleLineNodeCache.java @@ -0,0 +1,44 @@ +package com.yao.gateway.cache; + +import com.yao.gateway.cache.abs.GatewayNodeAbstract; +import org.springframework.stereotype.Component; + +import java.util.Set; + +/** + * @Author: LiJiaYao + * @Date: 2024/4/18 + * @Description: 网关连接车俩 + */ +@Component +public class GatewayVehicleLineNodeCache extends GatewayNodeAbstract { + + private final static String reconnectCar = "reconnectCar"; + + /** + * 把数据存入数据 + * @param ip ip地址 + */ + public void save(String ip){ + redisService.setCacheSet(reconnectCar,ip); + } + + /** + * 存储ip地址数据 + * @param ip ip地址 + */ + public void address(String ip) { + redisService.setCacheSet("ECS", ip); + } + + /** + * 获取存储ecs的ip + */ + public Set getAddress() { + return redisService.getCacheSet("ECS"); + } + + + + +} diff --git a/src/main/java/com/yao/gateWay/cache/abs/GatewayNodeAbstract.java b/src/main/java/com/yao/gateway/cache/abs/GatewayNodeAbstract.java similarity index 86% rename from src/main/java/com/yao/gateWay/cache/abs/GatewayNodeAbstract.java rename to src/main/java/com/yao/gateway/cache/abs/GatewayNodeAbstract.java index e7ac836..23d6e48 100644 --- a/src/main/java/com/yao/gateWay/cache/abs/GatewayNodeAbstract.java +++ b/src/main/java/com/yao/gateway/cache/abs/GatewayNodeAbstract.java @@ -1,4 +1,4 @@ -package com.yao.gateWay.cache.abs; +package com.yao.gateway.cache.abs; import com.yao.common.redis.service.RedisService; @@ -14,8 +14,4 @@ public abstract class GatewayNodeAbstract { */ public RedisService redisService; - - - - } diff --git a/src/main/java/com/yao/server/controller/LoadController.java b/src/main/java/com/yao/server/controller/LoadController.java index 84e7f99..28ecc2f 100644 --- a/src/main/java/com/yao/server/controller/LoadController.java +++ b/src/main/java/com/yao/server/controller/LoadController.java @@ -18,7 +18,6 @@ public class LoadController { private LoadService loadService; @PostMapping("/load") public Result loadInfo(){ - String load = loadService.load(); - return Result.success(load); + return Result.success(loadService.load()); } } diff --git a/src/main/java/com/yao/server/service/impl/GatewayLoadServiceImpl.java b/src/main/java/com/yao/server/service/impl/GatewayLoadServiceImpl.java index c222d10..237b122 100644 --- a/src/main/java/com/yao/server/service/impl/GatewayLoadServiceImpl.java +++ b/src/main/java/com/yao/server/service/impl/GatewayLoadServiceImpl.java @@ -1,9 +1,9 @@ package com.yao.server.service.impl; -import com.yao.gateWay.cache.GateWayNodeInfo; -import com.yao.gateWay.cache.GatewayLoadNodeCache; -import com.yao.gateWay.cache.GatewayLoadSeriesCache; -import com.yao.gateWay.cache.GatewayNodeCache; +import com.yao.gateway.cache.GateWayNodeInfo; +import com.yao.gateway.cache.GatewayLoadNodeCache; +import com.yao.gateway.cache.GatewayLoadSeriesCache; +import com.yao.gateway.cache.GatewayNodeCache; import com.yao.server.service.GatewayLoadService; import lombok.AllArgsConstructor; import org.springframework.stereotype.Service; diff --git a/src/main/java/com/yao/server/service/impl/LoadServiceImpl.java b/src/main/java/com/yao/server/service/impl/LoadServiceImpl.java index af39abb..ee15504 100644 --- a/src/main/java/com/yao/server/service/impl/LoadServiceImpl.java +++ b/src/main/java/com/yao/server/service/impl/LoadServiceImpl.java @@ -3,10 +3,10 @@ package com.yao.server.service.impl; import com.yao.common.config.Constants; import com.yao.common.domain.WorkGateWayNode; import com.yao.common.mqtt.MqttConnectService; -import com.yao.common.redis.service.RedisService; +import com.yao.gateway.cache.GatewayArithmeticCache; +import com.yao.gateway.cache.GatewayVehicleLineNodeCache; import com.yao.server.service.LoadService; import lombok.extern.log4j.Log4j2; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.util.*; @@ -21,20 +21,22 @@ import java.util.concurrent.CountDownLatch; @Log4j2 @Service public class LoadServiceImpl implements LoadService { - - @Autowired - private RedisService redisService; - + //网关算法缓存 + private final GatewayArithmeticCache gatewayArithmeticCache; + // 网关连接车俩 + private final GatewayVehicleLineNodeCache gatewayVehicleLineNodeCache; private MqttConnectService mqttConnectService; - public LoadServiceImpl(MqttConnectService mqttConnectService) { + public LoadServiceImpl(GatewayArithmeticCache gatewayArithmeticCache, GatewayVehicleLineNodeCache gatewayVehicleLineNodeCache, MqttConnectService mqttConnectService) { + this.gatewayArithmeticCache = gatewayArithmeticCache; + this.gatewayVehicleLineNodeCache = gatewayVehicleLineNodeCache; this.mqttConnectService = mqttConnectService; } @Override public String load() { //初始化序列 - redisService.setCacheObject("cursor", 0); + gatewayArithmeticCache.count(); ArrayList nodeIdList = carWorkGatewayNode(); //100 List loadNodeList = new ArrayList<>(); @@ -68,13 +70,13 @@ public class LoadServiceImpl implements LoadService { } } } - redisService.deleteObject("work:node:gateway"); - redisService.setCacheList("work:node:gateway", loadNodeList); + gatewayArithmeticCache.remove(); + gatewayArithmeticCache.loadNode(loadNodeList); CountDownLatch countDownLatch = new CountDownLatch(300); new Thread(() -> { for (int i = 0; i < Constants.SUM; i++) { - long cursor = redisService.increment("cursor", 1L); - String cacheList = redisService.getCacheList("work:node:gateway", cursor % 100); + long cursor = gatewayArithmeticCache.increment(); + String cacheList = gatewayArithmeticCache.cacheList(cursor); log.info(cursor + "---------" + cacheList); SitNode.sti(cacheList); countDownLatch.countDown(); @@ -83,8 +85,8 @@ public class LoadServiceImpl implements LoadService { new Thread(() -> { for (int i = 0; i < Constants.SUM; i++) { - long cursor = redisService.increment("cursor", 1L); - String cacheList = redisService.getCacheList("work:node:gateway", cursor % 100); + long cursor = gatewayArithmeticCache.increment(); + String cacheList = gatewayArithmeticCache.cacheList(cursor); log.info(cursor + "---------" + cacheList); SitNode.sti(cacheList); countDownLatch.countDown(); @@ -93,8 +95,8 @@ public class LoadServiceImpl implements LoadService { new Thread(() -> { for (int i = 0; i < Constants.SUM; i++) { - Long cursor = redisService.increment("cursor", 1L); - String cacheList = redisService.getCacheList("work:node:gateway", cursor % 100); + long cursor = gatewayArithmeticCache.increment(); + String cacheList = gatewayArithmeticCache.cacheList(cursor); log.info(cursor + "---------" + cacheList); SitNode.sti(cacheList); countDownLatch.countDown(); @@ -123,7 +125,7 @@ public class LoadServiceImpl implements LoadService { } public ArrayList carWorkGatewayNode() { - Set ip = redisService.getCacheSet("ECS"); + Set ip = gatewayVehicleLineNodeCache.getAddress(); ArrayList list = new ArrayList<>(); for (String s : ip) { Integer connectSize = mqttConnectService.connectMqtt(s); @@ -170,6 +172,4 @@ class SitNode { return map; } - - } diff --git a/src/main/java/com/yao/server/test/Test.java b/src/main/java/com/yao/server/test/Test.java index a0b5132..895b349 100644 --- a/src/main/java/com/yao/server/test/Test.java +++ b/src/main/java/com/yao/server/test/Test.java @@ -1,11 +1,9 @@ package com.yao.server.test; -import com.aliyun.ecs20140526.Client; import com.aliyun.ecs20140526.models.DescribeAvailableResourceRequest; import com.aliyun.ecs20140526.models.DescribeAvailableResourceResponse; import com.aliyun.tea.TeaException; import com.aliyun.tea.TeaModel; -import com.aliyun.teaopenapi.models.Config; /** * @Author: LiJiaYao diff --git a/src/main/java/com/yao/server/timer/Timer.java b/src/main/java/com/yao/server/timer/Timer.java index 8a526fb..ee013fb 100644 --- a/src/main/java/com/yao/server/timer/Timer.java +++ b/src/main/java/com/yao/server/timer/Timer.java @@ -1,20 +1,16 @@ package com.yao.server.timer; -import com.aliyun.teautil.Common; import com.yao.common.aliy.AliYunEcsService; -import com.yao.common.aliy.model.EcsSelectModel; import com.yao.common.domain.aliy.InstanceInfo; -import com.yao.common.domain.aliy.InstanceRequest; import com.yao.common.mqtt.MqttConnectService; -import com.yao.common.redis.service.RedisService; +import com.yao.gateway.cache.GatewayNodeSetVinCache; +import com.yao.gateway.cache.GatewayVehicleLineNodeCache; import lombok.extern.log4j.Log4j2; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; -import java.util.ArrayList; import java.util.List; import java.util.Set; @@ -27,96 +23,60 @@ import java.util.Set; @EnableAsync //开启异步支持 @Component public class Timer { - - @Autowired - private RedisService redisService; + private static final String realKey = "new:real:column"; private AliYunEcsService aliYunEcsService; private MqttConnectService mqttConnectService; + // 阿里云缓存 + private GatewayNodeSetVinCache gatewayNodeSetVinCache; + private GatewayVehicleLineNodeCache gatewayVehicleLineNodeCache; - public Timer(AliYunEcsService aliYunEcsService, MqttConnectService mqttConnectService) { + public Timer(AliYunEcsService aliYunEcsService, MqttConnectService mqttConnectService, GatewayNodeSetVinCache gatewayAliYunCache, GatewayVehicleLineNodeCache gatewayVehicleLineNodeCache) { this.aliYunEcsService = aliYunEcsService; this.mqttConnectService = mqttConnectService; + this.gatewayNodeSetVinCache = gatewayAliYunCache; + this.gatewayVehicleLineNodeCache = gatewayVehicleLineNodeCache; } @Async @Scheduled(cron = "0/20 * * * * ?") public void timer() { - long startTime = System.currentTimeMillis(); // 记录开始时间 - long endTime = startTime + 10 * 60 * 1000; // 设置结束时间为10分钟后 -// redisService.deleteObject("new:real:column"); - Set instance = redisService.getCacheSet("new:real:column"); -// List myFirstEcsInstance = null; -// try { -// myFirstEcsInstance = aliYunEcsService.selectList(ecsSelectModelName("MyFirstEcsInstance")); -// for (InstanceInfo instanceInfo : myFirstEcsInstance) { -// String publicIpAddress = instanceInfo.getPublicIpAddress(); -// String instanceId = instanceInfo.getInstanceId(); -// InstanceRequest instanceRequest = new InstanceRequest(publicIpAddress, instanceId); -// redisService.setCacheSet("new:real:column", instanceRequest); -// } - if (instance.isEmpty()) { -// if (myFirstEcsInstance.isEmpty()) { + // 记录开始时间 + long startTime = System.currentTimeMillis(); + // 设置结束时间为10分钟后 + long endTime = startTime + 10 * 60 * 1000; + Set instance = gatewayNodeSetVinCache.get(realKey); + if (instance.isEmpty()) { + try { + aliYunEcsService.startCreate(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + for (InstanceInfo s : instance) { + Integer connectSize = mqttConnectService.connectMqtt(s.getPublicIpAddress()); + List instanceId = null; + String deleteInstanceId = null; + if (connectSize >= 79) { + //执行节点扩容 + //返回实例的ID + if (!instanceId.isEmpty()) { try { - aliYunEcsService.startCreate(); + instanceId = aliYunEcsService.startCreate(); + log.info("扩容成功!"); + log.info("扩容的节点id为:" + instanceId); } catch (Exception e) { throw new RuntimeException(e); } -// } - } - for (InstanceInfo s : instance) { - Integer connectSize = mqttConnectService.connectMqtt(s.getPublicIpAddress()); - List instanceId = null; - String deleteInstanceId = null; - if (connectSize >= 80) { - //执行节点扩容 - //返回实例的ID - if (!instanceId.isEmpty()) { - try { - instanceId = aliYunEcsService.startCreate(); - log.info("扩容成功!"); - log.info("扩容的节点id为:" + instanceId); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - } - if (connectSize <= 20 && System.currentTimeMillis() < endTime) { -// if (!deleteInstanceId.isEmpty()) { - aliYunEcsService.delete(s.getInstanceId()); - //删除实列以后再去把redis的值删除 再去通知重新上线 - redisService.deleteCacheMapValue("new:real:column", Common.toJSONString(s)); - redisService.setCacheSet("reconnectCar",s.getPublicIpAddress()); - log.info("缩容成功!"); - log.info("锁容的节点id为:" + deleteInstanceId); } } -// } catch (Exception e) { -// throw new RuntimeException(e); + if (connectSize <= 20 && System.currentTimeMillis() < endTime) { + aliYunEcsService.delete(s.getInstanceId()); + //删除实列以后再去把redis的值删除 再去通知重新上线 + gatewayNodeSetVinCache.remote(realKey, s); + gatewayVehicleLineNodeCache.save(s.getPublicIpAddress()); + log.info("缩容成功!"); + log.info("锁容的节点id为:" + deleteInstanceId); + } } - } - - /** - * 查出来数据以后把值返回给要查的数据 - * - * @param instance - * @return - */ -// public EcsSelectModel ecsSelectModel(Set instance) { -// List instanceIdList = new ArrayList<>(); -// for (InstanceInfo req : instance) { -// instanceIdList.add(req.getInstanceId()); -// } -// EcsSelectModel ecsSelectModel = new EcsSelectModel(); -// ecsSelectModel.setInstanceIdList(instanceIdList); -// return ecsSelectModel; -// } -// public EcsSelectModel ecsSelectModelName(String instanceName) { -// List instanceIdList = new ArrayList<>(); -// instanceIdList.add(instanceName); -// EcsSelectModel ecsSelectModel = new EcsSelectModel(); -// ecsSelectModel.setInstanceIdList(instanceIdList); -// return ecsSelectModel; -// } - -//} +} diff --git a/src/test/java/com/yao/LoadService1.java b/src/test/java/com/yao/LoadService1.java index a36a45e..de44da5 100644 --- a/src/test/java/com/yao/LoadService1.java +++ b/src/test/java/com/yao/LoadService1.java @@ -20,13 +20,10 @@ import java.util.concurrent.CountDownLatch; @SpringBootTest(classes = APISpringBootApplication.class) public class LoadService1 { - @Autowired - private RedisService redisService; - @Test public void load() { //初始化序列 - redisService.setCacheObject("cursor",0); +// redisService.setCacheObject("cursor",0); List nodeIdList = new ArrayList() { { @@ -77,33 +74,33 @@ public class LoadService1 { // for (int i = 0; i < 100; i++) { // loadNodeList.add(nodeIdList.get(i % nodeIdList.size())); // } - redisService.deleteObject("work:node:gateway"); - redisService.setCacheList("work:node:gateway",loadNodeList); +// redisService.deleteObject("work:node:gateway"); +// redisService.setCacheList("work:node:gateway",loadNodeList); CountDownLatch countDownLatch = new CountDownLatch(300); new Thread(()->{ for (int i = 0; i < 100; i++) { - Long cursor = redisService.increment("cursor", 1L); - String cacheList = redisService.getCacheList("work:node:gateway", cursor % 100); - log.info(cursor+"---------"+cacheList); - SitNode.sti(cacheList); +// Long cursor = redisService.increment("cursor", 1L); +// String cacheList = redisService.getCacheList("work:node:gateway", cursor % 100); +// log.info(cursor+"---------"+cacheList); +// SitNode.sti(cacheList); countDownLatch.countDown(); } }).start(); new Thread(()->{ for (int i = 0; i < 100; i++) { - Long cursor = redisService.increment("cursor", 1L); - String cacheList = redisService.getCacheList("work:node:gateway", cursor % 100); - log.info(cursor+"---------"+cacheList); - SitNode.sti(cacheList); +// Long cursor = redisService.increment("cursor", 1L); +// String cacheList = redisService.getCacheList("work:node:gateway", cursor % 100); +// log.info(cursor+"---------"+cacheList); +// SitNode.sti(cacheList); countDownLatch.countDown(); } }).start(); new Thread(()->{ for (int i = 0; i < 100; i++) { - Long cursor = redisService.increment("cursor", 1L); - String cacheList = redisService.getCacheList("work:node:gateway", cursor % 100); - log.info(cursor+"---------"+cacheList); - SitNode.sti(cacheList); +// Long cursor = redisService.increment("cursor", 1L); +// String cacheList = redisService.getCacheList("work:node:gateway", cursor % 100); +// log.info(cursor+"---------"+cacheList); +// SitNode.sti(cacheList); countDownLatch.countDown(); } }).start();