From 0ccbcc4335b1e1fbf11dba0207d5cc4a3d7ced7a Mon Sep 17 00:00:00 2001 From: Saisai Liu <1374434128@qq.com> Date: Tue, 4 Jun 2024 22:24:51 +0800 Subject: [PATCH] feat():before testRunner --- .../com/mobai/openApi/SelectInstances.java | 7 +- .../java/com/mobai/runner/MqttRunner.java | 118 ++++++++++-------- .../java/com/mobai/util/RedisService.java | 4 + 3 files changed, 71 insertions(+), 58 deletions(-) diff --git a/src/main/java/com/mobai/openApi/SelectInstances.java b/src/main/java/com/mobai/openApi/SelectInstances.java index a9c61c8..9ace7ec 100644 --- a/src/main/java/com/mobai/openApi/SelectInstances.java +++ b/src/main/java/com/mobai/openApi/SelectInstances.java @@ -62,7 +62,6 @@ public class SelectInstances { // .setImageId("m-8vb8qnidv34yj3nbirhc") .setRegionId("cn-zhangjiakou"); com.aliyun.teautil.models.RuntimeOptions runtime = new com.aliyun.teautil.models.RuntimeOptions(); - describeInstancesResponse = client.describeInstancesWithOptions(describeInstancesRequest, runtime); } catch (Exception e) { throw new RuntimeException(e); @@ -70,7 +69,7 @@ public class SelectInstances { return describeInstancesResponse; } - //10分钟 + //1分钟 @Scheduled(cron = "0 0/1 * * * ? ") //10秒 // @Scheduled(cron = "0/10 * * * * ? ") @@ -116,7 +115,7 @@ public class SelectInstances { List nodes = new ArrayList<>(); for (String ip : ips) { Result info = fluxGetInfoService.getInfo(ip); - //连接总数 + // 获取连接总数 String string = redisTemplate.opsForValue().get("onlineCar-" + ip); long connectSize = Long.parseLong(string == null ? "0" : string); log.info("{}::{}", ip, connectSize); @@ -189,7 +188,7 @@ public class SelectInstances { // 可负载IP轮询排列 log.info("排列ip,{}", ips); Boolean mqttIp = redisTemplate.delete("mqttIp"); - ips.forEach(mqtt->redisTemplate.opsForList().leftPush("mqttIp", JSON.toJSONString(mqtt))); + ips.forEach(mqtt -> redisTemplate.opsForList().leftPush("mqttIp", JSON.toJSONString(mqtt))); } diff --git a/src/main/java/com/mobai/runner/MqttRunner.java b/src/main/java/com/mobai/runner/MqttRunner.java index 9d207ab..e8057fd 100644 --- a/src/main/java/com/mobai/runner/MqttRunner.java +++ b/src/main/java/com/mobai/runner/MqttRunner.java @@ -1,55 +1,65 @@ -//package com.mobai.runner; -// -//import com.aliyun.ecs20140526.models.DescribeInstancesResponse; -//import com.mobai.mq.rabbitmq.cofig.MqttFactory; -//import com.mobai.mq.rabbitmq.cofig.MqttProperties; -//import com.mobai.mq.rabbitmq.domian.GetOptions; -//import com.mobai.mq.rabbitmq.domian.MqttCallBackServiceImpl; -//import com.mobai.openApi.SelectInstances; -//import org.eclipse.paho.client.mqttv3.MqttClient; -//import org.eclipse.paho.client.mqttv3.MqttConnectOptions; -//import org.springframework.beans.factory.annotation.Autowired; -//import org.springframework.boot.ApplicationArguments; -//import org.springframework.boot.ApplicationRunner; -//import org.springframework.data.redis.connection.RedisServer; -//import org.springframework.stereotype.Component; -// -///** -// * @ClassName MqttRunner -// * @Description 描述 -// * @Author Mobai -// * @Date 2024/6/4 20:03 -// */ -//@Component -//public class MqttRunner implements ApplicationRunner { -// -// @Autowired -// private SelectInstances selectInstances; +package com.mobai.runner; -// @Override -// public void run(ApplicationArguments args) throws Exception { -// String string = redisServer.get("fluxMq"); -// -// MqttProperties mqttProperties = MqttProperties.configBuild("39.98.69.92", "topic0"); -//// MqttProperties mqttProperties = new MqttProperties(); -//// mqttProperties.setBroker("tcp://39.98.69.92:1883"); -//// mqttProperties.setTopic("mqtt/test"); -// mqttProperties.setUsername("emqx"); -// mqttProperties.setPassword("public"); -//// mqttProperties.setClientid("subscribe_client"); -// -// int qos = 0; -// try { -// MqttClient client = new MqttFactory(new MqttCallBackServiceImpl()).buildOptions(mqttProperties); -// // 连接参数 -// MqttConnectOptions options = GetOptions.getMqttOptionas(mqttProperties); -// // 设置回调 -// client.setCallback(new MqttCallBackServiceImpl()); -// // 进行连接 -// client.connect(options); -// client.subscribe(mqttProperties.getTopic(), qos); -// } catch (Exception e) { -// e.printStackTrace(); -// } -// } -//} +import com.alibaba.fastjson.JSON; +import com.aliyun.ecs20140526.models.DescribeInstancesResponse; +import com.mobai.domain.MqttServerModel; +import com.mobai.mq.rabbitmq.cofig.MqttFactory; +import com.mobai.mq.rabbitmq.cofig.MqttProperties; +import com.mobai.mq.rabbitmq.domian.GetOptions; +import com.mobai.mq.rabbitmq.domian.MqttCallBackServiceImpl; +import com.mobai.openApi.SelectInstances; +import com.mobai.util.RedisService; +import org.eclipse.paho.client.mqttv3.MqttClient; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.ApplicationArguments; +import org.springframework.boot.ApplicationRunner; +import org.springframework.data.redis.connection.RedisServer; +import org.springframework.stereotype.Component; + +import java.util.List; + +/** + * @ClassName MqttRunner + * @Description 描述 + * @Author Mobai + * @Date 2024/6/4 20:03 + */ +@Component +public class MqttRunner implements ApplicationRunner { + + @Autowired + private SelectInstances selectInstances; + + @Autowired + private RedisService redisService; + + @Override + public void run(ApplicationArguments args) throws Exception { + List ips = redisService.getList("mqttIp"); + List list = ips.stream().distinct().map(str -> JSON.parseObject(str, MqttServerModel.class)).toList(); + list.forEach(mqttServerModel -> { + + }); + MqttProperties mqttProperties = MqttProperties.configBuild("39.98.69.92", "topic0"); +// MqttProperties mqttProperties = new MqttProperties(); +// mqttProperties.setBroker("tcp://39.98.69.92:1883"); +// mqttProperties.setTopic("mqtt/test"); + mqttProperties.setUsername("emqx"); + mqttProperties.setPassword("public"); +// mqttProperties.setClientid("subscribe_client"); + int qos = 0; + try { + MqttClient client = new MqttFactory(new MqttCallBackServiceImpl()).buildOptions(mqttProperties); + // 连接参数 + MqttConnectOptions options = GetOptions.getMqttOptionas(mqttProperties); + // 设置回调 + client.setCallback(new MqttCallBackServiceImpl()); + // 进行连接 + client.connect(options); + client.subscribe(mqttProperties.getTopic(), qos); + } catch (Exception e) { + e.printStackTrace(); + } + } +} diff --git a/src/main/java/com/mobai/util/RedisService.java b/src/main/java/com/mobai/util/RedisService.java index 9a88d71..b8cf38c 100644 --- a/src/main/java/com/mobai/util/RedisService.java +++ b/src/main/java/com/mobai/util/RedisService.java @@ -297,4 +297,8 @@ public class RedisService { public String getValue(String vin) { return stringRedisTemplate.opsForValue().get(vin); } + + public List getList(String key) { + return redisTemplate.opsForList().range(key,0,-1); + } }