parent
e2d9adbffa
commit
ab68985bd4
|
@ -3,9 +3,12 @@ package com.mobai;
|
|||
import lombok.extern.log4j.Log4j2;
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
import org.springframework.scheduling.annotation.EnableScheduling;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
|
||||
@Log4j2
|
||||
@SpringBootApplication
|
||||
@EnableScheduling
|
||||
public class TrainLoadApplication {
|
||||
public static void main(String[] args) {
|
||||
SpringApplication.run(TrainLoadApplication.class);
|
||||
|
|
|
@ -23,6 +23,6 @@ public class FluxGetInfoController {
|
|||
|
||||
@GetMapping("getInfo")
|
||||
public Result getInfo(){
|
||||
return fluxGetInfoService.getInfo();
|
||||
return fluxGetInfoService.getInfo(null);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,112 @@
|
|||
package com.mobai.demo;
|
||||
|
||||
import java.math.BigDecimal;
|
||||
import java.math.RoundingMode;
|
||||
import java.util.*;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.DoubleStream;
|
||||
|
||||
/**
|
||||
* @ClassName demo_01
|
||||
* @Description 描述
|
||||
* @Author YunFei.Du
|
||||
* @Date 2024/5/27 14:44
|
||||
*/
|
||||
public class Demo1 {
|
||||
/**
|
||||
* 主函数示例,展示了如何从一个HashMap中提取信息,
|
||||
* 计算权重总和,按权重分配特定数量的节点,并进行相应的排序和归约。
|
||||
*
|
||||
* @param args 命令行参数(未使用)
|
||||
*/
|
||||
public static void main(String[] args) {
|
||||
// 初始化一个HashMap,存储节点ID及其对应的权重
|
||||
HashMap< String, Double > nodeMap = new HashMap<> ( ){{
|
||||
put ( "node_1",0.35 );
|
||||
put ( "node_2",0.53 );
|
||||
put ( "node_3",0.17 );
|
||||
put ( "node_4",0.46 );
|
||||
}};
|
||||
|
||||
// 将节点Map转换为NodeInfo对象的List
|
||||
List< NodeInfo > nodeInfoList = nodeMap.entrySet ( )
|
||||
.stream ( ).map ( NodeInfo::new )
|
||||
.toList ( );
|
||||
System.out.println (nodeInfoList );
|
||||
|
||||
// 计算所有节点权重的总和
|
||||
double sum = nodeInfoList.stream ( )
|
||||
.flatMapToDouble ( nodeInfo -> DoubleStream.of ( nodeInfo.getWeight ( ) ) )
|
||||
.sum ( );
|
||||
System.out.println (sum );
|
||||
|
||||
// 根据权重总和计算每个节点的特定比例
|
||||
BigDecimal spec = new BigDecimal ( 100 ).divide ( new BigDecimal ( sum ), 2, BigDecimal.ROUND_HALF_UP );
|
||||
|
||||
|
||||
// 使用计算出的比例,为每个节点分配整数数量
|
||||
Map< String, Integer > collect = nodeInfoList.stream ( ).collect ( Collectors.toMap ( NodeInfo::getId,
|
||||
nodeInfo -> spec.multiply ( BigDecimal.valueOf ( nodeInfo.getWeight ( ) ) ).setScale ( 0, RoundingMode.DOWN ).intValue (),
|
||||
(o1, o2) -> o1 ) );
|
||||
System.out.println (collect );
|
||||
|
||||
// 计算已分配节点数量的总和
|
||||
int reduce = collect.values ( ).stream ( ).reduce ( 0, Integer::sum );
|
||||
System.out.println (reduce );
|
||||
|
||||
// 计算剩余的未分配节点数量
|
||||
int free=100-reduce;
|
||||
|
||||
// 为剩余的未分配节点分配数量
|
||||
nodeInfoList.stream ()
|
||||
.sorted ( Comparator.comparingInt ( o->o.getWeight ().intValue () ) )
|
||||
.limit ( free )
|
||||
.forEach ( nodeInfo -> {
|
||||
collect.put ( nodeInfo.getId (),collect.get ( nodeInfo.getId ())+1 );
|
||||
} );
|
||||
System.out.println (collect );
|
||||
|
||||
// 准备一个字符串数组,以存储最终的节点分配结果
|
||||
String[] nodeArray = new String[100];
|
||||
int intArrCursor= -1;
|
||||
|
||||
// 将分配了数量的节点填充到字符串数组中,直到数组满或所有节点都已处理完毕
|
||||
while (collect.size ()>1){
|
||||
Iterator< Map.Entry< String, Integer > > specMapIterator = collect.entrySet ( ).iterator ( );
|
||||
while (specMapIterator.hasNext ()){
|
||||
Map.Entry< String, Integer > entry = specMapIterator.next ( );
|
||||
Integer value = entry.getValue ( );
|
||||
if (value>0){
|
||||
nodeArray[++intArrCursor] = entry.getKey ( );
|
||||
entry.setValue ( value-1 );
|
||||
}
|
||||
if (value==0){
|
||||
specMapIterator.remove ();
|
||||
}
|
||||
}
|
||||
}
|
||||
System.out.println (collect );
|
||||
System.out.println (Arrays.toString ( nodeArray ) );
|
||||
}
|
||||
}
|
||||
class NodeInfo{
|
||||
private String id;
|
||||
private Double weight;
|
||||
|
||||
public NodeInfo (String id, Double weight){
|
||||
this.id=id;
|
||||
this.weight=weight;
|
||||
}
|
||||
|
||||
public NodeInfo (Map.Entry<String,Double> entry){
|
||||
this.id=entry.getKey ();
|
||||
this.weight=entry.getValue ()*100;
|
||||
}
|
||||
|
||||
public String getId(){
|
||||
return id;
|
||||
}
|
||||
public Double getWeight(){
|
||||
return weight;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,13 @@
|
|||
package com.mobai.domain;
|
||||
|
||||
import lombok.Data;
|
||||
|
||||
/**
|
||||
* @ClassName MqttConnectState
|
||||
* @Description 描述
|
||||
* @Author SaiSai.Liu
|
||||
* @Date 2024/5/29 10:39
|
||||
*/
|
||||
@Data
|
||||
public class MqttConnectState {
|
||||
}
|
|
@ -10,7 +10,7 @@ import java.io.Serializable;
|
|||
/**
|
||||
* 响应信息主体
|
||||
*
|
||||
* @author muyu
|
||||
* @author mobai
|
||||
*/
|
||||
@Data
|
||||
@Builder
|
||||
|
|
|
@ -20,16 +20,16 @@ import java.io.IOException;
|
|||
@Component
|
||||
public class Custom {
|
||||
|
||||
@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "send_sms_queue"),
|
||||
exchange = @Exchange(value = "null",type = ExchangeTypes.DIRECT)))
|
||||
public void mqCustom(Object data, Message message, Channel channel){
|
||||
System.out.println(data.toString());
|
||||
try {
|
||||
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
||||
}
|
||||
// @RabbitListener(bindings = @QueueBinding(value = @Queue(value = "send_sms_queue"),
|
||||
// exchange = @Exchange(value = "null",type = ExchangeTypes.DIRECT)))
|
||||
// public void mqCustom(Object data, Message message, Channel channel){
|
||||
// System.out.println(data.toString());
|
||||
// try {
|
||||
// channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
|
||||
// } catch (IOException e) {
|
||||
// throw new RuntimeException(e);
|
||||
// }
|
||||
//
|
||||
// }
|
||||
|
||||
}
|
||||
|
|
|
@ -5,14 +5,36 @@ package com.mobai.openApi;
|
|||
import com.aliyun.ecs20140526.models.DescribeInstancesResponse;
|
||||
import com.aliyun.ecs20140526.models.DescribeInstancesResponseBody;
|
||||
import com.aliyun.tea.TeaException;
|
||||
import com.mobai.domain.ApifoxModel;
|
||||
import com.mobai.domain.Result;
|
||||
import com.mobai.service.FluxGetInfoService;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
import lombok.extern.log4j.Log4j2;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.data.redis.core.RedisTemplate;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.stereotype.Indexed;
|
||||
import reactor.core.publisher.Flux;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.math.BigDecimal;
|
||||
import java.math.RoundingMode;
|
||||
import java.util.*;
|
||||
import java.util.stream.Collector;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@Log4j2
|
||||
@Component
|
||||
public class SelectInstences {
|
||||
|
||||
@Autowired
|
||||
private FluxGetInfoService fluxGetInfoService;
|
||||
|
||||
@Autowired
|
||||
private RedisTemplate<String,String> redisTemplate;
|
||||
/**
|
||||
* 使用AK&SK初始化账号Client
|
||||
*
|
||||
|
@ -32,9 +54,9 @@ public class SelectInstences {
|
|||
return new com.aliyun.ecs20140526.Client(config);
|
||||
}
|
||||
|
||||
public static void main(String[] args_) throws Exception {
|
||||
@Scheduled(cron = "0/10 * * * * ? ")
|
||||
public void saveIps() throws Exception {
|
||||
List<String> ips = new ArrayList<>();
|
||||
List<String> args = java.util.Arrays.asList(args_);
|
||||
com.aliyun.ecs20140526.Client client = SelectInstences.createClient();
|
||||
com.aliyun.ecs20140526.models.DescribeInstancesRequest describeInstancesRequest = new com.aliyun.ecs20140526.models.DescribeInstancesRequest()
|
||||
// .setImageId("m-8vb8qnidv34yj3nbirhc")
|
||||
|
@ -60,23 +82,104 @@ public class SelectInstences {
|
|||
} catch (TeaException error) {
|
||||
// 此处仅做打印展示,请谨慎对待异常处理,在工程项目中切勿直接忽略异常。
|
||||
// 错误 message
|
||||
System.out.println(error.getMessage());
|
||||
log.error("错误信息::{}",error.getMessage());
|
||||
// 诊断地址
|
||||
System.out.println(error.getData().get("Recommend"));
|
||||
log.warn("诊断地址::{}",error.getData().get("Recommend"));
|
||||
com.aliyun.teautil.Common.assertAsString(error.message);
|
||||
} catch (Exception _error) {
|
||||
TeaException error = new TeaException(_error.getMessage(), _error);
|
||||
// 此处仅做打印展示,请谨慎对待异常处理,在工程项目中切勿直接忽略异常。
|
||||
// 错误 message
|
||||
System.out.println(error.getMessage());
|
||||
log.error("错误信息::{}",error.getMessage());
|
||||
// 诊断地址
|
||||
if (error.getData() == null) {
|
||||
System.out.println("error.getData()为空");
|
||||
log.error("错误信息::{}","error.getData()为空");
|
||||
} else {
|
||||
System.out.println(error.getData().get("Recommend"));
|
||||
log.error("诊断地址::{}",error.getData().get("Recommend"));
|
||||
}
|
||||
|
||||
com.aliyun.teautil.Common.assertAsString(error.message);
|
||||
}
|
||||
List<SmallNode> nodes = new ArrayList<>();
|
||||
for (String ip : ips) {
|
||||
Result<ApifoxModel> info = fluxGetInfoService.getInfo(ip);
|
||||
//连接总数
|
||||
long connectSize = info.getData().getMqttInfo().getConnectSize();
|
||||
log.info("{}::{}",ip,connectSize);
|
||||
//添加到一个容器
|
||||
nodes.add(new SmallNode(ip,connectSize));
|
||||
}
|
||||
//负载均横方法
|
||||
this.getArithmetic(nodes);
|
||||
}
|
||||
|
||||
/**
|
||||
* 负载均衡算法
|
||||
*
|
||||
* @param nodes
|
||||
*/
|
||||
private void getArithmetic(List<SmallNode> nodes) {
|
||||
// ip num 数量的容器
|
||||
Map<String, BigDecimal> arithmet = new HashMap<>();
|
||||
BigDecimal sum = new BigDecimal(0);
|
||||
//获取所有的 键
|
||||
for (SmallNode node : nodes) {
|
||||
BigDecimal value = BigDecimal.valueOf(80 - node.getNum());
|
||||
//获取总量
|
||||
sum = sum.add(value);
|
||||
arithmet.put(node.getIp(),value);
|
||||
}// 根据权重总和计算每个节点的特定比例
|
||||
System.out.println (arithmet );
|
||||
log.info("总可负载量:{}",arithmet);
|
||||
List<String> ips = new ArrayList<>();
|
||||
//获取每个ip的分配率
|
||||
for (String ip : arithmet.keySet()) {
|
||||
//概率
|
||||
BigDecimal probability = arithmet.get(ip).divide(sum,4,BigDecimal.ROUND_HALF_UP).multiply(BigDecimal.valueOf(100));
|
||||
arithmet.put(ip,probability);
|
||||
log.info(ip+"可负载率(权重值):{}",probability);
|
||||
}
|
||||
Set<String> ipSet = arithmet.keySet();
|
||||
|
||||
BigDecimal finalSum = sum;
|
||||
Map<String, Integer> map = new HashMap<>();
|
||||
// 转换成数量
|
||||
ipSet.forEach(ip ->
|
||||
map.put(ip,arithmet.get(ip).multiply(finalSum).intValue()/100)
|
||||
);
|
||||
Long i = 0L;
|
||||
System.out.println(map);
|
||||
while (true){
|
||||
ipSet = map.keySet();
|
||||
Iterator<String> iterator = ipSet.iterator();
|
||||
i++;
|
||||
while (iterator.hasNext()){
|
||||
i++;
|
||||
String ip = iterator.next();
|
||||
ips.add(ip);
|
||||
int i1 = map.get(ip) - 1;
|
||||
map.put(ip,i1);
|
||||
if (map.get(ip).equals(0)){
|
||||
map.remove(ip);
|
||||
}
|
||||
}
|
||||
if (i.equals(finalSum)){
|
||||
break;
|
||||
}
|
||||
System.out.println(i);
|
||||
}
|
||||
|
||||
// 可负载IP轮询排列
|
||||
System.out.println(arithmet);
|
||||
System.out.println(ips);
|
||||
}
|
||||
|
||||
|
||||
@Data
|
||||
@AllArgsConstructor
|
||||
@NoArgsConstructor
|
||||
class SmallNode{
|
||||
private String ip;
|
||||
private long num;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -10,6 +10,6 @@ import com.mobai.domain.Result;
|
|||
*/
|
||||
public interface FluxGetInfoService {
|
||||
|
||||
Result getInfo();
|
||||
Result getInfo(String ip);
|
||||
|
||||
}
|
||||
|
|
|
@ -34,8 +34,13 @@ public class FluxGetInfoServiceImpl implements FluxGetInfoService {
|
|||
private RestTemplate restTemplate;
|
||||
|
||||
@Override
|
||||
public Result getInfo() {
|
||||
String url = "http://39.98.50.223:8080/public/";
|
||||
public Result getInfo(String ip) {
|
||||
String url = null;
|
||||
if (ip==null){
|
||||
url= "http://39.98.50.223:8080/public/";
|
||||
}else {
|
||||
url = "http://"+ip+":8080/public/";
|
||||
}
|
||||
User user = new User("fluxmq", "fluxmq");
|
||||
//登录
|
||||
AcceptToken token = restTemplate.postForObject(url+"login", user, AcceptToken.class);
|
||||
|
@ -51,6 +56,6 @@ public class FluxGetInfoServiceImpl implements FluxGetInfoService {
|
|||
List<ApifoxModel> apifoxModel = JSON.parseArray(exchange.getBody(), ApifoxModel.class);
|
||||
// get 获取具体所有信息
|
||||
|
||||
return Result.success(apifoxModel);
|
||||
return Result.success(apifoxModel.get(0));
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue