From ab68985bd4c444abd43c423599227c3890ff3691 Mon Sep 17 00:00:00 2001 From: Saisai Liu <1374434128@qq.com> Date: Wed, 29 May 2024 22:43:59 +0800 Subject: [PATCH] =?UTF-8?q?feat():=E8=8E=B7=E5=8F=96=E4=BF=A1=E6=81=AF=20v?= =?UTF-8?q?ersion-1():=E8=8E=B7=E5=8F=96fluxmq=E4=B8=AD=E7=9A=84=E4=BF=A1?= =?UTF-8?q?=E6=81=AF=EF=BC=8C=E6=89=80=E6=9C=89=E4=BF=A1=E6=81=AF=E4=B9=8B?= =?UTF-8?q?=E5=90=8E=E5=8F=AF=E4=BB=A5=E9=80=9A=E8=BF=87get=E8=8E=B7?= =?UTF-8?q?=E5=8F=96=E5=85=B6=E4=BB=96=E4=BF=A1=E6=81=AF=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/com/mobai/TrainLoadApplication.java | 3 + .../controller/FluxGetInfoController.java | 2 +- src/main/java/com/mobai/demo/Demo1.java | 112 ++++++++++++++++ .../com/mobai/domain/MqttConnectState.java | 13 ++ src/main/java/com/mobai/domain/Result.java | 2 +- .../java/com/mobai/mq/rabbitmq/Custom.java | 22 ++-- .../com/mobai/openApi/SelectInstences.java | 121 ++++++++++++++++-- .../com/mobai/service/FluxGetInfoService.java | 2 +- .../service/impl/FluxGetInfoServiceImpl.java | 11 +- 9 files changed, 262 insertions(+), 26 deletions(-) create mode 100644 src/main/java/com/mobai/demo/Demo1.java create mode 100644 src/main/java/com/mobai/domain/MqttConnectState.java diff --git a/src/main/java/com/mobai/TrainLoadApplication.java b/src/main/java/com/mobai/TrainLoadApplication.java index 5210c93..f0c179a 100644 --- a/src/main/java/com/mobai/TrainLoadApplication.java +++ b/src/main/java/com/mobai/TrainLoadApplication.java @@ -3,9 +3,12 @@ package com.mobai; import lombok.extern.log4j.Log4j2; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.scheduling.annotation.EnableScheduling; +import org.springframework.scheduling.annotation.Scheduled; @Log4j2 @SpringBootApplication +@EnableScheduling public class TrainLoadApplication { public static void main(String[] args) { SpringApplication.run(TrainLoadApplication.class); diff --git a/src/main/java/com/mobai/controller/FluxGetInfoController.java b/src/main/java/com/mobai/controller/FluxGetInfoController.java index bcdd6aa..363ad88 100644 --- a/src/main/java/com/mobai/controller/FluxGetInfoController.java +++ b/src/main/java/com/mobai/controller/FluxGetInfoController.java @@ -23,6 +23,6 @@ public class FluxGetInfoController { @GetMapping("getInfo") public Result getInfo(){ - return fluxGetInfoService.getInfo(); + return fluxGetInfoService.getInfo(null); } } diff --git a/src/main/java/com/mobai/demo/Demo1.java b/src/main/java/com/mobai/demo/Demo1.java new file mode 100644 index 0000000..2929eac --- /dev/null +++ b/src/main/java/com/mobai/demo/Demo1.java @@ -0,0 +1,112 @@ +package com.mobai.demo; + +import java.math.BigDecimal; +import java.math.RoundingMode; +import java.util.*; +import java.util.stream.Collectors; +import java.util.stream.DoubleStream; + +/** + * @ClassName demo_01 + * @Description 描述 + * @Author YunFei.Du + * @Date 2024/5/27 14:44 + */ +public class Demo1 { + /** + * 主函数示例,展示了如何从一个HashMap中提取信息, + * 计算权重总和,按权重分配特定数量的节点,并进行相应的排序和归约。 + * + * @param args 命令行参数(未使用) + */ + public static void main(String[] args) { + // 初始化一个HashMap,存储节点ID及其对应的权重 + HashMap< String, Double > nodeMap = new HashMap<> ( ){{ + put ( "node_1",0.35 ); + put ( "node_2",0.53 ); + put ( "node_3",0.17 ); + put ( "node_4",0.46 ); + }}; + + // 将节点Map转换为NodeInfo对象的List + List< NodeInfo > nodeInfoList = nodeMap.entrySet ( ) + .stream ( ).map ( NodeInfo::new ) + .toList ( ); + System.out.println (nodeInfoList ); + + // 计算所有节点权重的总和 + double sum = nodeInfoList.stream ( ) + .flatMapToDouble ( nodeInfo -> DoubleStream.of ( nodeInfo.getWeight ( ) ) ) + .sum ( ); + System.out.println (sum ); + + // 根据权重总和计算每个节点的特定比例 + BigDecimal spec = new BigDecimal ( 100 ).divide ( new BigDecimal ( sum ), 2, BigDecimal.ROUND_HALF_UP ); + + + // 使用计算出的比例,为每个节点分配整数数量 + Map< String, Integer > collect = nodeInfoList.stream ( ).collect ( Collectors.toMap ( NodeInfo::getId, + nodeInfo -> spec.multiply ( BigDecimal.valueOf ( nodeInfo.getWeight ( ) ) ).setScale ( 0, RoundingMode.DOWN ).intValue (), + (o1, o2) -> o1 ) ); + System.out.println (collect ); + + // 计算已分配节点数量的总和 + int reduce = collect.values ( ).stream ( ).reduce ( 0, Integer::sum ); + System.out.println (reduce ); + + // 计算剩余的未分配节点数量 + int free=100-reduce; + + // 为剩余的未分配节点分配数量 + nodeInfoList.stream () + .sorted ( Comparator.comparingInt ( o->o.getWeight ().intValue () ) ) + .limit ( free ) + .forEach ( nodeInfo -> { + collect.put ( nodeInfo.getId (),collect.get ( nodeInfo.getId ())+1 ); + } ); + System.out.println (collect ); + + // 准备一个字符串数组,以存储最终的节点分配结果 + String[] nodeArray = new String[100]; + int intArrCursor= -1; + + // 将分配了数量的节点填充到字符串数组中,直到数组满或所有节点都已处理完毕 + while (collect.size ()>1){ + Iterator< Map.Entry< String, Integer > > specMapIterator = collect.entrySet ( ).iterator ( ); + while (specMapIterator.hasNext ()){ + Map.Entry< String, Integer > entry = specMapIterator.next ( ); + Integer value = entry.getValue ( ); + if (value>0){ + nodeArray[++intArrCursor] = entry.getKey ( ); + entry.setValue ( value-1 ); + } + if (value==0){ + specMapIterator.remove (); + } + } + } + System.out.println (collect ); + System.out.println (Arrays.toString ( nodeArray ) ); + } +} +class NodeInfo{ + private String id; + private Double weight; + + public NodeInfo (String id, Double weight){ + this.id=id; + this.weight=weight; + } + + public NodeInfo (Map.Entry entry){ + this.id=entry.getKey (); + this.weight=entry.getValue ()*100; + } + + public String getId(){ + return id; + } + public Double getWeight(){ + return weight; + } +} diff --git a/src/main/java/com/mobai/domain/MqttConnectState.java b/src/main/java/com/mobai/domain/MqttConnectState.java new file mode 100644 index 0000000..2307dfb --- /dev/null +++ b/src/main/java/com/mobai/domain/MqttConnectState.java @@ -0,0 +1,13 @@ +package com.mobai.domain; + +import lombok.Data; + +/** + * @ClassName MqttConnectState + * @Description 描述 + * @Author SaiSai.Liu + * @Date 2024/5/29 10:39 + */ +@Data +public class MqttConnectState { +} diff --git a/src/main/java/com/mobai/domain/Result.java b/src/main/java/com/mobai/domain/Result.java index 8ffe5ae..5af41e8 100644 --- a/src/main/java/com/mobai/domain/Result.java +++ b/src/main/java/com/mobai/domain/Result.java @@ -10,7 +10,7 @@ import java.io.Serializable; /** * 响应信息主体 * - * @author muyu + * @author mobai */ @Data @Builder diff --git a/src/main/java/com/mobai/mq/rabbitmq/Custom.java b/src/main/java/com/mobai/mq/rabbitmq/Custom.java index b8d98c2..b60dbb0 100644 --- a/src/main/java/com/mobai/mq/rabbitmq/Custom.java +++ b/src/main/java/com/mobai/mq/rabbitmq/Custom.java @@ -20,16 +20,16 @@ import java.io.IOException; @Component public class Custom { - @RabbitListener(bindings = @QueueBinding(value = @Queue(value = "send_sms_queue"), - exchange = @Exchange(value = "null",type = ExchangeTypes.DIRECT))) - public void mqCustom(Object data, Message message, Channel channel){ - System.out.println(data.toString()); - try { - channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); - } catch (IOException e) { - throw new RuntimeException(e); - } - - } +// @RabbitListener(bindings = @QueueBinding(value = @Queue(value = "send_sms_queue"), +// exchange = @Exchange(value = "null",type = ExchangeTypes.DIRECT))) +// public void mqCustom(Object data, Message message, Channel channel){ +// System.out.println(data.toString()); +// try { +// channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); +// } catch (IOException e) { +// throw new RuntimeException(e); +// } +// +// } } diff --git a/src/main/java/com/mobai/openApi/SelectInstences.java b/src/main/java/com/mobai/openApi/SelectInstences.java index 4813124..f1637ee 100644 --- a/src/main/java/com/mobai/openApi/SelectInstences.java +++ b/src/main/java/com/mobai/openApi/SelectInstences.java @@ -5,14 +5,36 @@ package com.mobai.openApi; import com.aliyun.ecs20140526.models.DescribeInstancesResponse; import com.aliyun.ecs20140526.models.DescribeInstancesResponseBody; import com.aliyun.tea.TeaException; +import com.mobai.domain.ApifoxModel; +import com.mobai.domain.Result; +import com.mobai.service.FluxGetInfoService; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; import lombok.extern.log4j.Log4j2; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; +import org.springframework.stereotype.Indexed; +import reactor.core.publisher.Flux; -import java.util.ArrayList; -import java.util.List; +import java.math.BigDecimal; +import java.math.RoundingMode; +import java.util.*; +import java.util.stream.Collector; +import java.util.stream.Collectors; @Log4j2 +@Component public class SelectInstences { + @Autowired + private FluxGetInfoService fluxGetInfoService; + + @Autowired + private RedisTemplate redisTemplate; /** * 使用AK&SK初始化账号Client * @@ -32,9 +54,9 @@ public class SelectInstences { return new com.aliyun.ecs20140526.Client(config); } - public static void main(String[] args_) throws Exception { + @Scheduled(cron = "0/10 * * * * ? ") + public void saveIps() throws Exception { List ips = new ArrayList<>(); - List args = java.util.Arrays.asList(args_); com.aliyun.ecs20140526.Client client = SelectInstences.createClient(); com.aliyun.ecs20140526.models.DescribeInstancesRequest describeInstancesRequest = new com.aliyun.ecs20140526.models.DescribeInstancesRequest() // .setImageId("m-8vb8qnidv34yj3nbirhc") @@ -60,23 +82,104 @@ public class SelectInstences { } catch (TeaException error) { // 此处仅做打印展示,请谨慎对待异常处理,在工程项目中切勿直接忽略异常。 // 错误 message - System.out.println(error.getMessage()); + log.error("错误信息::{}",error.getMessage()); // 诊断地址 - System.out.println(error.getData().get("Recommend")); + log.warn("诊断地址::{}",error.getData().get("Recommend")); com.aliyun.teautil.Common.assertAsString(error.message); } catch (Exception _error) { TeaException error = new TeaException(_error.getMessage(), _error); // 此处仅做打印展示,请谨慎对待异常处理,在工程项目中切勿直接忽略异常。 // 错误 message - System.out.println(error.getMessage()); + log.error("错误信息::{}",error.getMessage()); // 诊断地址 if (error.getData() == null) { - System.out.println("error.getData()为空"); + log.error("错误信息::{}","error.getData()为空"); } else { - System.out.println(error.getData().get("Recommend")); + log.error("诊断地址::{}",error.getData().get("Recommend")); } com.aliyun.teautil.Common.assertAsString(error.message); } + List nodes = new ArrayList<>(); + for (String ip : ips) { + Result info = fluxGetInfoService.getInfo(ip); + //连接总数 + long connectSize = info.getData().getMqttInfo().getConnectSize(); + log.info("{}::{}",ip,connectSize); + //添加到一个容器 + nodes.add(new SmallNode(ip,connectSize)); + } + //负载均横方法 + this.getArithmetic(nodes); + } + + /** + * 负载均衡算法 + * + * @param nodes + */ + private void getArithmetic(List nodes) { + // ip num 数量的容器 + Map arithmet = new HashMap<>(); + BigDecimal sum = new BigDecimal(0); + //获取所有的 键 + for (SmallNode node : nodes) { + BigDecimal value = BigDecimal.valueOf(80 - node.getNum()); + //获取总量 + sum = sum.add(value); + arithmet.put(node.getIp(),value); + }// 根据权重总和计算每个节点的特定比例 + System.out.println (arithmet ); + log.info("总可负载量:{}",arithmet); + List ips = new ArrayList<>(); + //获取每个ip的分配率 + for (String ip : arithmet.keySet()) { + //概率 + BigDecimal probability = arithmet.get(ip).divide(sum,4,BigDecimal.ROUND_HALF_UP).multiply(BigDecimal.valueOf(100)); + arithmet.put(ip,probability); + log.info(ip+"可负载率(权重值):{}",probability); + } + Set ipSet = arithmet.keySet(); + + BigDecimal finalSum = sum; + Map map = new HashMap<>(); + // 转换成数量 + ipSet.forEach(ip -> + map.put(ip,arithmet.get(ip).multiply(finalSum).intValue()/100) + ); + Long i = 0L; + System.out.println(map); + while (true){ + ipSet = map.keySet(); + Iterator iterator = ipSet.iterator(); + i++; + while (iterator.hasNext()){ + i++; + String ip = iterator.next(); + ips.add(ip); + int i1 = map.get(ip) - 1; + map.put(ip,i1); + if (map.get(ip).equals(0)){ + map.remove(ip); + } + } + if (i.equals(finalSum)){ + break; + } + System.out.println(i); + } + + // 可负载IP轮询排列 + System.out.println(arithmet); + System.out.println(ips); + } + + + @Data + @AllArgsConstructor + @NoArgsConstructor + class SmallNode{ + private String ip; + private long num; } } diff --git a/src/main/java/com/mobai/service/FluxGetInfoService.java b/src/main/java/com/mobai/service/FluxGetInfoService.java index 206c4bb..88d0704 100644 --- a/src/main/java/com/mobai/service/FluxGetInfoService.java +++ b/src/main/java/com/mobai/service/FluxGetInfoService.java @@ -10,6 +10,6 @@ import com.mobai.domain.Result; */ public interface FluxGetInfoService { - Result getInfo(); + Result getInfo(String ip); } diff --git a/src/main/java/com/mobai/service/impl/FluxGetInfoServiceImpl.java b/src/main/java/com/mobai/service/impl/FluxGetInfoServiceImpl.java index 2ac8798..441aa3f 100644 --- a/src/main/java/com/mobai/service/impl/FluxGetInfoServiceImpl.java +++ b/src/main/java/com/mobai/service/impl/FluxGetInfoServiceImpl.java @@ -34,8 +34,13 @@ public class FluxGetInfoServiceImpl implements FluxGetInfoService { private RestTemplate restTemplate; @Override - public Result getInfo() { - String url = "http://39.98.50.223:8080/public/"; + public Result getInfo(String ip) { + String url = null; + if (ip==null){ + url= "http://39.98.50.223:8080/public/"; + }else { + url = "http://"+ip+":8080/public/"; + } User user = new User("fluxmq", "fluxmq"); //登录 AcceptToken token = restTemplate.postForObject(url+"login", user, AcceptToken.class); @@ -51,6 +56,6 @@ public class FluxGetInfoServiceImpl implements FluxGetInfoService { List apifoxModel = JSON.parseArray(exchange.getBody(), ApifoxModel.class); // get 获取具体所有信息 - return Result.success(apifoxModel); + return Result.success(apifoxModel.get(0)); } }