// Source: mq-load/src/main/java/com/mobai/service/impl/FluxGetInfoServiceImpl.java
//
// 123 lines
// 4.3 KiB
// Java

package com.mobai.service.impl;
import com.alibaba.fastjson2.JSON;
import com.aliyun.ecs20140526.models.DescribeInstancesResponse;
import com.mobai.domain.*;
import com.mobai.domain.flux.ApifoxModel;
import com.mobai.openApi.SelectInstances;
import com.mobai.service.FluxGetInfoService;
import lombok.extern.log4j.Log4j2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Service;
import org.springframework.util.MultiValueMap;
import org.springframework.web.client.RestTemplate;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
/**
* @ClassName FluxGetInfoServiceImpl
* @Description 描述
* @Author Mobai
* @Date 2024/5/28 22:01
*/
@Log4j2
@Service
public class FluxGetInfoServiceImpl implements FluxGetInfoService {
@Autowired
private RestTemplate restTemplate;
@Autowired
private RedisTemplate<String, String> redis;
@Autowired
private SelectInstances selectInstances;
/**
* 通过ip获取详细信息
*
* @param ip
* @return
*/
@Override
public Result getInfo(String ip) {
String url = null;
if (ip == null) {
url = "http://39.98.69.92:8080/public/";
} else {
url = "http://" + ip + ":8080/public/";
}
User user = new User("fluxmq", "fluxmq");
//登录
AcceptToken token = restTemplate.postForObject(url + "login", user, AcceptToken.class);
//请求头
HttpHeaders headers = new HttpHeaders();
headers.add("token", token.getAccessToken());
//封装请求头
HttpEntity<MultiValueMap<String, Object>> formEntity = new HttpEntity<>(headers);
ResponseEntity<String> exchange = restTemplate.exchange(url + "cluster", HttpMethod.GET, formEntity, String.class);
System.out.println(exchange);
// System.out.println(exchange);
// System.out.println(exchange.getBody());
List<ApifoxModel> apifoxModel = JSON.parseArray(exchange.getBody(), ApifoxModel.class);
// get 获取具体所有信息
log.info(apifoxModel);
return Result.success(apifoxModel.get(0));
}
@Override
public Result vehicleConnection(VehicleConnectionReq req) {
// "vehicleVin": "VIN1234567894" vin
// "timestamp": "11111" new Date().getMillis()
// "username": "你好" Vin + timestamp
// "nonce": "33" 随机
log.warn("参数为:{}", req);
// String string = redis.opsForList().range("fluxMq", 0, -1).get(0);
// List<MqttServerModel> mqtts = JSON.parseArray(string, MqttServerModel.class);
// log.info("集合:{}",mqtts);
// tcp://192.168.1.1:1883
if (redis.hasKey("fluxMqIndex")) {
redis.opsForValue().increment("fluxMqIndex", 1);
} else {
redis.opsForValue().set("fluxMqIndex", 0 + "");
}
int index = Integer.valueOf(redis.opsForValue().get("fluxMqIndex"));
log.info("下标:{}", index);
// List<String> fluxmq = redis.opsForList().range("mqttIp", 0, 5);
String mqttIp = redis.opsForList().index("mqttIp", index);
// log.info(fluxmq);
log.info(mqttIp);
MqttServerModel mqttServerModel = JSON.parseObject(mqttIp, MqttServerModel.class);
if (index + 1 == 100) {
redis.opsForValue().set("fluxMqIndex", 0 + "");
}
log.info("已获取到对象:{}", mqttServerModel);
return Result.success(mqttServerModel);
}
@Override
public Result<List<MqttServerModel>> getIps() {
List<String> ips = new ArrayList<>();
DescribeInstancesResponse response = selectInstances.getInfo();
ips = selectInstances.ips(response);
log.info("当前实例ip为{}", ips);
List<String> finalIps = ips;
List<MqttServerModel> collect = ips.stream().map(ip ->
new MqttServerModel() {{
setBroker(ip);
setTopic("topic" + finalIps.indexOf(ip));
}}).collect(Collectors.toList());
return Result.success(collect);
}
}