fix: 修改权重轮询

master
rouchen 2024-06-01 20:03:54 +08:00
parent 2f90c610b5
commit 79bac537c7
7 changed files with 34 additions and 199 deletions

View File

@ -59,7 +59,7 @@ public class InitConnectWeight implements ApplicationRunner {
//获取所有实例 //获取所有实例
DescribeInstancesRequest describeInstancesRequest = new DescribeInstancesRequest() DescribeInstancesRequest describeInstancesRequest = new DescribeInstancesRequest()
.setRegionId("cn-shanghai") .setRegionId("cn-shanghai")
.setStatus("Running");; .setStatus("Running");
RuntimeOptions runtime = new RuntimeOptions(); RuntimeOptions runtime = new RuntimeOptions();
try { try {
// 复制代码运行请自行打印 API 的返回值 // 复制代码运行请自行打印 API 的返回值
@ -69,10 +69,13 @@ public class InitConnectWeight implements ApplicationRunner {
List<List<String>> ipListList = describeInstancesResponse.getBody().instances.getInstance().stream().map(instance -> instance.publicIpAddress.ipAddress).collect(Collectors.toList()); List<List<String>> ipListList = describeInstancesResponse.getBody().instances.getInstance().stream().map(instance -> instance.publicIpAddress.ipAddress).collect(Collectors.toList());
for (List<String> strings : ipListList) { for (List<String> strings : ipListList) {
for (String ip : strings) { for (String ip : strings) {
ipList.add(ip); if (!ip.equals("47.103.75.98")){
ipList.add(ip);
}
} }
System.out.println("------------------------"); System.out.println("------------------------");
} }
log.info("ip ip ip ip i p:{}", ipList);
} catch (TeaException error) { } catch (TeaException error) {
// 此处仅做打印展示,请谨慎对待异常处理,在工程项目中切勿直接忽略异常。 // 此处仅做打印展示,请谨慎对待异常处理,在工程项目中切勿直接忽略异常。
@ -90,10 +93,17 @@ public class InitConnectWeight implements ApplicationRunner {
System.out.println(error.getData().get("Recommend")); System.out.println(error.getData().get("Recommend"));
com.aliyun.teautil.Common.assertAsString(error.message); com.aliyun.teautil.Common.assertAsString(error.message);
} }
//网关收集节点
int gatewayNum = 0;
//数据解析结点数量
int dataNum=0;
//整体负载率
String overallLoad="";
//遍历所有ip,获取每一个服务的连接数 //遍历所有ip,获取每一个服务的连接数
for (String ip : ipList) { for (String ip : ipList) {
//网关收集节点
gatewayNum = ip.length();
log.info("gatewayNum:{}", gatewayNum);
//fluxMq连接 //fluxMq连接
String url = "http://" + ip + ":8080/public/login"; String url = "http://" + ip + ":8080/public/login";
Map<String, Object> request = new HashMap<>(); Map<String, Object> request = new HashMap<>();
@ -104,6 +114,20 @@ public class InitConnectWeight implements ApplicationRunner {
HttpEntity<Map<String, Object>> r = new HttpEntity<Map<String, Object>>(request, httpHeaders); HttpEntity<Map<String, Object>> r = new HttpEntity<Map<String, Object>>(request, httpHeaders);
String result = restTemplate.postForObject(url, r, String.class); String result = restTemplate.postForObject(url, r, String.class);
//整体负载率
int total = 0;
// try {
// JSONObject jsonObject = JSON.parseObject(result);
// String data = jsonObject.getString("data");
// JSONObject jsonObject1 = JSON.parseObject(data);
// String load = jsonObject1.getString("load");
// overallLoad = load;
// log.info("overallLoad:{}", overallLoad);
// //数据解析结点数量
// dataNum = load.length();
// log.info("dataNum:{}", dataNum);
// //获取FluxMq运行时详情信息
// }
//http://fluxmq.muyu.icu/public/cluster //http://fluxmq.muyu.icu/public/cluster
//获取FluxMq运行时详情信息 //获取FluxMq运行时详情信息
@ -113,21 +137,23 @@ public class InitConnectWeight implements ApplicationRunner {
HttpHeaders httpHeadersGetInfo = new HttpHeaders(); HttpHeaders httpHeadersGetInfo = new HttpHeaders();
httpHeadersGetInfo.setContentType(MediaType.APPLICATION_JSON); httpHeadersGetInfo.setContentType(MediaType.APPLICATION_JSON);
httpHeadersGetInfo.setAccept(Collections.singletonList(MediaType.APPLICATION_JSON)); httpHeadersGetInfo.setAccept(Collections.singletonList(MediaType.APPLICATION_JSON));
httpHeadersGetInfo.set("Cookie", result); httpHeadersGetInfo.set("Cookie", result);
HttpEntity getInfoRequest = new HttpEntity(httpHeadersGetInfo); HttpEntity getInfoRequest = new HttpEntity(httpHeadersGetInfo);
ResponseEntity<String> responseInfo = restTemplate.exchange(getInfoUrl, HttpMethod.GET, getInfoRequest, String.class, 1); ResponseEntity<String> responseInfo = restTemplate.exchange(getInfoUrl, HttpMethod.GET, getInfoRequest, String.class, 1);
log.info("响应是:{}", responseInfo.getBody()); log.info("响应是:{}", responseInfo.getBody());
JSONArray jsonArray = JSON.parseArray(responseInfo.getBody()); JSONArray jsonArray = JSON.parseArray(responseInfo.getBody());
if (jsonArray.size() > 0) { if (jsonArray.size() > 0) {
JSONObject jsonObject = jsonArray.getJSONObject(0); JSONObject jsonObject = jsonArray.getJSONObject(0);
// Integer connectSize = jsonObject.getJSONObject("mqttInfo").getJSONObject("runtimes").getInteger("mqtt.connect");
Integer connectSize = Integer.valueOf(jsonObject.getJSONObject("mqttInfo").getString("connectSize")); Integer connectSize = Integer.valueOf(jsonObject.getJSONObject("mqttInfo").getString("connectSize"));
connectWeightList.add(new ConnectWeight(ip,100-connectSize)); connectWeightList.add(new ConnectWeight(ip,100-connectSize));
log.info("链接数量:{}", connectSize); log.info("链接数量:{}", connectSize);
} else { } else {
log.info("得到的相应数据为null"); log.info("得到的相应数据为null");
} }
} }
// Integer sum =0; // Integer sum =0;
// for (ConnectWeight connectWeight : connectWeightList) { // for (ConnectWeight connectWeight : connectWeightList) {

View File

@ -1,35 +0,0 @@
package com.car.mqtt;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.stereotype.Service;
/**
* MessageCallbackService
*
* @author Yangle
* Date 2024/5/29 20:24
*/
@Service
public class MessageCallbackService implements MqttCallback {
@Override
public void connectionLost(Throwable cause) {
System.out.println("connectionLost: " + cause.getMessage());
}
@Override
public void messageArrived(String topic, MqttMessage message) {
System.out.println("topic: " + topic);
System.out.println("Qos: " + message.getQos());
System.out.println("message content: " + new String(message.getPayload()));
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
System.out.println("deliveryComplete---------" + token.isComplete());
}
}

View File

@ -1,47 +0,0 @@
package com.car.mqtt;
import lombok.AllArgsConstructor;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.stereotype.Service;
/**
* mqtt MqttConfig
*
* @author Yangle
* Date 2024/5/29 20:26
*/
@Service
@AllArgsConstructor
public class MqttFactory {
private final MessageCallbackService messageCallbackService;
public MqttClient creatClient(MqttProperties mqttProperties) {
MqttClient client = null;
try {
client = new MqttClient(mqttProperties.getBroker(), mqttProperties.getClientid(), new MemoryPersistence());
MqttConnectOptions options = new MqttConnectOptions();
// 连接参数
if (mqttProperties.isLong())
{
options.setUserName(mqttProperties.getUsername());
options.setPassword(mqttProperties.getPassword().toCharArray());
}
options.setConnectionTimeout(60);
options.setKeepAliveInterval(60);
client.connect(options);
client.setCallback(messageCallbackService);
client.subscribe(mqttProperties.getTopic(),0);
} catch (MqttException e) {
throw new RuntimeException(e);
}
return client;
}
}

View File

@ -1,40 +0,0 @@
package com.car.mqtt;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import org.apache.commons.lang3.StringUtils;
/**
* MqttProperties
*
* @author Yangle
* Date 2024/5/29 20:06
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
@SuperBuilder
public class MqttProperties {
private String broker;
private String topic ;
private String username;
private String password;
private String clientid;
public static MqttProperties configBuild(String ip,String topic){
return MqttProperties.builder()
.broker("tcp://"+ip+":1883")
.topic(topic)
.clientid("protocol-parsing")
.build();
}
public boolean isLong(){
return !StringUtils.isBlank(this.username) && !StringUtils.isBlank(this.password);
}
}

View File

@ -1,33 +0,0 @@
package com.car.mqtt;
import lombok.extern.log4j.Log4j2;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* MsgHandler
*
* @author Yangle
* Date 2024/5/29 20:44
*/
@Log4j2
@Component
public class MsgHandler {
@Autowired
private MqttFactory mqttFactory;
@RabbitListener(queues = "create.topic")
public void msg(String msg){
System.out.println("接收到消息:" + msg);
MqttProperties mqttProperties = MqttProperties.configBuild(
"47.102.133.88",
"mqtt/test"
);
log.error("接收到消息初始化信息:{}",mqttProperties);
MqttClient mqttClient = mqttFactory.creatClient(mqttProperties);
log.error("client创建成功:{}",mqttClient.getClientId());
}
}

View File

@ -1,37 +0,0 @@
package com.car.mqtt;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* RabbitConfig
*
* @author Yangle
* Date 2024/5/29 20:44
*/
@Configuration
public class RabbitConfig {
@Bean
public Queue autoDeleteQueue1() {
return new Queue("create.topic", true);
}
@Bean
public DirectExchange directExchange() {
return new DirectExchange("topic.direct");
}
@Bean
public Binding binding(DirectExchange directExchange,
Queue autoDeleteQueue1 ) {
return BindingBuilder.bind(autoDeleteQueue1)
.to(directExchange)
.with("protocol-parsing");
}
}

View File

@ -42,14 +42,15 @@ public class ConnectServiceImpl implements ConnectService {
} }
} }
public static Client createClient() throws Exception { public static Client createClient() throws Exception {
// 工程代码泄露可能会导致 AccessKey 泄露,并威胁账号下所有资源的安全性。以下代码示例仅供参考。 // 工程代码泄露可能会导致 AccessKey 泄露,并威胁账号下所有资源的安全性。以下代码示例仅供参考。
// 建议使用更安全的 STS 方式更多鉴权访问方式请参见https://help.aliyun.com/document_detail/378657.html。 // 建议使用更安全的 STS 方式更多鉴权访问方式请参见https://help.aliyun.com/document_detail/378657.html。
Config config = new Config() Config config = new Config()
// 必填,请确保代码运行环境设置了环境变量 ALIBABA_CLOUD_ACCESS_KEY_ID。 // 必填,请确保代码运行环境设置了环境变量 ALIBABA_CLOUD_ACCESS_KEY_ID。
.setAccessKeyId("LTAI5tFVx9F12e5f4EuJzyZj") .setAccessKeyId("LTAI5tAEQA9AgnqasQ7Y56cJ")
// 必填,请确保代码运行环境设置了环境变量 ALIBABA_CLOUD_ACCESS_KEY_SECRET。 // 必填,请确保代码运行环境设置了环境变量 ALIBABA_CLOUD_ACCESS_KEY_SECRET。
.setAccessKeySecret("mn06SdxTmdmCjmaEGBq95bVF6e3Sa9"); .setAccessKeySecret("IsrnZ6dKBgEit5HXv2xyfo0xT8VGkj");
// Endpoint 请参考 https://api.aliyun.com/product/Ecs // Endpoint 请参考 https://api.aliyun.com/product/Ecs
config.endpoint = "ecs.cn-shanghai.aliyuncs.com"; config.endpoint = "ecs.cn-shanghai.aliyuncs.com";
return new Client(config); return new Client(config);