feat():before testRunner

master
Saisai Liu 2024-06-07 22:37:37 +08:00
parent 0ccbcc4335
commit a99369e58b
7 changed files with 128 additions and 64 deletions

View File

@ -9,6 +9,9 @@
<inspection_tool class="AliLongLiteralsEndingWithLowercaseL" enabled="true" level="WARNING" enabled_by_default="true" /> <inspection_tool class="AliLongLiteralsEndingWithLowercaseL" enabled="true" level="WARNING" enabled_by_default="true" />
<inspection_tool class="AliMissingOverrideAnnotation" enabled="true" level="WARNING" enabled_by_default="true" /> <inspection_tool class="AliMissingOverrideAnnotation" enabled="true" level="WARNING" enabled_by_default="true" />
<inspection_tool class="AliWrapperTypeEquality" enabled="true" level="WARNING" enabled_by_default="true" /> <inspection_tool class="AliWrapperTypeEquality" enabled="true" level="WARNING" enabled_by_default="true" />
<inspection_tool class="AutoCloseableResource" enabled="true" level="WARNING" enabled_by_default="true">
<option name="METHOD_MATCHER_CONFIG" value="java.util.Formatter,format,java.io.Writer,append,com.google.common.base.Preconditions,checkNotNull,org.hibernate.Session,close,java.io.PrintWriter,printf,java.io.PrintStream,printf,com.mobai.mq.rabbitmq.cofig.MqttFactory,buildOptions" />
</inspection_tool>
<inspection_tool class="MapOrSetKeyShouldOverrideHashCodeEquals" enabled="true" level="WARNING" enabled_by_default="true" /> <inspection_tool class="MapOrSetKeyShouldOverrideHashCodeEquals" enabled="true" level="WARNING" enabled_by_default="true" />
</profile> </profile>
</component> </component>

View File

@ -8,6 +8,8 @@ import lombok.extern.log4j.Log4j2;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*; import org.springframework.web.bind.annotation.*;
import java.util.List;
/** /**
* @ClassName FluxGetInfo * @ClassName FluxGetInfo
* @Description * @Description
@ -21,11 +23,16 @@ public class FluxGetInfoController {
@Autowired @Autowired
private FluxGetInfoService fluxGetInfoService; private FluxGetInfoService fluxGetInfoService;
@GetMapping("getInfo") @GetMapping("/getInfo/")
public Result getInfo(){ public Result getInfo(){
return fluxGetInfoService.getInfo(null); return fluxGetInfoService.getInfo(null);
} }
@GetMapping("getIps/")
public Result<List<MqttServerModel>> getIps(){
return fluxGetInfoService.getIps();
}
@PostMapping("/getIp") @PostMapping("/getIp")
public Result vehicleConnection(@RequestBody(required = false) VehicleConnectionReq req){ public Result vehicleConnection(@RequestBody(required = false) VehicleConnectionReq req){
return fluxGetInfoService.vehicleConnection(req); return fluxGetInfoService.vehicleConnection(req);

View File

@ -43,17 +43,17 @@ public class MessageHandler {
@Autowired @Autowired
private RedisService redisService; private RedisService redisService;
@RabbitListener(queues = {"create.topic"}) // @RabbitListener(queues = {"create.topic"})
private void message(String msg) { // private void message(String msg) {
log.info("消息内容:{}", msg); // log.info("消息内容:{}", msg);
MqttProperties topic0 = MqttProperties.configBuild( //// MqttProperties topic0 = MqttProperties.configBuild(
"39.98.69.92", //// "39.98.69.92",
"topic0"); //// "topic0");
log.info("接收到消息:{}", topic0); //// log.info("接收到消息:{}", topic0);
MqttClient client = mqttFactory.buildOptions(topic0); //// MqttClient client = mqttFactory.buildOptions(topic0);
log.info("client创建:{}", client); //// log.info("client创建:{}", client);
log.info("clientID创建:{}", client.getClientId()); //// log.info("clientID创建:{}", client.getClientId());
} // }
/** /**
@ -66,13 +66,37 @@ public class MessageHandler {
log.info("event:{}", msg); log.info("event:{}", msg);
// 事件内容 // 事件内容
JSONObject jsonObject = JSON.parseObject(msg); JSONObject jsonObject = JSON.parseObject(msg);
String vin = JSON.to(String.class, jsonObject.get("clientId")); String clientId = JSON.to(String.class, jsonObject.get("clientId"));
if (!clientId.contains("VIN")) {
log.error("不是车辆事件::{}",message);
try {
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
return;
} catch (IOException e) {
throw new RuntimeException(e);
}
}
if (!clientId.contains("-")){
try {
throw new ServletException("不是vin——ip");
} catch (ServletException e) {
try {
channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
} catch (IOException ex) {
throw new RuntimeException(ex);
}
throw new RuntimeException(e);
}
}
String[] split = clientId.split("-");
String vin = split[0];
String ip = split[1];
long timestamp = JSON.to(Long.class, jsonObject.get("timestamp")); long timestamp = JSON.to(Long.class, jsonObject.get("timestamp"));
// 判断登陆事件
if (jsonObject.get("auth") != null) { if (jsonObject.get("auth") != null) {
try { try {
log.info("上线事件"); log.info("上线事件");
String ip = redisService.getValue(vin); log.info("上线车辆vin:{}--上线时ip:{}", vin, ip);
log.info("上线车辆vin:{}\n\t上线时ip:{}", vin, ip);
if (ip == null) { if (ip == null) {
throw new ServletException("上线时ip为空"); throw new ServletException("上线时ip为空");
} }
@ -97,9 +121,8 @@ public class MessageHandler {
} }
} else { } else {
log.info("下线事件"); log.info("下线事件");
String ip = redisService.getValue(vin);
try { try {
log.info("下线车辆vin:{}\n\t下线时ip:{}", vin, ip); log.info("下线车辆vin:{}--下线时ip:{}", vin, ip);
if (ip == null) { if (ip == null) {
throw new ServletException("下线时ip为空"); throw new ServletException("下线时ip为空");
} }
@ -114,7 +137,7 @@ public class MessageHandler {
if (vinStayTime==null){ if (vinStayTime==null){
throw new ServletException("{"+vin+"}数据不存在"); throw new ServletException("{"+vin+"}数据不存在");
} }
String format = new SimpleDateFormat("HH时mm分ss秒").format(new Date(timestamp - vinStayTime.getUpTime())); String format = new SimpleDateFormat("HH时mm分ss秒").format(new Date(timestamp - vinStayTime.getUpTime()-8*60*60*1000));
vinStayTime.setStayLongTime(format); vinStayTime.setStayLongTime(format);
vinStayTime.setDownTime(timestamp); vinStayTime.setDownTime(timestamp);
// 修改下线时间 // 修改下线时间
@ -125,7 +148,7 @@ public class MessageHandler {
// 输出在线时长 // 输出在线时长
log.info("车辆在线时长为:{}", format); log.info("车辆在线时长为:{}", format);
log.info(update ? vin + "下线记录成功" : vin + "下线记录失败"); log.info(update ? vin + "下线记录成功" : vin + "下线记录失败");
redisService.deleteObject(vin);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (ServletException | IOException e) { } catch (ServletException | IOException e) {
log.error("下线失败"); log.error("下线失败");

View File

@ -26,8 +26,6 @@ import java.util.*;
@Component @Component
public class SelectInstances { public class SelectInstances {
@Autowired
private FluxGetInfoService fluxGetInfoService;
@Autowired @Autowired
private RedisTemplate<String, String> redisTemplate; private RedisTemplate<String, String> redisTemplate;
@ -69,28 +67,34 @@ public class SelectInstances {
return describeInstancesResponse; return describeInstancesResponse;
} }
public List<String> ips( DescribeInstancesResponse response){
List<String> ips = new ArrayList<>();
DescribeInstancesResponseBody.DescribeInstancesResponseBodyInstances instances = response.getBody().getInstances();
List<DescribeInstancesResponseBody.DescribeInstancesResponseBodyInstancesInstance> instance =
instances.getInstance();
for (DescribeInstancesResponseBody.DescribeInstancesResponseBodyInstancesInstance inst : instance) {
List<String> ipAddress = inst
.getPublicIpAddress()
.getIpAddress();
if (ipAddress.isEmpty()) {
continue;
} else {
ipAddress.forEach(ip -> ips.add(ip));
}
}
log.info("当前实例ip为{}", ips);
return ips;
}
//1分钟 //1分钟
@Scheduled(cron = "0 0/1 * * * ? ") @Scheduled(cron = "* 0/5 * * * ? ")
//10秒 //10秒
// @Scheduled(cron = "0/10 * * * * ? ") // @Scheduled(cron = "0/10 * * * * ? ")
public void saveIps() throws Exception { public void saveIps() {
List<String> ips = new ArrayList<>(); List<String> ips = null;
DescribeInstancesResponse response = this.getInfo(); DescribeInstancesResponse response = this.getInfo();
try { try {
DescribeInstancesResponseBody.DescribeInstancesResponseBodyInstances instances = response.getBody().getInstances(); ips = this.ips(response);
List<DescribeInstancesResponseBody.DescribeInstancesResponseBodyInstancesInstance> instance =
instances.getInstance();
for (DescribeInstancesResponseBody.DescribeInstancesResponseBodyInstancesInstance inst : instance) {
List<String> ipAddress = inst
.getPublicIpAddress()
.getIpAddress();
if (ipAddress.isEmpty()) {
continue;
} else {
ipAddress.forEach(ip -> ips.add(ip));
}
}
log.info("当前实例ip为{}", ips);
} catch (TeaException error) { } catch (TeaException error) {
// 此处仅做打印展示,请谨慎对待异常处理,在工程项目中切勿直接忽略异常。 // 此处仅做打印展示,请谨慎对待异常处理,在工程项目中切勿直接忽略异常。
// 错误 message // 错误 message
@ -114,7 +118,6 @@ public class SelectInstances {
} }
List<SmallNode> nodes = new ArrayList<>(); List<SmallNode> nodes = new ArrayList<>();
for (String ip : ips) { for (String ip : ips) {
Result<ApifoxModel> info = fluxGetInfoService.getInfo(ip);
// 获取连接总数 // 获取连接总数
String string = redisTemplate.opsForValue().get("onlineCar-" + ip); String string = redisTemplate.opsForValue().get("onlineCar-" + ip);
long connectSize = Long.parseLong(string == null ? "0" : string); long connectSize = Long.parseLong(string == null ? "0" : string);
@ -184,7 +187,6 @@ public class SelectInstances {
break; break;
} }
} }
redisTemplate.opsForList().leftPush("fluxMq", JSON.toJSONString(ips));
// 可负载IP轮询排列 // 可负载IP轮询排列
log.info("排列ip,{}", ips); log.info("排列ip,{}", ips);
Boolean mqttIp = redisTemplate.delete("mqttIp"); Boolean mqttIp = redisTemplate.delete("mqttIp");

View File

@ -36,30 +36,33 @@ public class MqttRunner implements ApplicationRunner {
@Override @Override
public void run(ApplicationArguments args) throws Exception { public void run(ApplicationArguments args) throws Exception {
List<String> ips = redisService.getList("mqttIp"); // 存入mqttIp
List<MqttServerModel> list = ips.stream().distinct().map(str -> JSON.parseObject(str, MqttServerModel.class)).toList(); selectInstances.saveIps();
list.forEach(mqttServerModel -> {
// List<String> ips = redisService.getList("mqttIp");
// List<MqttServerModel> list = ips.stream().distinct().map(str -> JSON.parseObject(str, MqttServerModel.class)).toList();
// list.forEach(mqttServerModel -> {
// MqttProperties mqttProperties = MqttProperties.configBuild(mqttServerModel.getBroker(), mqttServerModel.getTopic());
//// MqttProperties mqttProperties = new MqttProperties();
//// mqttProperties.setBroker("tcp://39.98.69.92:1883");
//// mqttProperties.setTopic("mqtt/test");
// mqttProperties.setUsername("emqx");
// mqttProperties.setPassword("public");
//// mqttProperties.setClientid("subscribe_client");
// int qos = 0;
// try {
// MqttClient client = new MqttFactory(new MqttCallBackServiceImpl()).buildOptions(mqttProperties);
// // 连接参数
// MqttConnectOptions options = GetOptions.getMqttOptionas(mqttProperties);
// // 设置回调
// client.setCallback(new MqttCallBackServiceImpl());
// // 进行连接
// client.connect(options);
// client.subscribe(mqttProperties.getTopic(), qos);
// } catch (Exception e) {
// e.printStackTrace();
// }
// });
});
MqttProperties mqttProperties = MqttProperties.configBuild("39.98.69.92", "topic0");
// MqttProperties mqttProperties = new MqttProperties();
// mqttProperties.setBroker("tcp://39.98.69.92:1883");
// mqttProperties.setTopic("mqtt/test");
mqttProperties.setUsername("emqx");
mqttProperties.setPassword("public");
// mqttProperties.setClientid("subscribe_client");
int qos = 0;
try {
MqttClient client = new MqttFactory(new MqttCallBackServiceImpl()).buildOptions(mqttProperties);
// 连接参数
MqttConnectOptions options = GetOptions.getMqttOptionas(mqttProperties);
// 设置回调
client.setCallback(new MqttCallBackServiceImpl());
// 进行连接
client.connect(options);
client.subscribe(mqttProperties.getTopic(), qos);
} catch (Exception e) {
e.printStackTrace();
}
} }
} }

View File

@ -1,8 +1,11 @@
package com.mobai.service; package com.mobai.service;
import com.mobai.domain.MqttServerModel;
import com.mobai.domain.Result; import com.mobai.domain.Result;
import com.mobai.domain.VehicleConnectionReq; import com.mobai.domain.VehicleConnectionReq;
import java.util.List;
/** /**
* @ClassName FluxGetInfoService * @ClassName FluxGetInfoService
* @Description * @Description
@ -17,4 +20,5 @@ public interface FluxGetInfoService {
Result vehicleConnection(VehicleConnectionReq req); Result vehicleConnection(VehicleConnectionReq req);
Result<List<MqttServerModel>> getIps();
} }

View File

@ -1,8 +1,11 @@
package com.mobai.service.impl; package com.mobai.service.impl;
import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSON;
import com.aliyun.ecs20140526.models.DescribeInstancesResponse;
import com.aliyun.ecs20140526.models.DescribeInstancesResponseBody;
import com.mobai.domain.*; import com.mobai.domain.*;
import com.mobai.domain.flux.ApifoxModel; import com.mobai.domain.flux.ApifoxModel;
import com.mobai.openApi.SelectInstances;
import com.mobai.service.FluxGetInfoService; import com.mobai.service.FluxGetInfoService;
import lombok.extern.log4j.Log4j2; import lombok.extern.log4j.Log4j2;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
@ -15,7 +18,9 @@ import org.springframework.stereotype.Service;
import org.springframework.util.MultiValueMap; import org.springframework.util.MultiValueMap;
import org.springframework.web.client.RestTemplate; import org.springframework.web.client.RestTemplate;
import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.stream.Collectors;
/** /**
* @ClassName FluxGetInfoServiceImpl * @ClassName FluxGetInfoServiceImpl
@ -33,6 +38,8 @@ public class FluxGetInfoServiceImpl implements FluxGetInfoService {
@Autowired @Autowired
private RedisTemplate<String, String> redis; private RedisTemplate<String, String> redis;
@Autowired
private SelectInstances selectInstances;
/** /**
* ip * ip
* *
@ -96,6 +103,21 @@ public class FluxGetInfoServiceImpl implements FluxGetInfoService {
log.info("已获取到对象:{}",mqttServerModel); log.info("已获取到对象:{}",mqttServerModel);
return Result.success(mqttServerModel); return Result.success(mqttServerModel);
} }
@Override
public Result<List<MqttServerModel>> getIps() {
List<String> ips = new ArrayList<>();
DescribeInstancesResponse response = selectInstances.getInfo();
ips = selectInstances.ips(response);
log.info("当前实例ip为{}", ips);
List<String> finalIps = ips;
List<MqttServerModel> collect = ips.stream().map(ip ->
new MqttServerModel() {{
setBroker(ip);
setTopic("topic" + finalIps.indexOf(ip));
}}).collect(Collectors.toList());
return Result.success(collect);
}
} }
// 达到60%开启新服务30%关闭低实例 // 达到60%开启新服务30%关闭低实例