diff --git a/.idea/inspectionProfiles/Project_Default.xml b/.idea/inspectionProfiles/Project_Default.xml
index ee2c34b..e2809fa 100644
--- a/.idea/inspectionProfiles/Project_Default.xml
+++ b/.idea/inspectionProfiles/Project_Default.xml
@@ -9,6 +9,9 @@
+
+
+
\ No newline at end of file
diff --git a/src/main/java/com/mobai/controller/FluxGetInfoController.java b/src/main/java/com/mobai/controller/FluxGetInfoController.java
index 3162a9e..975568c 100644
--- a/src/main/java/com/mobai/controller/FluxGetInfoController.java
+++ b/src/main/java/com/mobai/controller/FluxGetInfoController.java
@@ -8,6 +8,8 @@ import lombok.extern.log4j.Log4j2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
+import java.util.List;
+
/**
* @ClassName FluxGetInfo
* @Description 描述
@@ -21,11 +23,16 @@ public class FluxGetInfoController {
@Autowired
private FluxGetInfoService fluxGetInfoService;
- @GetMapping("getInfo")
+ @GetMapping("/getInfo/")
public Result getInfo(){
return fluxGetInfoService.getInfo(null);
}
+ @GetMapping("getIps/")
+ public Result> getIps(){
+ return fluxGetInfoService.getIps();
+ }
+
@PostMapping("/getIp")
public Result vehicleConnection(@RequestBody(required = false) VehicleConnectionReq req){
return fluxGetInfoService.vehicleConnection(req);
diff --git a/src/main/java/com/mobai/mq/rabbitmq/rabbitMq/MessageHandler.java b/src/main/java/com/mobai/mq/rabbitmq/rabbitMq/MessageHandler.java
index 3aa6c4e..d6c591b 100644
--- a/src/main/java/com/mobai/mq/rabbitmq/rabbitMq/MessageHandler.java
+++ b/src/main/java/com/mobai/mq/rabbitmq/rabbitMq/MessageHandler.java
@@ -43,17 +43,17 @@ public class MessageHandler {
@Autowired
private RedisService redisService;
- @RabbitListener(queues = {"create.topic"})
- private void message(String msg) {
- log.info("消息内容:{}", msg);
- MqttProperties topic0 = MqttProperties.configBuild(
- "39.98.69.92",
- "topic0");
- log.info("接收到消息:{}", topic0);
- MqttClient client = mqttFactory.buildOptions(topic0);
- log.info("client创建:{}", client);
- log.info("clientID创建:{}", client.getClientId());
- }
+// @RabbitListener(queues = {"create.topic"})
+// private void message(String msg) {
+// log.info("消息内容:{}", msg);
+//// MqttProperties topic0 = MqttProperties.configBuild(
+//// "39.98.69.92",
+//// "topic0");
+//// log.info("接收到消息:{}", topic0);
+//// MqttClient client = mqttFactory.buildOptions(topic0);
+//// log.info("client创建:{}", client);
+//// log.info("clientID创建:{}", client.getClientId());
+// }
/**
@@ -66,13 +66,37 @@ public class MessageHandler {
log.info("event:{}", msg);
// 事件内容
JSONObject jsonObject = JSON.parseObject(msg);
- String vin = JSON.to(String.class, jsonObject.get("clientId"));
+ String clientId = JSON.to(String.class, jsonObject.get("clientId"));
+ if (!clientId.contains("VIN")) {
+ log.error("不是车辆事件::{}",message);
+ try {
+ channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
+ return;
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ if (!clientId.contains("-")){
+ try {
+ throw new ServletException("不是vin——ip");
+ } catch (ServletException e) {
+ try {
+ channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
+ } catch (IOException ex) {
+ throw new RuntimeException(ex);
+ }
+ throw new RuntimeException(e);
+ }
+ }
+ String[] split = clientId.split("-");
+ String vin = split[0];
+ String ip = split[1];
long timestamp = JSON.to(Long.class, jsonObject.get("timestamp"));
+ // 判断登陆事件
if (jsonObject.get("auth") != null) {
try {
log.info("上线事件");
- String ip = redisService.getValue(vin);
- log.info("上线车辆vin:{}\n\t上线时ip:{}", vin, ip);
+ log.info("上线车辆vin:{}--上线时ip:{}", vin, ip);
if (ip == null) {
throw new ServletException("上线时ip为空");
}
@@ -97,9 +121,8 @@ public class MessageHandler {
}
} else {
log.info("下线事件");
- String ip = redisService.getValue(vin);
try {
- log.info("下线车辆vin:{}\n\t下线时ip:{}", vin, ip);
+ log.info("下线车辆vin:{}--下线时ip:{}", vin, ip);
if (ip == null) {
throw new ServletException("下线时ip为空");
}
@@ -114,7 +137,7 @@ public class MessageHandler {
if (vinStayTime==null){
throw new ServletException("{"+vin+"}数据不存在");
}
- String format = new SimpleDateFormat("HH时mm分ss秒").format(new Date(timestamp - vinStayTime.getUpTime()));
+ String format = new SimpleDateFormat("HH时mm分ss秒").format(new Date(timestamp - vinStayTime.getUpTime()-8*60*60*1000));
vinStayTime.setStayLongTime(format);
vinStayTime.setDownTime(timestamp);
// 修改下线时间
@@ -125,7 +148,7 @@ public class MessageHandler {
// 输出在线时长
log.info("车辆在线时长为:{}", format);
log.info(update ? vin + "下线记录成功" : vin + "下线记录失败");
-
+ redisService.deleteObject(vin);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (ServletException | IOException e) {
log.error("下线失败");
diff --git a/src/main/java/com/mobai/openApi/SelectInstances.java b/src/main/java/com/mobai/openApi/SelectInstances.java
index 9ace7ec..46e47e1 100644
--- a/src/main/java/com/mobai/openApi/SelectInstances.java
+++ b/src/main/java/com/mobai/openApi/SelectInstances.java
@@ -26,8 +26,6 @@ import java.util.*;
@Component
public class SelectInstances {
- @Autowired
- private FluxGetInfoService fluxGetInfoService;
@Autowired
private RedisTemplate redisTemplate;
@@ -69,28 +67,34 @@ public class SelectInstances {
return describeInstancesResponse;
}
+ public List ips( DescribeInstancesResponse response){
+ List ips = new ArrayList<>();
+ DescribeInstancesResponseBody.DescribeInstancesResponseBodyInstances instances = response.getBody().getInstances();
+ List instance =
+ instances.getInstance();
+ for (DescribeInstancesResponseBody.DescribeInstancesResponseBodyInstancesInstance inst : instance) {
+ List ipAddress = inst
+ .getPublicIpAddress()
+ .getIpAddress();
+ if (ipAddress.isEmpty()) {
+ continue;
+ } else {
+ ipAddress.forEach(ip -> ips.add(ip));
+ }
+ }
+ log.info("当前实例ip为{}", ips);
+ return ips;
+ }
+
//1分钟
- @Scheduled(cron = "0 0/1 * * * ? ")
+ @Scheduled(cron = "* 0/5 * * * ? ")
//10秒
// @Scheduled(cron = "0/10 * * * * ? ")
- public void saveIps() throws Exception {
- List ips = new ArrayList<>();
+ public void saveIps() {
+ List ips = null;
DescribeInstancesResponse response = this.getInfo();
try {
- DescribeInstancesResponseBody.DescribeInstancesResponseBodyInstances instances = response.getBody().getInstances();
- List instance =
- instances.getInstance();
- for (DescribeInstancesResponseBody.DescribeInstancesResponseBodyInstancesInstance inst : instance) {
- List ipAddress = inst
- .getPublicIpAddress()
- .getIpAddress();
- if (ipAddress.isEmpty()) {
- continue;
- } else {
- ipAddress.forEach(ip -> ips.add(ip));
- }
- }
- log.info("当前实例ip为{}", ips);
+ ips = this.ips(response);
} catch (TeaException error) {
// 此处仅做打印展示,请谨慎对待异常处理,在工程项目中切勿直接忽略异常。
// 错误 message
@@ -114,7 +118,6 @@ public class SelectInstances {
}
List nodes = new ArrayList<>();
for (String ip : ips) {
- Result info = fluxGetInfoService.getInfo(ip);
// 获取连接总数
String string = redisTemplate.opsForValue().get("onlineCar-" + ip);
long connectSize = Long.parseLong(string == null ? "0" : string);
@@ -184,7 +187,6 @@ public class SelectInstances {
break;
}
}
- redisTemplate.opsForList().leftPush("fluxMq", JSON.toJSONString(ips));
// 可负载IP轮询排列
log.info("排列ip,{}", ips);
Boolean mqttIp = redisTemplate.delete("mqttIp");
diff --git a/src/main/java/com/mobai/runner/MqttRunner.java b/src/main/java/com/mobai/runner/MqttRunner.java
index e8057fd..e71b918 100644
--- a/src/main/java/com/mobai/runner/MqttRunner.java
+++ b/src/main/java/com/mobai/runner/MqttRunner.java
@@ -36,30 +36,33 @@ public class MqttRunner implements ApplicationRunner {
@Override
public void run(ApplicationArguments args) throws Exception {
- List ips = redisService.getList("mqttIp");
- List list = ips.stream().distinct().map(str -> JSON.parseObject(str, MqttServerModel.class)).toList();
- list.forEach(mqttServerModel -> {
+ // 存入mqttIp
+ selectInstances.saveIps();
+
+// List ips = redisService.getList("mqttIp");
+// List list = ips.stream().distinct().map(str -> JSON.parseObject(str, MqttServerModel.class)).toList();
+// list.forEach(mqttServerModel -> {
+// MqttProperties mqttProperties = MqttProperties.configBuild(mqttServerModel.getBroker(), mqttServerModel.getTopic());
+//// MqttProperties mqttProperties = new MqttProperties();
+//// mqttProperties.setBroker("tcp://39.98.69.92:1883");
+//// mqttProperties.setTopic("mqtt/test");
+// mqttProperties.setUsername("emqx");
+// mqttProperties.setPassword("public");
+//// mqttProperties.setClientid("subscribe_client");
+// int qos = 0;
+// try {
+// MqttClient client = new MqttFactory(new MqttCallBackServiceImpl()).buildOptions(mqttProperties);
+// // 连接参数
+// MqttConnectOptions options = GetOptions.getMqttOptionas(mqttProperties);
+// // 设置回调
+// client.setCallback(new MqttCallBackServiceImpl());
+// // 进行连接
+// client.connect(options);
+// client.subscribe(mqttProperties.getTopic(), qos);
+// } catch (Exception e) {
+// e.printStackTrace();
+// }
+// });
- });
- MqttProperties mqttProperties = MqttProperties.configBuild("39.98.69.92", "topic0");
-// MqttProperties mqttProperties = new MqttProperties();
-// mqttProperties.setBroker("tcp://39.98.69.92:1883");
-// mqttProperties.setTopic("mqtt/test");
- mqttProperties.setUsername("emqx");
- mqttProperties.setPassword("public");
-// mqttProperties.setClientid("subscribe_client");
- int qos = 0;
- try {
- MqttClient client = new MqttFactory(new MqttCallBackServiceImpl()).buildOptions(mqttProperties);
- // 连接参数
- MqttConnectOptions options = GetOptions.getMqttOptionas(mqttProperties);
- // 设置回调
- client.setCallback(new MqttCallBackServiceImpl());
- // 进行连接
- client.connect(options);
- client.subscribe(mqttProperties.getTopic(), qos);
- } catch (Exception e) {
- e.printStackTrace();
- }
}
}
diff --git a/src/main/java/com/mobai/service/FluxGetInfoService.java b/src/main/java/com/mobai/service/FluxGetInfoService.java
index 0a42532..21d50a3 100644
--- a/src/main/java/com/mobai/service/FluxGetInfoService.java
+++ b/src/main/java/com/mobai/service/FluxGetInfoService.java
@@ -1,8 +1,11 @@
package com.mobai.service;
+import com.mobai.domain.MqttServerModel;
import com.mobai.domain.Result;
import com.mobai.domain.VehicleConnectionReq;
+import java.util.List;
+
/**
* @ClassName FluxGetInfoService
* @Description 描述
@@ -17,4 +20,5 @@ public interface FluxGetInfoService {
Result vehicleConnection(VehicleConnectionReq req);
+ Result> getIps();
}
diff --git a/src/main/java/com/mobai/service/impl/FluxGetInfoServiceImpl.java b/src/main/java/com/mobai/service/impl/FluxGetInfoServiceImpl.java
index 6040cb9..1efa28b 100644
--- a/src/main/java/com/mobai/service/impl/FluxGetInfoServiceImpl.java
+++ b/src/main/java/com/mobai/service/impl/FluxGetInfoServiceImpl.java
@@ -1,8 +1,11 @@
package com.mobai.service.impl;
import com.alibaba.fastjson2.JSON;
+import com.aliyun.ecs20140526.models.DescribeInstancesResponse;
+import com.aliyun.ecs20140526.models.DescribeInstancesResponseBody;
import com.mobai.domain.*;
import com.mobai.domain.flux.ApifoxModel;
+import com.mobai.openApi.SelectInstances;
import com.mobai.service.FluxGetInfoService;
import lombok.extern.log4j.Log4j2;
import org.springframework.beans.factory.annotation.Autowired;
@@ -15,7 +18,9 @@ import org.springframework.stereotype.Service;
import org.springframework.util.MultiValueMap;
import org.springframework.web.client.RestTemplate;
+import java.util.ArrayList;
import java.util.List;
+import java.util.stream.Collectors;
/**
* @ClassName FluxGetInfoServiceImpl
@@ -33,6 +38,8 @@ public class FluxGetInfoServiceImpl implements FluxGetInfoService {
@Autowired
private RedisTemplate redis;
+ @Autowired
+ private SelectInstances selectInstances;
/**
* 通过ip获取详细信息
*
@@ -96,6 +103,21 @@ public class FluxGetInfoServiceImpl implements FluxGetInfoService {
log.info("已获取到对象:{}",mqttServerModel);
return Result.success(mqttServerModel);
}
+
+ @Override
+ public Result> getIps() {
+ List ips = new ArrayList<>();
+ DescribeInstancesResponse response = selectInstances.getInfo();
+ ips = selectInstances.ips(response);
+ log.info("当前实例ip为{}", ips);
+ List finalIps = ips;
+ List collect = ips.stream().map(ip ->
+ new MqttServerModel() {{
+ setBroker(ip);
+ setTopic("topic" + finalIps.indexOf(ip));
+ }}).collect(Collectors.toList());
+ return Result.success(collect);
+ }
}
// 达到60%开启新服务,30%关闭低实例