diff --git a/src/main/java/com/load/MqttApplication.java b/src/main/java/com/load/MqttApplication.java deleted file mode 100644 index cd1def0..0000000 --- a/src/main/java/com/load/MqttApplication.java +++ /dev/null @@ -1,10 +0,0 @@ -package com.load; - -/** - * @ClassName MqttApplication - * @Description 描述 - * @Author YunFei.Du - * @Date 2024/5/30 9:19 - */ -public class MqttApplication { -} diff --git a/src/main/java/com/load/SubscribeSample.java b/src/main/java/com/load/SubscribeSample.java index c58e6c0..0c5d428 100644 --- a/src/main/java/com/load/SubscribeSample.java +++ b/src/main/java/com/load/SubscribeSample.java @@ -11,7 +11,7 @@ public class SubscribeSample { /** * 代理地址 */ - String broker = "tcp://39.100.87.192:1883"; + String broker = "tcp://39.100.113.246:1883"; /** * 主题 */ diff --git a/src/main/java/com/load/config/InitConnectWeight.java b/src/main/java/com/load/config/InitConnectWeight.java index 962be25..90e101a 100644 --- a/src/main/java/com/load/config/InitConnectWeight.java +++ b/src/main/java/com/load/config/InitConnectWeight.java @@ -53,90 +53,34 @@ public class InitConnectWeight implements ApplicationRunner { //获取阿里云客户端 try { client = ECSTool.createClient(); + DescribeInstancesRequest describeInstancesRequest = new DescribeInstancesRequest().setRegionId("cn-zhangjiakou"); + DescribeInstancesResponse describeInstancesResponse = client.describeInstancesWithOptions(describeInstancesRequest, new RuntimeOptions()); + List> ipListList = describeInstancesResponse.getBody().instances.getInstance().stream() + .map(instance -> instance.publicIpAddress.ipAddress) + .collect(Collectors.toList()); + ipListList.forEach(strings -> strings.forEach(ipList::add)); } catch (Exception e) { + log.error("获取阿里云客户端或实例信息失败", e); throw new RuntimeException(e); } - //获取所有实例 - DescribeInstancesRequest describeInstancesRequest = new DescribeInstancesRequest() - .setRegionId("cn-zhangjiakou"); -// .setInstanceType("ecs.e-c1m2.xlarge"); - RuntimeOptions runtime = new RuntimeOptions(); - try { - // 复制代码运行请自行打印 API 的返回值 - DescribeInstancesResponse describeInstancesResponse = client.describeInstancesWithOptions(describeInstancesRequest, runtime); - List> ipListList = describeInstancesResponse.getBody().instances.getInstance().stream().map(instance -> instance.publicIpAddress.ipAddress).collect(Collectors.toList()); - for (List strings : ipListList) { - for (String ip : strings) { - ipList.add(ip); - } - } - } catch (TeaException error) { - // 此处仅做打印展示,请谨慎对待异常处理,在工程项目中切勿直接忽略异常。 - // 错误 message - System.out.println(error.getMessage()); - // 诊断地址 - System.out.println(error.getData().get("Recommend")); - com.aliyun.teautil.Common.assertAsString(error.message); - } catch (Exception _error) { - TeaException error = new TeaException(_error.getMessage(), _error); - // 此处仅做打印展示,请谨慎对待异常处理,在工程项目中切勿直接忽略异常。 - // 错误 message - System.out.println(error.getMessage()); - // 诊断地址 - System.out.println(error.getData().get("Recommend")); - com.aliyun.teautil.Common.assertAsString(error.message); - } - - - //http://fluxmq.muyu.icu/public/login - - //遍历所有ip,获取每一个服务的连接数 for (String ip : ipList) { - String url = "http://" + ip + ":8080/public/login"; - Map request = new HashMap<>(); - request.put("username", "fluxmq"); - request.put("password", "fluxmq"); - HttpHeaders httpHeaders = new HttpHeaders(); - httpHeaders.setContentType(MediaType.APPLICATION_JSON); - HttpEntity> r = new HttpEntity>(request, httpHeaders); - String result = restTemplate.postForObject(url, r, String.class); - - - //http://fluxmq.muyu.icu/public/cluster - - int nextInt = new Random().nextInt(1000); - String getInfoUrl = "http://" + ip + ":8080/public/cluster?random=" + nextInt; - - HttpHeaders httpHeadersGetInfo = new HttpHeaders(); - httpHeadersGetInfo.setContentType(MediaType.APPLICATION_JSON); - httpHeadersGetInfo.setAccept(Collections.singletonList(MediaType.APPLICATION_JSON)); - httpHeadersGetInfo.set("Cookie", result); - HttpEntity getInfoRequest = new HttpEntity(httpHeadersGetInfo); - ResponseEntity responseInfo = restTemplate.exchange(getInfoUrl, HttpMethod.GET, getInfoRequest, String.class, 1); - - log.error("响应是:{}", responseInfo.getBody()); - - JSONArray jsonArray = JSON.parseArray(responseInfo.getBody()); - if (jsonArray.size() > 0) { - JSONObject jsonObject = jsonArray.getJSONObject(0); - Integer connectSize = Integer.valueOf(jsonObject.getJSONObject("mqttInfo").getString("connectSize")); - connectWeightList.add(new ConnectWeight(ip,100-connectSize)); - log.error("链接数量:{}", connectSize); - } else { - log.error("得到的相应数据为null"); + try { + processIp(ip, connectWeightList); + } catch (Exception e) { + log.error("处理IP [{}] 时出错", ip, e); } } +// connectWeightList 的逻辑 + // 初始化权重求和变量 Integer sum =0; // 遍历ConnectWeight列表,计算所有权重值的总和 for (ConnectWeight connectWeight : connectWeightList) { sum = sum + connectWeight.getWeightValue(); } - // 初始化最大轮询次数为0 int max = 0; for (ConnectWeight connectWeight : connectWeightList) { - log.error("权重值:{}",connectWeight.getWeightValue()); // 将权重值转换为百分比并向下取整,得到每一轮的轮询次数 Integer result = BigDecimal.valueOf(connectWeight.getWeightValue() * 100).divide(BigDecimal.valueOf(sum), 0, RoundingMode.DOWN).intValue(); // 如果计算出的轮询次数大于当前最大值,更新最大值 @@ -168,4 +112,57 @@ public class InitConnectWeight implements ApplicationRunner { redisTemplate.opsForList().rightPush("ipList",ip); } } + + private void processIp(String ip, List connectWeightList) throws Exception { + + String username = "fluxmq"; + String password = "fluxmq"; + String loginUrl = "http://" + ip + ":8080/public/login"; + String getInfoUrl = "http://" + ip + ":8080/public/cluster"; + + Map< String, Object > loginRequest = new HashMap<> ( ); + loginRequest.put ( "username", username ); + loginRequest.put ( "password", password ); + + HttpHeaders headers = new HttpHeaders ( ); + headers.setContentType ( MediaType.APPLICATION_JSON ); + HttpEntity< Map< String, Object > > loginEntity = new HttpEntity<> ( loginRequest, headers ); + + // 登录操作 + String loginResult = restTemplate.postForObject ( loginUrl, loginEntity, String.class ); + // 使用登录cookie获取集群信息 + HttpHeaders headersGetInfo = new HttpHeaders ( ); + headersGetInfo.setContentType ( MediaType.APPLICATION_JSON ); + headersGetInfo.setAccept ( Collections.singletonList ( MediaType.APPLICATION_JSON ) ); + headersGetInfo.set ( "Cookie", loginResult ); + HttpEntity< String > getInfoEntity = new HttpEntity<> ( headersGetInfo ); + + ResponseEntity< String > responseInfo = restTemplate.exchange ( getInfoUrl, HttpMethod.GET, getInfoEntity, String.class ); + + if (responseInfo.getStatusCodeValue ( ) == 200) { + JSONArray jsonArray = JSON.parseArray ( responseInfo.getBody ( ) ); + if (jsonArray.size ( ) > 0) { + JSONObject jsonObject = jsonArray.getJSONObject ( 0 ); + + log.info ( "响应 :{}", jsonObject ); + Integer connectSize = jsonObject.getJSONObject("mqttInfo").getJSONObject("runtimes").getInteger("mqtt.connect"); + ConnectWeight connectWeight = new ConnectWeight ( ip, 100 - connectSize ); + + log.info ( "IP: {}, 连接数: {}, 权重值: {}", ip, connectSize, connectWeight.getWeightValue ( ) ); + + log.info ( "Full Response: {}", responseInfo.getBody ( ) ); + connectWeightList.add ( connectWeight ); + } else { + // 处理得到的响应数据为空的情况 + log.warn ( "响应数据为空" ); + } + } else { + // 处理HTTP请求失败的情况 + int statusCode = responseInfo.getStatusCodeValue ( ); + String responseBody = responseInfo.getBody ( ); + log.error ( "Failed to fetch data from {}: Status code {} with response: {}", ip, statusCode, responseBody ); + } + } + + } diff --git a/src/main/java/com/load/consumer/ReceiveHandler.java b/src/main/java/com/load/consumer/ReceiveHandler.java index 91ef4ce..916887f 100644 --- a/src/main/java/com/load/consumer/ReceiveHandler.java +++ b/src/main/java/com/load/consumer/ReceiveHandler.java @@ -26,8 +26,8 @@ public class ReceiveHandler { @Autowired private InitConnectWeight initConnectWeight; - //监听sms队列 ADD_LOG_AAA - @RabbitListener(queues = {"ADD_LOG_AAA"}) + //监听SMS队列 ADD_LOG_AAA + @RabbitListener(queues = {"test"}) public void receiveSms(Message message) { try { initConnectWeight.run(new DefaultApplicationArguments()); diff --git a/src/main/java/com/load/mqtt/MessageCallbackService.java b/src/main/java/com/load/mqtt/MessageCallbackService.java deleted file mode 100644 index cf77090..0000000 --- a/src/main/java/com/load/mqtt/MessageCallbackService.java +++ /dev/null @@ -1,32 +0,0 @@ -package com.load.mqtt; - -import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; -import org.eclipse.paho.client.mqttv3.MqttCallback; -import org.eclipse.paho.client.mqttv3.MqttMessage; -import org.springframework.stereotype.Service; - -/** - * 消息回调服务 (回执消息类) - * @author YunFei.Du - * @date 22:37 2024/5/30 - */ -@Service -public class MessageCallbackService implements MqttCallback { - - @Override - public void connectionLost(Throwable cause) { - System.out.println("connectionLost: " + cause.getMessage()); - } - - @Override - public void messageArrived(String topic, MqttMessage message) { - System.out.println("topic: " + topic); - System.out.println("Qos: " + message.getQos()); - System.out.println("message content: " + new String(message.getPayload())); - } - - @Override - public void deliveryComplete(IMqttDeliveryToken token) { - System.out.println("deliveryComplete---------" + token.isComplete()); - } -} diff --git a/src/main/java/com/load/mqtt/MqttFactory.java b/src/main/java/com/load/mqtt/MqttFactory.java deleted file mode 100644 index 86692d7..0000000 --- a/src/main/java/com/load/mqtt/MqttFactory.java +++ /dev/null @@ -1,46 +0,0 @@ -package com.load.mqtt; - -import lombok.AllArgsConstructor; -import org.eclipse.paho.client.mqttv3.MqttClient; -import org.eclipse.paho.client.mqttv3.MqttConnectOptions; -import org.eclipse.paho.client.mqttv3.MqttException; -import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; -import org.springframework.stereotype.Service; - -/** - * Mqtt工厂 - * @author YunFei.Du - * @date 22:38 2024/5/30 - */ -@Service -@AllArgsConstructor -public class MqttFactory { - - private final MessageCallbackService messageCallbackService; - // 连接参数 - - /** - * 创建Mqtt 客户端 - * @param mqttProperties - * @return - */ - public MqttClient createClient(MqttProperties mqttProperties){ - MqttClient mqttClient =null; - try { - mqttClient=new MqttClient ( mqttProperties.getBroker() , mqttProperties.getClientId() , new MemoryPersistence() ); - } catch (MqttException e) { - throw new RuntimeException ( e ); - } - MqttConnectOptions options = new MqttConnectOptions ( ); - - if (mqttProperties.isLogin()){ - options.setUserName ( mqttProperties.getUsername() ); - options.setPassword ( mqttProperties.getPassword().toCharArray() ); - } - options.setConnectionTimeout(60); - options.setKeepAliveInterval(60); - mqttClient.setCallback ( messageCallbackService ); - - return mqttClient; - } -} diff --git a/src/main/java/com/load/mqtt/MqttProperties.java b/src/main/java/com/load/mqtt/MqttProperties.java deleted file mode 100644 index ee5376c..0000000 --- a/src/main/java/com/load/mqtt/MqttProperties.java +++ /dev/null @@ -1,51 +0,0 @@ -package com.load.mqtt; - -import lombok.AllArgsConstructor; -import lombok.Builder; -import lombok.Data; -import lombok.NoArgsConstructor; -import org.apache.commons.lang3.StringUtils; - -/** - * 配置中心 - * @author YunFei.Du - * @date 8:53 2024/5/30 - */ -@Data -@AllArgsConstructor -@NoArgsConstructor -@Builder -public class MqttProperties { - - private String broker; - private String topic; - private String username; - private String password; - private String clientId; - - /** - * 构建mqtt配置 - * @param ip - * @param topic - * @return - */ - public static MqttProperties configBuild(String ip, String topic){ - return MqttProperties.builder() - .broker("tcp://"+ip+":1883") - .topic(topic) - .username("admin") - .password("public") - .clientId("protocol-parsing") //协议解析 定值 --> 配置 - .build(); - } - - /** - * 判断是否可以登录 - * @return - */ - public boolean isLogin(){ -// commons-lang3 - return StringUtils.isBlank ( username ) && !StringUtils.isBlank ( password ); - } - -} diff --git a/src/main/java/com/load/rebbitmq/MsgHandle.java b/src/main/java/com/load/rebbitmq/MsgHandle.java deleted file mode 100644 index d48999d..0000000 --- a/src/main/java/com/load/rebbitmq/MsgHandle.java +++ /dev/null @@ -1,28 +0,0 @@ -package com.load.rebbitmq; - -import com.load.mqtt.MqttFactory; -import lombok.extern.log4j.Log4j2; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; - -/** - * 信息处理器 - * @author YunFei.Du - * @date 9:21 2024/5/30 - */ -@Component -@Log4j2 -public class MsgHandle { - - @Autowired - private MqttFactory mqttFactory; -// @RabbitListener(queues = {"create.topic"}) -// private void msg(String msg){ -// log.info ( "接收到消息:{}" , msg ); -// MqttProperties mqttProperties = MqttProperties.configBuild ( "39.100.87.192", "mqtt/test" ); -// mqttFactory.createClient ( mqttProperties ); -// -// } - - -} diff --git a/src/main/java/com/load/rebbitmq/RabbitConfig.java b/src/main/java/com/load/rebbitmq/RabbitConfig.java deleted file mode 100644 index 32b5beb..0000000 --- a/src/main/java/com/load/rebbitmq/RabbitConfig.java +++ /dev/null @@ -1,36 +0,0 @@ -package com.load.rebbitmq; - -import org.springframework.amqp.core.Binding; -import org.springframework.amqp.core.BindingBuilder; -import org.springframework.amqp.core.DirectExchange; -import org.springframework.amqp.core.Queue; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; - - - -/** - * rabbit 配置类 - * @author YunFei.Du - * @date 8:39 2024/5/31 - */ -@Configuration -public class RabbitConfig { - @Bean - public Queue initQueue() { - return new Queue ("create.totic", true); - } - - @Bean - public DirectExchange direct(){ - return new DirectExchange ("topic.direct"); - } - - @Bean - public Binding bindingla(DirectExchange direct, Queue initQueue){ - return BindingBuilder.bind(initQueue) - .to(direct) - .with("protocol-parsing"); - } - -} diff --git a/src/main/java/com/load/service/impl/LoadBalanceServiceImpl.java b/src/main/java/com/load/service/impl/LoadBalanceServiceImpl.java index 2e3c999..32c51e1 100644 --- a/src/main/java/com/load/service/impl/LoadBalanceServiceImpl.java +++ b/src/main/java/com/load/service/impl/LoadBalanceServiceImpl.java @@ -33,9 +33,7 @@ public class LoadBalanceServiceImpl implements LoadBalanceService { @Override public Result getConnect(VehicleConnectionReq vehicleConnectionReq) { - - log.error(vehicleConnectionReq.toString()); - + log.error("车辆连接请求:{}",vehicleConnectionReq.toString()); //判断是否有游标key --count @@ -65,8 +63,12 @@ public class LoadBalanceServiceImpl implements LoadBalanceService { } } + /** + * 添加车辆和ip都按redis + * @param vinIp + */ private void insertVinIp(VinIp vinIp) { - + redisTemplate.opsForHash().put("vinIp",vinIp.getVin(),vinIp.getIp()); } @SneakyThrows diff --git a/src/main/java/com/load/util/ECSTool.java b/src/main/java/com/load/util/ECSTool.java index 45233c4..bef68fb 100644 --- a/src/main/java/com/load/util/ECSTool.java +++ b/src/main/java/com/load/util/ECSTool.java @@ -126,7 +126,7 @@ public class ECSTool { } System.out.println ( "------------------------" ); } - log.info ( "ipList: " + ipList ); // [39.100.89.218, 39.100.87.192] + log.info ( "ipList: " + ipList ); // [39.100.89.218, 39.100.113.246] return ipList; } catch (TeaException error) { // 此处仅做打印展示,请谨慎对待异常处理,在工程项目中切勿直接忽略异常。 diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 0d1238b..d19593d 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -4,9 +4,6 @@ server: port: 82 spring: - application: -# 协议解析 - name: protocol-parsing rabbitmq: host: 111.229.102.61 port: 5672