From 79bac537c76b7bb914c3b92d9e3659ab0e17f6da Mon Sep 17 00:00:00 2001 From: rouchen <3133657697@qq.com> Date: Sat, 1 Jun 2024 20:03:54 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E4=BF=AE=E6=94=B9=E6=9D=83=E9=87=8D?= =?UTF-8?q?=E8=BD=AE=E8=AF=A2?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/car/config/InitConnectWeight.java | 36 ++++++++++++-- .../com/car/mqtt/MessageCallbackService.java | 35 -------------- src/main/java/com/car/mqtt/MqttFactory.java | 47 ------------------- .../java/com/car/mqtt/MqttProperties.java | 40 ---------------- src/main/java/com/car/mqtt/MsgHandler.java | 33 ------------- src/main/java/com/car/mqtt/RabbitConfig.java | 37 --------------- .../car/service/impl/ConnectServiceImpl.java | 5 +- 7 files changed, 34 insertions(+), 199 deletions(-) delete mode 100644 src/main/java/com/car/mqtt/MessageCallbackService.java delete mode 100644 src/main/java/com/car/mqtt/MqttFactory.java delete mode 100644 src/main/java/com/car/mqtt/MqttProperties.java delete mode 100644 src/main/java/com/car/mqtt/MsgHandler.java delete mode 100644 src/main/java/com/car/mqtt/RabbitConfig.java diff --git a/src/main/java/com/car/config/InitConnectWeight.java b/src/main/java/com/car/config/InitConnectWeight.java index bcdc01b..470f6d4 100644 --- a/src/main/java/com/car/config/InitConnectWeight.java +++ b/src/main/java/com/car/config/InitConnectWeight.java @@ -59,7 +59,7 @@ public class InitConnectWeight implements ApplicationRunner { //获取所有实例 DescribeInstancesRequest describeInstancesRequest = new DescribeInstancesRequest() .setRegionId("cn-shanghai") - .setStatus("Running");; + .setStatus("Running"); RuntimeOptions runtime = new RuntimeOptions(); try { // 复制代码运行请自行打印 API 的返回值 @@ -69,10 +69,13 @@ public class InitConnectWeight implements ApplicationRunner { List> ipListList = describeInstancesResponse.getBody().instances.getInstance().stream().map(instance -> instance.publicIpAddress.ipAddress).collect(Collectors.toList()); for (List strings : ipListList) { for (String ip : strings) { - ipList.add(ip); + if (!ip.equals("47.103.75.98")){ + ipList.add(ip); + } } System.out.println("------------------------"); } + log.info("ip ip ip ip i p:{}", ipList); } catch (TeaException error) { // 此处仅做打印展示,请谨慎对待异常处理,在工程项目中切勿直接忽略异常。 @@ -90,10 +93,17 @@ public class InitConnectWeight implements ApplicationRunner { System.out.println(error.getData().get("Recommend")); com.aliyun.teautil.Common.assertAsString(error.message); } - - + //网关收集节点 + int gatewayNum = 0; + //数据解析结点数量 + int dataNum=0; + //整体负载率 + String overallLoad=""; //遍历所有ip,获取每一个服务的连接数 for (String ip : ipList) { + //网关收集节点 + gatewayNum = ip.length(); + log.info("gatewayNum:{}", gatewayNum); //fluxMq连接 String url = "http://" + ip + ":8080/public/login"; Map request = new HashMap<>(); @@ -104,6 +114,20 @@ public class InitConnectWeight implements ApplicationRunner { HttpEntity> r = new HttpEntity>(request, httpHeaders); String result = restTemplate.postForObject(url, r, String.class); + //整体负载率 + int total = 0; +// try { +// JSONObject jsonObject = JSON.parseObject(result); +// String data = jsonObject.getString("data"); +// JSONObject jsonObject1 = JSON.parseObject(data); +// String load = jsonObject1.getString("load"); +// overallLoad = load; +// log.info("overallLoad:{}", overallLoad); +// //数据解析结点数量 +// dataNum = load.length(); +// log.info("dataNum:{}", dataNum); +// //获取FluxMq运行时详情信息 +// } //http://fluxmq.muyu.icu/public/cluster //获取FluxMq运行时详情信息 @@ -113,21 +137,23 @@ public class InitConnectWeight implements ApplicationRunner { HttpHeaders httpHeadersGetInfo = new HttpHeaders(); httpHeadersGetInfo.setContentType(MediaType.APPLICATION_JSON); httpHeadersGetInfo.setAccept(Collections.singletonList(MediaType.APPLICATION_JSON)); + httpHeadersGetInfo.set("Cookie", result); HttpEntity getInfoRequest = new HttpEntity(httpHeadersGetInfo); ResponseEntity responseInfo = restTemplate.exchange(getInfoUrl, HttpMethod.GET, getInfoRequest, String.class, 1); - log.info("响应是:{}", responseInfo.getBody()); JSONArray jsonArray = JSON.parseArray(responseInfo.getBody()); if (jsonArray.size() > 0) { JSONObject jsonObject = jsonArray.getJSONObject(0); +// Integer connectSize = jsonObject.getJSONObject("mqttInfo").getJSONObject("runtimes").getInteger("mqtt.connect"); Integer connectSize = Integer.valueOf(jsonObject.getJSONObject("mqttInfo").getString("connectSize")); connectWeightList.add(new ConnectWeight(ip,100-connectSize)); log.info("链接数量:{}", connectSize); } else { log.info("得到的相应数据为null"); } + } // Integer sum =0; // for (ConnectWeight connectWeight : connectWeightList) { diff --git a/src/main/java/com/car/mqtt/MessageCallbackService.java b/src/main/java/com/car/mqtt/MessageCallbackService.java deleted file mode 100644 index ec9ce52..0000000 --- a/src/main/java/com/car/mqtt/MessageCallbackService.java +++ /dev/null @@ -1,35 +0,0 @@ -package com.car.mqtt; - -import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; -import org.eclipse.paho.client.mqttv3.MqttCallback; -import org.eclipse.paho.client.mqttv3.MqttMessage; -import org.springframework.stereotype.Service; - -/** - * 回执消息类 MessageCallbackService - * - * @author Yangle - * Date 2024/5/29 20:24 - */ -@Service -public class MessageCallbackService implements MqttCallback { - - @Override - public void connectionLost(Throwable cause) { - System.out.println("connectionLost: " + cause.getMessage()); - } - @Override - public void messageArrived(String topic, MqttMessage message) { - System.out.println("topic: " + topic); - System.out.println("Qos: " + message.getQos()); - System.out.println("message content: " + new String(message.getPayload())); - - } - - - @Override - public void deliveryComplete(IMqttDeliveryToken token) { - System.out.println("deliveryComplete---------" + token.isComplete()); - } - -} diff --git a/src/main/java/com/car/mqtt/MqttFactory.java b/src/main/java/com/car/mqtt/MqttFactory.java deleted file mode 100644 index ed41ec0..0000000 --- a/src/main/java/com/car/mqtt/MqttFactory.java +++ /dev/null @@ -1,47 +0,0 @@ -package com.car.mqtt; - -import lombok.AllArgsConstructor; -import org.eclipse.paho.client.mqttv3.MqttClient; -import org.eclipse.paho.client.mqttv3.MqttConnectOptions; -import org.eclipse.paho.client.mqttv3.MqttException; -import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; -import org.springframework.stereotype.Service; - -/** - * mqtt配置 MqttConfig - * - * @author Yangle - * Date 2024/5/29 20:26 - */ -@Service -@AllArgsConstructor -public class MqttFactory { - - private final MessageCallbackService messageCallbackService; - public MqttClient creatClient(MqttProperties mqttProperties) { - MqttClient client = null; - try { - client = new MqttClient(mqttProperties.getBroker(), mqttProperties.getClientid(), new MemoryPersistence()); - MqttConnectOptions options = new MqttConnectOptions(); - - // 连接参数 - if (mqttProperties.isLong()) - { - options.setUserName(mqttProperties.getUsername()); - options.setPassword(mqttProperties.getPassword().toCharArray()); - } - - - options.setConnectionTimeout(60); - options.setKeepAliveInterval(60); - client.connect(options); - client.setCallback(messageCallbackService); - client.subscribe(mqttProperties.getTopic(),0); - - } catch (MqttException e) { - throw new RuntimeException(e); - } - - return client; - } -} diff --git a/src/main/java/com/car/mqtt/MqttProperties.java b/src/main/java/com/car/mqtt/MqttProperties.java deleted file mode 100644 index 9e8cc35..0000000 --- a/src/main/java/com/car/mqtt/MqttProperties.java +++ /dev/null @@ -1,40 +0,0 @@ -package com.car.mqtt; - -import lombok.AllArgsConstructor; -import lombok.Data; -import lombok.NoArgsConstructor; -import lombok.experimental.SuperBuilder; -import org.apache.commons.lang3.StringUtils; - -/** - * 配置文件 MqttProperties - * - * @author Yangle - * Date 2024/5/29 20:06 - */ -@Data -@AllArgsConstructor -@NoArgsConstructor -@SuperBuilder -public class MqttProperties { - - private String broker; - private String topic ; - private String username; - private String password; - private String clientid; - - public static MqttProperties configBuild(String ip,String topic){ - return MqttProperties.builder() - .broker("tcp://"+ip+":1883") - .topic(topic) - .clientid("protocol-parsing") - .build(); - } - - public boolean isLong(){ - return !StringUtils.isBlank(this.username) && !StringUtils.isBlank(this.password); - } - - -} diff --git a/src/main/java/com/car/mqtt/MsgHandler.java b/src/main/java/com/car/mqtt/MsgHandler.java deleted file mode 100644 index 10ec7fc..0000000 --- a/src/main/java/com/car/mqtt/MsgHandler.java +++ /dev/null @@ -1,33 +0,0 @@ -package com.car.mqtt; - -import lombok.extern.log4j.Log4j2; -import org.eclipse.paho.client.mqttv3.MqttClient; -import org.springframework.amqp.rabbit.annotation.RabbitListener; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; - -/** - * 消息处理器 MsgHandler - * - * @author Yangle - * Date 2024/5/29 20:44 - */ -@Log4j2 -@Component -public class MsgHandler { - - @Autowired - private MqttFactory mqttFactory; - @RabbitListener(queues = "create.topic") - public void msg(String msg){ - System.out.println("接收到消息:" + msg); - MqttProperties mqttProperties = MqttProperties.configBuild( - "47.102.133.88", - "mqtt/test" - ); - log.error("接收到消息初始化信息:{}",mqttProperties); - MqttClient mqttClient = mqttFactory.creatClient(mqttProperties); - log.error("client创建成功:{}",mqttClient.getClientId()); - - } -} diff --git a/src/main/java/com/car/mqtt/RabbitConfig.java b/src/main/java/com/car/mqtt/RabbitConfig.java deleted file mode 100644 index 780ddb2..0000000 --- a/src/main/java/com/car/mqtt/RabbitConfig.java +++ /dev/null @@ -1,37 +0,0 @@ -package com.car.mqtt; - -import org.springframework.amqp.core.Binding; -import org.springframework.amqp.core.BindingBuilder; -import org.springframework.amqp.core.DirectExchange; -import org.springframework.amqp.core.Queue; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; - -/** - * RabbitConfig - * - * @author Yangle - * Date 2024/5/29 20:44 - */ -@Configuration -public class RabbitConfig { - - - @Bean - public Queue autoDeleteQueue1() { - return new Queue("create.topic", true); - } - - @Bean - public DirectExchange directExchange() { - return new DirectExchange("topic.direct"); - } - - @Bean - public Binding binding(DirectExchange directExchange, - Queue autoDeleteQueue1 ) { - return BindingBuilder.bind(autoDeleteQueue1) - .to(directExchange) - .with("protocol-parsing"); - } -} \ No newline at end of file diff --git a/src/main/java/com/car/service/impl/ConnectServiceImpl.java b/src/main/java/com/car/service/impl/ConnectServiceImpl.java index 31eabab..6a2c446 100644 --- a/src/main/java/com/car/service/impl/ConnectServiceImpl.java +++ b/src/main/java/com/car/service/impl/ConnectServiceImpl.java @@ -42,14 +42,15 @@ public class ConnectServiceImpl implements ConnectService { } } + public static Client createClient() throws Exception { // 工程代码泄露可能会导致 AccessKey 泄露,并威胁账号下所有资源的安全性。以下代码示例仅供参考。 // 建议使用更安全的 STS 方式,更多鉴权访问方式请参见:https://help.aliyun.com/document_detail/378657.html。 Config config = new Config() // 必填,请确保代码运行环境设置了环境变量 ALIBABA_CLOUD_ACCESS_KEY_ID。 - .setAccessKeyId("LTAI5tFVx9F12e5f4EuJzyZj") + .setAccessKeyId("LTAI5tAEQA9AgnqasQ7Y56cJ") // 必填,请确保代码运行环境设置了环境变量 ALIBABA_CLOUD_ACCESS_KEY_SECRET。 - .setAccessKeySecret("mn06SdxTmdmCjmaEGBq95bVF6e3Sa9"); + .setAccessKeySecret("IsrnZ6dKBgEit5HXv2xyfo0xT8VGkj"); // Endpoint 请参考 https://api.aliyun.com/product/Ecs config.endpoint = "ecs.cn-shanghai.aliyuncs.com"; return new Client(config);