master
肖凡 2024-04-21 21:41:46 +08:00
parent a5dbdf2457
commit bb1a3cabd3
11 changed files with 158 additions and 22 deletions

View File

@ -1,13 +1,13 @@
package com.xiaofan.loadcenter.common.aliyun;
package com.xiaofan.loadcenter.aliyun;
import com.aliyun.ecs20140526.Client;
import com.aliyun.ecs20140526.models.*;
import com.aliyun.tea.TeaException;
import com.aliyun.teautil.models.RuntimeOptions;
import com.xiaofan.loadcenter.common.aliyun.config.AliConfig;
import com.xiaofan.loadcenter.aliyun.config.InstanceConfig;
import com.xiaofan.loadcenter.aliyun.config.AliConfig;
import com.xiaofan.loadcenter.common.aliyun.utils.UserUtil;
import com.xiaofan.loadcenter.common.aliyun.config.InstanceConfig;
import com.xiaofan.loadcenter.common.utils.UserUtil;
import com.xiaofan.loadcenter.common.domain.EcsInstanceInfo;
import lombok.extern.log4j.Log4j2;
import org.springframework.stereotype.Service;

View File

@ -1,4 +1,4 @@
package com.xiaofan.loadcenter.common.aliyun.config;
package com.xiaofan.loadcenter.aliyun.config;
import com.aliyun.ecs20140526.Client;
import com.aliyun.teaopenapi.models.Config;
@ -11,7 +11,7 @@ import org.springframework.context.annotation.Configuration;
* @ProjectName: LoadCenter
* @PackageName: com.muyu.loadCenter.aliyun.config
* @Description
* @Author HuangDaJu
* @Author XiaoFan
* @Date 2024/4/16 14:48
* @Version 1.0
*/

View File

@ -1,4 +1,4 @@
package com.xiaofan.loadcenter.common.aliyun.config;
package com.xiaofan.loadcenter.aliyun.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

View File

@ -1,4 +1,4 @@
package com.xiaofan.loadcenter.common.aliyun.config;
package com.xiaofan.loadcenter.aliyun.config;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;

View File

@ -8,7 +8,7 @@ import lombok.NoArgsConstructor;
* @ProjectName: LoadCenter
* @PackageName: com.muyu.loadCenter.domain
* @Description TODO
* @Author HuangDaJu
* @Author XiaoFan
* @Date 2024/4/14 11:55
* @Version 1.0
*/

View File

@ -1,7 +1,7 @@
package com.xiaofan.loadcenter.common.aliyun.utils;
package com.xiaofan.loadcenter.common.utils;
import com.xiaofan.loadcenter.common.aliyun.utils.uuid.IdUtils;
import com.xiaofan.loadcenter.common.utils.uuid.IdUtils;
import com.xiaofan.loadcenter.common.redis.service.RedisService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

View File

@ -1,4 +1,4 @@
package com.xiaofan.loadcenter.common.aliyun.utils.uuid;
package com.xiaofan.loadcenter.common.utils.uuid;
/**
* ID

View File

@ -1,4 +1,4 @@
package com.xiaofan.loadcenter.common.aliyun.utils.uuid;
package com.xiaofan.loadcenter.common.utils.uuid;
import java.security.MessageDigest;

View File

@ -1,4 +1,22 @@
package com.xiaofan.loadcenter.gateway.mq;
import com.rabbitmq.client.Channel;
import com.xiaofan.loadcenter.aliyun.AliYunEcsService;
import com.xiaofan.loadcenter.common.domain.Result;
import com.xiaofan.loadcenter.common.redis.service.RedisService;
import com.xiaofan.loadcenter.gateway.cache.GatewayNodeCache;
import com.xiaofan.loadcenter.gateway.cache.GatewayNodeSetVinCache;
import com.xiaofan.loadcenter.gateway.cache.GatewayVehicleLineNodeCache;
import com.xiaofan.loadcenter.gateway.model.GatewayNodeInfo;
import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.web.client.RestTemplate;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Set;
/**
* @ProjectName: LoadCenter
@ -8,6 +26,115 @@ package com.xiaofan.loadcenter.gateway.mq;
* @Date 2024/4/20 15:23
* @Version 1.0
*/
@RabbitListener(queues = "queue")
@Component
@Log4j2
public class Consumer {
@Autowired
private RedisService redisService;
@Autowired
private RestTemplate restTemplate;
@Autowired
private GatewayNodeSetVinCache gatewayNodeSetVinCache;
@Autowired
private GatewayNodeCache gatewayNodeCache;
@Autowired
private GatewayVehicleLineNodeCache gatewayVehicleLineNodeCache;
@Autowired
private AliYunEcsService aliYunEcsService;
@RabbitHandler
public void Handler(GatewayNodeInfo gatewayNodeInfo, Channel channel, Message message) throws IOException {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
// 消息ID和传递标签
String messageId = message.getMessageProperties().getMessageId();
log.info("消息Id"+messageId);
log.info("标签:"+deliveryTag);
// 检查消息是否首次处理
if (!redisService.hasKey("idempotent:"+messageId)){
// 首次处理,存储传递标签
redisService.setCacheObject("idempotent:"+messageId,deliveryTag+"");
}
//1 添加成功新数据 0已有重复值,不允许再添加
Long add = redisService.redisTemplate.opsForSet().add( "set" + messageId, "set" + messageId);
try {
// 检查是否重复消费
if(add == 1){
//消费的业务逻辑
// 确认消费并在Redis中做标记
log.info("确认消费");
Set<String> set=redisService.getCacheSet("gateway:node:many:"+gatewayNodeInfo.getPublicIdAddress());
ArrayList<String> allVin = new ArrayList<>(set);
// List<String> allVin = gatewayNodeSetVinCache.getNodeAllVin(gatewayNodeInfo.getPublicIdAddress());
String url="http://127.0.0.1:81/vehicle/instance/client/inits";
Result result = restTemplate.postForObject(url,allVin, Result.class);
Integer i = (Integer) result.getData();
if (i>0){
for (String string : allVin) {
gatewayNodeSetVinCache.remove(gatewayNodeInfo.getPublicIdAddress(),string);
gatewayVehicleLineNodeCache.remove(string);
}
gatewayNodeCache.remove(gatewayNodeInfo.getNodeId());
log.info("删除实例的对象");
}
aliYunEcsService.releaseECS(gatewayNodeInfo.getNodeId()); // 释放ECS实例
log.info("释放实例");
redisService.setCacheObject("repeat:"+messageId,messageId);
channel.basicAck(deliveryTag,false);
}else{
// 重复消费,拒绝处理
log.info("重复消费");
channel.basicReject(deliveryTag,false);
}
} catch (Exception exception) {
// 异常处理逻辑,根据重试次数决定消息处理方式
String s = redisService.getCacheObject("idempotent:" + messageId);
long old = Long.parseLong(s);
if (deliveryTag==(old+2)){
// 重试三次后,仍然失败,退回消息
log.info("已重试三次,实在消费不了");
channel.basicNack(deliveryTag,false,false);
}else {
// 尝试再次重试
log.info("继续重试");
channel.basicNack(deliveryTag,false,true);
}
}
}
}

View File

@ -2,7 +2,7 @@ package com.xiaofan.loadcenter.service.impl;
import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import com.xiaofan.loadcenter.common.aliyun.AliYunEcsService;
import com.xiaofan.loadcenter.aliyun.AliYunEcsService;
import com.xiaofan.loadcenter.common.domain.EcsInstanceInfo;
import com.xiaofan.loadcenter.common.domain.WorkGatewayNode;
import com.xiaofan.loadcenter.common.rabbit.RabbitMqConfig;
@ -28,7 +28,6 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
/**
* @ProjectName: LoadCenter
@ -44,11 +43,11 @@ import java.util.stream.Collectors;
public class GatewayLoadServiceImpl implements GatewayLoadService {
private final Long nodeLength=100L;
private final GatewayLoadNodeCache gatewayLoadNodeCache;
private final GatewayLoadSeriesCache gatewayLoadSeriesCache;
private final GatewayNodeCache gatewayNodeCache;
private final GatewayNodeScoreCache gatewayNodeScoreCache;
@Autowired
private RabbitTemplate rabbitTemplate;
@ -59,6 +58,7 @@ public class GatewayLoadServiceImpl implements GatewayLoadService {
@Autowired
private AliYunEcsService aliYunEcsService;
/**
*
* @return
@ -117,6 +117,7 @@ public class GatewayLoadServiceImpl implements GatewayLoadService {
}
@Override
public void companding() throws Exception{
//查找连接数
@ -137,12 +138,10 @@ public class GatewayLoadServiceImpl implements GatewayLoadService {
Thread.sleep(5000);
//获取新实例信息并将其放入Redis
EcsInstanceInfo ecsInstanceInfo = aliYunEcsService.selectList(instanceId);
GatewayNodeInfo gatewayNodeInfo = new GatewayNodeInfo();
gatewayNodeInfo.setNodeId(ecsInstanceInfo.getInstanceId());
gatewayNodeInfo.setPublicIdAddress(ecsInstanceInfo.getPublicIpAddress());
gatewayNodeInfo.setPrivateIdAddress(ecsInstanceInfo.getPrivateIpAddress());
//添加缓存数据
gatewayNodeCache.put(ecsInstanceInfo.getPublicIpAddress(),gatewayNodeInfo);
//修改服务器与在线车辆
@ -152,7 +151,6 @@ public class GatewayLoadServiceImpl implements GatewayLoadService {
if (nodeList.size()>2){
log.info("负载率:"+loadRate);
log.info("负载过低,开始缩容"+loadRate.longValue());
WorkGatewayNode minConnectionNode=null;
int minConnections=Integer.MAX_VALUE;
@ -181,8 +179,6 @@ public class GatewayLoadServiceImpl implements GatewayLoadService {
log.info("负载过低,但是节点数小于2,不缩容");
}
}
}
@Override

View File

@ -7,6 +7,19 @@ Spring:
port: 6379
rabbitmq:
username: guest
password: guest
virtualHost: /
port: 5672
host: 101.43.11.6
listener:
simple:
prefetch: 1 # 每次只能获取一条,处理完成才能获取下一条
publisher-confirm-type: correlated #确认消息已发送到交换机(Exchange)
publisher-returns: true #确认消息已发送到队列(Queue)
config:
ali:
access-key-id: LTAI5tS81pRFwvksJvm5HPjp