From a99369e58b0768e3ac2989c1024909289c351d6f Mon Sep 17 00:00:00 2001 From: Saisai Liu <1374434128@qq.com> Date: Fri, 7 Jun 2024 22:37:37 +0800 Subject: [PATCH] feat():before testRunner --- .idea/inspectionProfiles/Project_Default.xml | 3 + .../controller/FluxGetInfoController.java | 9 ++- .../mq/rabbitmq/rabbitMq/MessageHandler.java | 59 +++++++++++++------ .../com/mobai/openApi/SelectInstances.java | 44 +++++++------- .../java/com/mobai/runner/MqttRunner.java | 51 ++++++++-------- .../com/mobai/service/FluxGetInfoService.java | 4 ++ .../service/impl/FluxGetInfoServiceImpl.java | 22 +++++++ 7 files changed, 128 insertions(+), 64 deletions(-) diff --git a/.idea/inspectionProfiles/Project_Default.xml b/.idea/inspectionProfiles/Project_Default.xml index ee2c34b..e2809fa 100644 --- a/.idea/inspectionProfiles/Project_Default.xml +++ b/.idea/inspectionProfiles/Project_Default.xml @@ -9,6 +9,9 @@ + + \ No newline at end of file diff --git a/src/main/java/com/mobai/controller/FluxGetInfoController.java b/src/main/java/com/mobai/controller/FluxGetInfoController.java index 3162a9e..975568c 100644 --- a/src/main/java/com/mobai/controller/FluxGetInfoController.java +++ b/src/main/java/com/mobai/controller/FluxGetInfoController.java @@ -8,6 +8,8 @@ import lombok.extern.log4j.Log4j2; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.*; +import java.util.List; + /** * @ClassName FluxGetInfo * @Description 描述 @@ -21,11 +23,16 @@ public class FluxGetInfoController { @Autowired private FluxGetInfoService fluxGetInfoService; - @GetMapping("getInfo") + @GetMapping("/getInfo/") public Result getInfo(){ return fluxGetInfoService.getInfo(null); } + @GetMapping("getIps/") + public Result> getIps(){ + return fluxGetInfoService.getIps(); + } + @PostMapping("/getIp") public Result vehicleConnection(@RequestBody(required = false) VehicleConnectionReq req){ return fluxGetInfoService.vehicleConnection(req); diff --git a/src/main/java/com/mobai/mq/rabbitmq/rabbitMq/MessageHandler.java b/src/main/java/com/mobai/mq/rabbitmq/rabbitMq/MessageHandler.java index 3aa6c4e..d6c591b 100644 --- a/src/main/java/com/mobai/mq/rabbitmq/rabbitMq/MessageHandler.java +++ b/src/main/java/com/mobai/mq/rabbitmq/rabbitMq/MessageHandler.java @@ -43,17 +43,17 @@ public class MessageHandler { @Autowired private RedisService redisService; - @RabbitListener(queues = {"create.topic"}) - private void message(String msg) { - log.info("消息内容:{}", msg); - MqttProperties topic0 = MqttProperties.configBuild( - "39.98.69.92", - "topic0"); - log.info("接收到消息:{}", topic0); - MqttClient client = mqttFactory.buildOptions(topic0); - log.info("client创建:{}", client); - log.info("clientID创建:{}", client.getClientId()); - } +// @RabbitListener(queues = {"create.topic"}) +// private void message(String msg) { +// log.info("消息内容:{}", msg); +//// MqttProperties topic0 = MqttProperties.configBuild( +//// "39.98.69.92", +//// "topic0"); +//// log.info("接收到消息:{}", topic0); +//// MqttClient client = mqttFactory.buildOptions(topic0); +//// log.info("client创建:{}", client); +//// log.info("clientID创建:{}", client.getClientId()); +// } /** @@ -66,13 +66,37 @@ public class MessageHandler { log.info("event:{}", msg); // 事件内容 JSONObject jsonObject = JSON.parseObject(msg); - String vin = JSON.to(String.class, jsonObject.get("clientId")); + String clientId = JSON.to(String.class, jsonObject.get("clientId")); + if (!clientId.contains("VIN")) { + log.error("不是车辆事件::{}",message); + try { + channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); + return; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + if (!clientId.contains("-")){ + try { + throw new ServletException("不是vin——ip"); + } catch (ServletException e) { + try { + channel.basicReject(message.getMessageProperties().getDeliveryTag(),false); + } catch (IOException ex) { + throw new RuntimeException(ex); + } + throw new RuntimeException(e); + } + } + String[] split = clientId.split("-"); + String vin = split[0]; + String ip = split[1]; long timestamp = JSON.to(Long.class, jsonObject.get("timestamp")); + // 判断登陆事件 if (jsonObject.get("auth") != null) { try { log.info("上线事件"); - String ip = redisService.getValue(vin); - log.info("上线车辆vin:{}\n\t上线时ip:{}", vin, ip); + log.info("上线车辆vin:{}--上线时ip:{}", vin, ip); if (ip == null) { throw new ServletException("上线时ip为空"); } @@ -97,9 +121,8 @@ public class MessageHandler { } } else { log.info("下线事件"); - String ip = redisService.getValue(vin); try { - log.info("下线车辆vin:{}\n\t下线时ip:{}", vin, ip); + log.info("下线车辆vin:{}--下线时ip:{}", vin, ip); if (ip == null) { throw new ServletException("下线时ip为空"); } @@ -114,7 +137,7 @@ public class MessageHandler { if (vinStayTime==null){ throw new ServletException("{"+vin+"}数据不存在"); } - String format = new SimpleDateFormat("HH时mm分ss秒").format(new Date(timestamp - vinStayTime.getUpTime())); + String format = new SimpleDateFormat("HH时mm分ss秒").format(new Date(timestamp - vinStayTime.getUpTime()-8*60*60*1000)); vinStayTime.setStayLongTime(format); vinStayTime.setDownTime(timestamp); // 修改下线时间 @@ -125,7 +148,7 @@ public class MessageHandler { // 输出在线时长 log.info("车辆在线时长为:{}", format); log.info(update ? vin + "下线记录成功" : vin + "下线记录失败"); - + redisService.deleteObject(vin); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (ServletException | IOException e) { log.error("下线失败"); diff --git a/src/main/java/com/mobai/openApi/SelectInstances.java b/src/main/java/com/mobai/openApi/SelectInstances.java index 9ace7ec..46e47e1 100644 --- a/src/main/java/com/mobai/openApi/SelectInstances.java +++ b/src/main/java/com/mobai/openApi/SelectInstances.java @@ -26,8 +26,6 @@ import java.util.*; @Component public class SelectInstances { - @Autowired - private FluxGetInfoService fluxGetInfoService; @Autowired private RedisTemplate redisTemplate; @@ -69,28 +67,34 @@ public class SelectInstances { return describeInstancesResponse; } + public List ips( DescribeInstancesResponse response){ + List ips = new ArrayList<>(); + DescribeInstancesResponseBody.DescribeInstancesResponseBodyInstances instances = response.getBody().getInstances(); + List instance = + instances.getInstance(); + for (DescribeInstancesResponseBody.DescribeInstancesResponseBodyInstancesInstance inst : instance) { + List ipAddress = inst + .getPublicIpAddress() + .getIpAddress(); + if (ipAddress.isEmpty()) { + continue; + } else { + ipAddress.forEach(ip -> ips.add(ip)); + } + } + log.info("当前实例ip为{}", ips); + return ips; + } + //1分钟 - @Scheduled(cron = "0 0/1 * * * ? ") + @Scheduled(cron = "* 0/5 * * * ? ") //10秒 // @Scheduled(cron = "0/10 * * * * ? ") - public void saveIps() throws Exception { - List ips = new ArrayList<>(); + public void saveIps() { + List ips = null; DescribeInstancesResponse response = this.getInfo(); try { - DescribeInstancesResponseBody.DescribeInstancesResponseBodyInstances instances = response.getBody().getInstances(); - List instance = - instances.getInstance(); - for (DescribeInstancesResponseBody.DescribeInstancesResponseBodyInstancesInstance inst : instance) { - List ipAddress = inst - .getPublicIpAddress() - .getIpAddress(); - if (ipAddress.isEmpty()) { - continue; - } else { - ipAddress.forEach(ip -> ips.add(ip)); - } - } - log.info("当前实例ip为{}", ips); + ips = this.ips(response); } catch (TeaException error) { // 此处仅做打印展示,请谨慎对待异常处理,在工程项目中切勿直接忽略异常。 // 错误 message @@ -114,7 +118,6 @@ public class SelectInstances { } List nodes = new ArrayList<>(); for (String ip : ips) { - Result info = fluxGetInfoService.getInfo(ip); // 获取连接总数 String string = redisTemplate.opsForValue().get("onlineCar-" + ip); long connectSize = Long.parseLong(string == null ? "0" : string); @@ -184,7 +187,6 @@ public class SelectInstances { break; } } - redisTemplate.opsForList().leftPush("fluxMq", JSON.toJSONString(ips)); // 可负载IP轮询排列 log.info("排列ip,{}", ips); Boolean mqttIp = redisTemplate.delete("mqttIp"); diff --git a/src/main/java/com/mobai/runner/MqttRunner.java b/src/main/java/com/mobai/runner/MqttRunner.java index e8057fd..e71b918 100644 --- a/src/main/java/com/mobai/runner/MqttRunner.java +++ b/src/main/java/com/mobai/runner/MqttRunner.java @@ -36,30 +36,33 @@ public class MqttRunner implements ApplicationRunner { @Override public void run(ApplicationArguments args) throws Exception { - List ips = redisService.getList("mqttIp"); - List list = ips.stream().distinct().map(str -> JSON.parseObject(str, MqttServerModel.class)).toList(); - list.forEach(mqttServerModel -> { + // 存入mqttIp + selectInstances.saveIps(); + +// List ips = redisService.getList("mqttIp"); +// List list = ips.stream().distinct().map(str -> JSON.parseObject(str, MqttServerModel.class)).toList(); +// list.forEach(mqttServerModel -> { +// MqttProperties mqttProperties = MqttProperties.configBuild(mqttServerModel.getBroker(), mqttServerModel.getTopic()); +//// MqttProperties mqttProperties = new MqttProperties(); +//// mqttProperties.setBroker("tcp://39.98.69.92:1883"); +//// mqttProperties.setTopic("mqtt/test"); +// mqttProperties.setUsername("emqx"); +// mqttProperties.setPassword("public"); +//// mqttProperties.setClientid("subscribe_client"); +// int qos = 0; +// try { +// MqttClient client = new MqttFactory(new MqttCallBackServiceImpl()).buildOptions(mqttProperties); +// // 连接参数 +// MqttConnectOptions options = GetOptions.getMqttOptionas(mqttProperties); +// // 设置回调 +// client.setCallback(new MqttCallBackServiceImpl()); +// // 进行连接 +// client.connect(options); +// client.subscribe(mqttProperties.getTopic(), qos); +// } catch (Exception e) { +// e.printStackTrace(); +// } +// }); - }); - MqttProperties mqttProperties = MqttProperties.configBuild("39.98.69.92", "topic0"); -// MqttProperties mqttProperties = new MqttProperties(); -// mqttProperties.setBroker("tcp://39.98.69.92:1883"); -// mqttProperties.setTopic("mqtt/test"); - mqttProperties.setUsername("emqx"); - mqttProperties.setPassword("public"); -// mqttProperties.setClientid("subscribe_client"); - int qos = 0; - try { - MqttClient client = new MqttFactory(new MqttCallBackServiceImpl()).buildOptions(mqttProperties); - // 连接参数 - MqttConnectOptions options = GetOptions.getMqttOptionas(mqttProperties); - // 设置回调 - client.setCallback(new MqttCallBackServiceImpl()); - // 进行连接 - client.connect(options); - client.subscribe(mqttProperties.getTopic(), qos); - } catch (Exception e) { - e.printStackTrace(); - } } } diff --git a/src/main/java/com/mobai/service/FluxGetInfoService.java b/src/main/java/com/mobai/service/FluxGetInfoService.java index 0a42532..21d50a3 100644 --- a/src/main/java/com/mobai/service/FluxGetInfoService.java +++ b/src/main/java/com/mobai/service/FluxGetInfoService.java @@ -1,8 +1,11 @@ package com.mobai.service; +import com.mobai.domain.MqttServerModel; import com.mobai.domain.Result; import com.mobai.domain.VehicleConnectionReq; +import java.util.List; + /** * @ClassName FluxGetInfoService * @Description 描述 @@ -17,4 +20,5 @@ public interface FluxGetInfoService { Result vehicleConnection(VehicleConnectionReq req); + Result> getIps(); } diff --git a/src/main/java/com/mobai/service/impl/FluxGetInfoServiceImpl.java b/src/main/java/com/mobai/service/impl/FluxGetInfoServiceImpl.java index 6040cb9..1efa28b 100644 --- a/src/main/java/com/mobai/service/impl/FluxGetInfoServiceImpl.java +++ b/src/main/java/com/mobai/service/impl/FluxGetInfoServiceImpl.java @@ -1,8 +1,11 @@ package com.mobai.service.impl; import com.alibaba.fastjson2.JSON; +import com.aliyun.ecs20140526.models.DescribeInstancesResponse; +import com.aliyun.ecs20140526.models.DescribeInstancesResponseBody; import com.mobai.domain.*; import com.mobai.domain.flux.ApifoxModel; +import com.mobai.openApi.SelectInstances; import com.mobai.service.FluxGetInfoService; import lombok.extern.log4j.Log4j2; import org.springframework.beans.factory.annotation.Autowired; @@ -15,7 +18,9 @@ import org.springframework.stereotype.Service; import org.springframework.util.MultiValueMap; import org.springframework.web.client.RestTemplate; +import java.util.ArrayList; import java.util.List; +import java.util.stream.Collectors; /** * @ClassName FluxGetInfoServiceImpl @@ -33,6 +38,8 @@ public class FluxGetInfoServiceImpl implements FluxGetInfoService { @Autowired private RedisTemplate redis; + @Autowired + private SelectInstances selectInstances; /** * 通过ip获取详细信息 * @@ -96,6 +103,21 @@ public class FluxGetInfoServiceImpl implements FluxGetInfoService { log.info("已获取到对象:{}",mqttServerModel); return Result.success(mqttServerModel); } + + @Override + public Result> getIps() { + List ips = new ArrayList<>(); + DescribeInstancesResponse response = selectInstances.getInfo(); + ips = selectInstances.ips(response); + log.info("当前实例ip为{}", ips); + List finalIps = ips; + List collect = ips.stream().map(ip -> + new MqttServerModel() {{ + setBroker(ip); + setTopic("topic" + finalIps.indexOf(ip)); + }}).collect(Collectors.toList()); + return Result.success(collect); + } } // 达到60%开启新服务,30%关闭低实例