增加数据类型结构及模型,及一些业务添加
master
JangCan 2024-04-19 18:46:33 +08:00
parent edaffed736
commit ea62395c0e
12 changed files with 498 additions and 18 deletions

View File

@ -1,9 +1,6 @@
package com.loadcenter.common.redis.service; package com.loadcenter.common.redis.service;
import org.springframework.data.redis.core.BoundSetOperations; import org.springframework.data.redis.core.*;
import org.springframework.data.redis.core.HashOperations;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ValueOperations;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.annotation.Resource; import javax.annotation.Resource;
@ -143,8 +140,8 @@ public class RedisService {
return count == null ? 0 : count; return count == null ? 0 : count;
} }
public <T> T getCacheListValue(final String key,long index){ public <T> T getCacheListValue(final String key, long index) {
return (T) redisTemplate.opsForList().index(key,index); return (T) redisTemplate.opsForList().index(key, index);
} }
/** /**
@ -158,7 +155,7 @@ public class RedisService {
} }
public <T> T getCacheList(final String key, Long index) { public <T> T getCacheList(final String key, Long index) {
return (T) redisTemplate.opsForList().index(key, index ); return (T) redisTemplate.opsForList().index(key, index);
} }
/** /**
@ -190,6 +187,11 @@ public class RedisService {
return setOperation; return setOperation;
} }
public <T> BoundZSetOperations<String ,T> setCacheZSet(final String key,final T setValue,double score){
BoundZSetOperations boundZSetOperations = redisTemplate.boundZSetOps(key);
boundZSetOperations.add(setValue,score);
return boundZSetOperations;
}
/* /*
* Zset * Zset
@ -205,6 +207,17 @@ public class RedisService {
} }
public <T> Map<Object, Double> getCacheZSetScore(final String key) {
ZSetOperations<String, Double> zSetOperations = redisTemplate.opsForZSet();
//构建一个Map 用于存储成员和分数的对应关系
Map<Object, Double> memberScores = new HashMap<>();
zSetOperations.rangeWithScores(key, 0, -1).forEach(tuple -> {
memberScores.put(tuple.getValue(), tuple.getScore());
});
return memberScores;
}
/** /**
* Set * Set
* *
@ -218,6 +231,11 @@ public class RedisService {
setOperations.remove(setValue); setOperations.remove(setValue);
} }
public <T> void deleteCacheSet(String key,T setValue){
BoundSetOperations<String,T> setOperation = redisTemplate.boundSetOps(key);
setOperation.remove(setValue);
}
/** /**
* set * set
* *
@ -229,6 +247,15 @@ public class RedisService {
} }
/**
* zset
* @param key
* @param value
*/
public void deleteCacheZset(final String key ,final String value){
redisTemplate.opsForZSet().remove(key,value);
}
/** /**
* Map * Map
* *
@ -308,6 +335,7 @@ public class RedisService {
/** /**
* *
*
* @param key * @param key
* @param number * @param number
* @return * @return
@ -318,6 +346,7 @@ public class RedisService {
/** /**
* *
*
* @param key * @param key
* @param number * @param number
* @return * @return

View File

@ -3,6 +3,7 @@ package com.loadcenter.controller;
import com.loadcenter.common.domain.resp.Result; import com.loadcenter.common.domain.resp.Result;
import com.loadcenter.service.GatewayLoadService; import com.loadcenter.service.GatewayLoadService;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController; import org.springframework.web.bind.annotation.RestController;
@ -20,10 +21,23 @@ public class GatewayController {
@Autowired @Autowired
private GatewayLoadService gatewayLoadService; private GatewayLoadService gatewayLoadService;
/**
* 线 id
* @return
*/
@GetMapping("/load/node") @GetMapping("/load/node")
public Result<String> loadNode(){ public Result<String> loadNode(){
return Result.success(gatewayLoadService.loadNode()); return Result.success(gatewayLoadService.loadNode());
} }
/**
* 30
*
*
*/
@Scheduled(cron = "0/5 * * * * ?")
public void scheduleECS(){
gatewayLoadService.scheduleECS();
}
} }

View File

@ -1,6 +1,4 @@
package com.loadcenter.gateway.cache; package com.loadcenter.gateway.cache;
import com.loadcenter.common.redis.service.RedisService;
import com.loadcenter.gateway.cache.abs.GatewayCacheAbs; import com.loadcenter.gateway.cache.abs.GatewayCacheAbs;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;

View File

@ -0,0 +1,50 @@
package com.loadcenter.gateway.cache;
import com.loadcenter.gateway.cache.abs.GatewayCacheAbs;
import com.loadcenter.gateway.model.WorkGatewayNode;
import org.springframework.data.redis.core.ZSetOperations;
import org.springframework.stereotype.Service;
import java.util.List;
import java.util.Set;
/**
* @ClassName GatewayNodeScoreCache
* @Description
* @Author Can.J
* @Date 2024/4/19 14:47
*/
@Service
public class GatewayNodeScoreCache extends GatewayCacheAbs<String> {
private final static String gatewayNodeScoreCacheKey = "score";
@Override
public String getPre() {
return "gateway:node:";
}
public List<WorkGatewayNode> get() {
Set<ZSetOperations.TypedTuple<String>> range = redisService.redisTemplate.opsForZSet().rangeWithScores(encode(gatewayNodeScoreCacheKey), 0, -1);
return range.stream()
.map(zSet -> WorkGatewayNode.builder()
.nodeId(zSet.getValue()).source(zSet.getScore()).build())
.toList();
}
public Long getNodeNowNum() {
List<WorkGatewayNode> workGatewayNodes = get();
Long vehicleNowOnlineNum = Long.valueOf(String.valueOf(workGatewayNodes.stream().mapToDouble(WorkGatewayNode::getSource).sum()));
return vehicleNowOnlineNum;
}
/**
* 线
* @return
*/
public Long getNodeMaxOnlineNum() {
List<WorkGatewayNode> workGatewayNodes = get();
return workGatewayNodes.size() * 80L;
}
}

View File

@ -0,0 +1,34 @@
package com.loadcenter.gateway.cache;
import com.loadcenter.gateway.cache.abs.GatewayCacheAbs;
import org.springframework.stereotype.Component;
/**
* @ClassName GatewayNodeSetVinCache
* @Description VIN
* @Author Can.J
* @Date 2024/4/19 9:49
*/
@Component
public class GatewayNodeSetVinCache extends GatewayCacheAbs<String> {
private final static String gatewayNodeSetVinCache ="cars:";
@Override
public String getPre() {
return "gateway:node:";
}
/**
* vin
* @param node
* @param vin
*/
public void put(String node,String vin){
redisService.setCacheSet(encode(gatewayNodeSetVinCache)+node,vin);
}
public void remove(String nodeId,String vin){
redisService.deleteCacheSet(encode(gatewayNodeSetVinCache)+nodeId,vin);
}
}

View File

@ -0,0 +1,46 @@
package com.loadcenter.gateway.cache;
import com.loadcenter.gateway.cache.abs.GatewayCacheAbs;
import org.springframework.stereotype.Component;
/**
* @ClassName GatewayVehicleNode
* @Description 线+VINid
* @Author Can.J
* @Date 2024/4/19 9:39
*/
@Component
public class GatewayVehicleNode extends GatewayCacheAbs<String> {
private final static String gatewayCarBusinessKey ="business";
@Override
public String getPre() {
return "gateway:car:";
}
/**
* vin gateway:car:business+vin, id
* @param vin
* @param nodeId
*/
public void put(String vin,String nodeId){
redisService.setCacheObject(encode(gatewayCarBusinessKey)+vin,nodeId);
}
/**
* vin gateway:car:business
* @param vin
*/
public void remove(String vin){
redisService.deleteObject(encode(gatewayCarBusinessKey+vin));
}
/**
*
* @param vin
* @return
*/
public String get(String vin){
return redisService.getCacheObject(encode(gatewayCarBusinessKey)+vin);
}
}

View File

@ -0,0 +1,49 @@
package com.loadcenter.gateway.cache;
import com.loadcenter.gateway.cache.abs.GatewayCacheAbs;
import org.springframework.stereotype.Component;
import java.util.Map;
/**
* @ClassName GatewayZSetNodeCache
* @Description
* @Author Can.J
* @Date 2024/4/19 9:00
*/
@Component
public class GatewayZSetNodeCache extends GatewayCacheAbs<String> {
private final static String gatewayZSetCount ="count";
@Override
public String getPre() {
return "gateway:zSet";
}
/**
* zset
* @return
*/
public Map<Object,Double> get(){
return redisService.getCacheZSetScore(encode(gatewayZSetCount));
}
/**
* 线
* @param nodeId
* @param onlineVehicle
*/
public void put(String nodeId,Integer onlineVehicle){
redisService.setCacheZSet(encode(gatewayZSetCount),nodeId,onlineVehicle);
}
/**
* zset
* @param nodeId
*/
public void remove(String nodeId){
redisService.deleteCacheZset(encode(gatewayZSetCount),nodeId);
}
}

View File

@ -0,0 +1,47 @@
package com.loadcenter.gateway.cache;
import com.loadcenter.gateway.cache.abs.GatewayCacheAbs;
import org.springframework.stereotype.Component;
/**
* @ClassName VehicleOnlineCache
* @Description
* @Author Can.J
* @Date 2024/4/18 22:02
*/
@Component
public class VehicleOnlineCache extends GatewayCacheAbs<String> {
private final static String vehicleOnlineKey ="vehicle_online";
@Override
public String getPre() {
return "vehicle:";
}
/**
* 线
* @param vin VIN
* @param nodeId id
*/
public void recordVehicleOnline(String vin,String nodeId){
redisService.setCacheObject(encode(vehicleOnlineKey+vin),nodeId);
}
/**
* 线id
* @param vin VIN
* @return id
*/
public String getOnlineGatewayNode(String vin){
return redisService.getCacheObject(encode(vehicleOnlineKey+vin));
}
/**
* 线
* @param vin VIN
*/
public void removeVehicleOnline(String vin){
redisService.deleteObject(encode(vehicleOnlineKey +vin));
}
}

View File

@ -0,0 +1,23 @@
package com.loadcenter.gateway.model;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class WorkGatewayNode {
/**
* id
*/
private String nodeId;
/**
*
*/
private Double source;
}

View File

@ -0,0 +1,22 @@
package com.loadcenter.gateway.model;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class WorkGatewayNodeSource {
/**
* id
*/
private String nodeId;
/**
*
*/
private int weight;
}

View File

@ -11,4 +11,9 @@ public interface GatewayLoadService {
* @return * @return
*/ */
String loadNode(); String loadNode();
void scheduleECS();
void refresh();
} }

View File

@ -1,22 +1,34 @@
package com.loadcenter.service.impl; package com.loadcenter.service.impl;
import com.loadcenter.gateway.cache.GatewayLoadNodeCache; import com.alibaba.fastjson2.JSONArray;
import com.loadcenter.gateway.cache.GatewayLoadSeriesCache; import com.alibaba.fastjson2.JSONObject;
import com.loadcenter.gateway.cache.GatewayNodeCache; import com.loadcenter.gateway.cache.*;
import com.loadcenter.gateway.model.GatewayNodeInfo; import com.loadcenter.gateway.model.GatewayNodeInfo;
import com.loadcenter.gateway.model.WorkGatewayNode;
import com.loadcenter.gateway.model.WorkGatewayNodeSource;
import com.loadcenter.service.GatewayLoadService; import com.loadcenter.service.GatewayLoadService;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import org.springframework.beans.factory.annotation.Autowired; import lombok.extern.slf4j.Slf4j;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import java.io.IOException;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/** /**
* @ClassName GatewayLoadServiceImpl * @ClassName GatewayLoadServiceImpl
* @Description * @Description
* @Author Can.J * @Author Can.J
* @Date 2024/4/18 16:10 * @Date 2024/4/18 16:10
*/ */
@Service @Service
@AllArgsConstructor @AllArgsConstructor
@Slf4j
public class GatewayLoadServiceImpl implements GatewayLoadService { public class GatewayLoadServiceImpl implements GatewayLoadService {
private final Long nodeLength = 100L; private final Long nodeLength = 100L;
/** /**
@ -35,7 +47,16 @@ public class GatewayLoadServiceImpl implements GatewayLoadService {
private final GatewayNodeCache gatewayNodeCache; private final GatewayNodeCache gatewayNodeCache;
/** /**
* *
*/
private final GatewayZSetNodeCache gatewayZSetNodeCache;
private final GatewayNodeScoreCache gatewayNodeScoreCache;
/**
*
*
* @return * @return
*/ */
@Override @Override
@ -47,4 +68,146 @@ public class GatewayLoadServiceImpl implements GatewayLoadService {
GatewayNodeInfo gatewayNodeInfo = gatewayNodeCache.get(loadNodeId); GatewayNodeInfo gatewayNodeInfo = gatewayNodeCache.get(loadNodeId);
return gatewayNodeInfo.getPublicAddress(); return gatewayNodeInfo.getPublicAddress();
} }
@Override
public void scheduleECS() {
//客户端初始化
OkHttpClient client = new OkHttpClient();
ArrayList<String> ipCacheSet = new ArrayList<>();
//从redis获取服务器IP集合
Map<Object, Double> map = gatewayZSetNodeCache.get();
for (Map.Entry<Object, Double> entry : map.entrySet()) {
ipCacheSet.add(entry.getKey().toString());
}
log.info("共有:[{}]个服务器", ipCacheSet.size());
//遍历每台服务器进行负载检查
for (String ip : ipCacheSet) {
//构建请求URI和请求头
String URL = "http://" + ip + ":8080/public/cluster";
Request request = new Request.Builder()
.url(URL)
.get()
.addHeader("User-Agent", "Apifox/1.0.0 (https://apifox.com)")
.addHeader("Accesstoken", "")
.build();
try {
Response response = client.newCall(request).execute();
//解析响应数据
JSONArray jsonArray = JSONArray.parseArray(response.body().string());
JSONObject jsonObject = jsonArray.getJSONObject(0);
JSONObject mqttInfo = jsonObject.getJSONObject("cartest");
int connectSize = mqttInfo.getIntValue("connectSize");
//更新redis中服务器的连接数zset数据类型
log.info("服务器:[{}]:车辆连接数:[{}]", ip, connectSize);
gatewayZSetNodeCache.put(ip, connectSize);
// 根据连接数判断是否需要进行扩容或缩容
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
/**
*
*/
@Override
public void refresh() {
Long nodeMaxNum = 80L;
List<WorkGatewayNode> workGatewayNodes = gatewayNodeScoreCache.get();
//上线最大数量
Long vehicleMaxOnlineNum = gatewayNodeScoreCache.getNodeMaxOnlineNum();
//当前连接数
Long vehicleNowOnlineNum = gatewayNodeScoreCache.getNodeNowNum();
//空余连接数
Long vehicleOnlineNum = vehicleMaxOnlineNum - vehicleNowOnlineNum;
List<String> loadNodeList =new ArrayList<>();
List<WorkGatewayNodeSource> workGatewayNodeSources = workGatewayNodes.stream()
.map(workGatewayNode ->
WorkGatewayNodeSource.builder()
.nodeId(workGatewayNode.getNodeId())
.weight(Integer.parseInt(String.valueOf(vehicleOnlineNum / (nodeMaxNum - workGatewayNode.getSource()))))
.build())
.toList();
// 计算节点列表中所有节点的权重之和
long count = workGatewayNodeSources.stream().mapToInt(WorkGatewayNodeSource::getWeight).sum();
// 如果权重之和小于100则对节点列表按照权重进行降序排序并将剩余权重平均分配给前几个节点直到权重之和等于100
if (count < 100) {
// 对节点按权重降序排序
List<WorkGatewayNodeSource> list = workGatewayNodeSources.stream()
.sorted((o1, o2) -> o2.getWeight() - o1.getWeight())
.toList();
// 打印排序后的节点列表
System.out.println(list);
// 将剩余权重分配给最低权重的节点直到总和达到100
int countWeight = 0;
for (long i = count; i < 100; i++) {
WorkGatewayNodeSource workGatewayNodeSource = list.get(countWeight++ % list.size());
workGatewayNodeSource.setWeight(workGatewayNodeSource.getWeight() + 1);
}
}
// 当所有节点权重为0时跳出循环
whFor:while (true) {
// 遍历节点列表将权重大于0的节点ID添加到loadNodeList中并将节点权重减1
for (WorkGatewayNodeSource workGatewayNodeSource : workGatewayNodeSources) {
int weight = workGatewayNodeSource.getWeight();
if (weight > 0) {
loadNodeList.add(
workGatewayNodeSource.getNodeId()
);
workGatewayNodeSource.setWeight(weight - 1);
}
}
int sum = workGatewayNodeSources.stream().mapToInt(WorkGatewayNodeSource::getWeight).sum();
// 如果权重之和小于等于0跳出循环
if (sum <= 0) {
break whFor;
}
}
gatewayLoadSeriesCache.reset();
gatewayLoadNodeCache.put(loadNodeList);
}
/**
* ECS
*/
public void dynamicEcs(){
//上线最大数量
Long vehicleMaxOnlineNum = gatewayNodeScoreCache.getNodeMaxOnlineNum();
//当前连接数
Long vehicleOnlineNowNum = gatewayNodeScoreCache.getNodeNowNum();
BigDecimal loadRate = new BigDecimal(vehicleMaxOnlineNum).divide(new BigDecimal(vehicleOnlineNowNum), 0, BigDecimal.ROUND_HALF_UP);
if(loadRate.longValue()>80){
//调用扩容逻辑
}else if (loadRate.longValue()<20){
//调用缩容逻辑
}
}
/**
*
* TODO
*/
/**
*
* TODO
*/
} }