feat:添加上线信息到redis

master
Yunfei Du 2024-06-01 11:48:56 +08:00
parent 77ade6288e
commit 61ffc902f4
12 changed files with 76 additions and 283 deletions

View File

@ -1,10 +0,0 @@
package com.load;
/**
* @ClassName MqttApplication
* @Description
* @Author YunFei.Du
* @Date 2024/5/30 9:19
*/
public class MqttApplication {
}

View File

@ -11,7 +11,7 @@ public class SubscribeSample {
/**
*
*/
String broker = "tcp://39.100.87.192:1883";
String broker = "tcp://39.100.113.246:1883";
/**
*
*/

View File

@ -53,90 +53,34 @@ public class InitConnectWeight implements ApplicationRunner {
//获取阿里云客户端
try {
client = ECSTool.createClient();
DescribeInstancesRequest describeInstancesRequest = new DescribeInstancesRequest().setRegionId("cn-zhangjiakou");
DescribeInstancesResponse describeInstancesResponse = client.describeInstancesWithOptions(describeInstancesRequest, new RuntimeOptions());
List<List<String>> ipListList = describeInstancesResponse.getBody().instances.getInstance().stream()
.map(instance -> instance.publicIpAddress.ipAddress)
.collect(Collectors.toList());
ipListList.forEach(strings -> strings.forEach(ipList::add));
} catch (Exception e) {
log.error("获取阿里云客户端或实例信息失败", e);
throw new RuntimeException(e);
}
//获取所有实例
DescribeInstancesRequest describeInstancesRequest = new DescribeInstancesRequest()
.setRegionId("cn-zhangjiakou");
// .setInstanceType("ecs.e-c1m2.xlarge");
RuntimeOptions runtime = new RuntimeOptions();
try {
// 复制代码运行请自行打印 API 的返回值
DescribeInstancesResponse describeInstancesResponse = client.describeInstancesWithOptions(describeInstancesRequest, runtime);
List<List<String>> ipListList = describeInstancesResponse.getBody().instances.getInstance().stream().map(instance -> instance.publicIpAddress.ipAddress).collect(Collectors.toList());
for (List<String> strings : ipListList) {
for (String ip : strings) {
ipList.add(ip);
}
}
} catch (TeaException error) {
// 此处仅做打印展示,请谨慎对待异常处理,在工程项目中切勿直接忽略异常。
// 错误 message
System.out.println(error.getMessage());
// 诊断地址
System.out.println(error.getData().get("Recommend"));
com.aliyun.teautil.Common.assertAsString(error.message);
} catch (Exception _error) {
TeaException error = new TeaException(_error.getMessage(), _error);
// 此处仅做打印展示,请谨慎对待异常处理,在工程项目中切勿直接忽略异常。
// 错误 message
System.out.println(error.getMessage());
// 诊断地址
System.out.println(error.getData().get("Recommend"));
com.aliyun.teautil.Common.assertAsString(error.message);
}
//http://fluxmq.muyu.icu/public/login
//遍历所有ip,获取每一个服务的连接数
for (String ip : ipList) {
String url = "http://" + ip + ":8080/public/login";
Map<String, Object> request = new HashMap<>();
request.put("username", "fluxmq");
request.put("password", "fluxmq");
HttpHeaders httpHeaders = new HttpHeaders();
httpHeaders.setContentType(MediaType.APPLICATION_JSON);
HttpEntity<Map<String, Object>> r = new HttpEntity<Map<String, Object>>(request, httpHeaders);
String result = restTemplate.postForObject(url, r, String.class);
//http://fluxmq.muyu.icu/public/cluster
int nextInt = new Random().nextInt(1000);
String getInfoUrl = "http://" + ip + ":8080/public/cluster?random=" + nextInt;
HttpHeaders httpHeadersGetInfo = new HttpHeaders();
httpHeadersGetInfo.setContentType(MediaType.APPLICATION_JSON);
httpHeadersGetInfo.setAccept(Collections.singletonList(MediaType.APPLICATION_JSON));
httpHeadersGetInfo.set("Cookie", result);
HttpEntity getInfoRequest = new HttpEntity(httpHeadersGetInfo);
ResponseEntity<String> responseInfo = restTemplate.exchange(getInfoUrl, HttpMethod.GET, getInfoRequest, String.class, 1);
log.error("响应是:{}", responseInfo.getBody());
JSONArray jsonArray = JSON.parseArray(responseInfo.getBody());
if (jsonArray.size() > 0) {
JSONObject jsonObject = jsonArray.getJSONObject(0);
Integer connectSize = Integer.valueOf(jsonObject.getJSONObject("mqttInfo").getString("connectSize"));
connectWeightList.add(new ConnectWeight(ip,100-connectSize));
log.error("链接数量:{}", connectSize);
} else {
log.error("得到的相应数据为null");
try {
processIp(ip, connectWeightList);
} catch (Exception e) {
log.error("处理IP [{}] 时出错", ip, e);
}
}
// connectWeightList 的逻辑
// 初始化权重求和变量
Integer sum =0;
// 遍历ConnectWeight列表计算所有权重值的总和
for (ConnectWeight connectWeight : connectWeightList) {
sum = sum + connectWeight.getWeightValue();
}
// 初始化最大轮询次数为0
int max = 0;
for (ConnectWeight connectWeight : connectWeightList) {
log.error("权重值:{}",connectWeight.getWeightValue());
// 将权重值转换为百分比并向下取整,得到每一轮的轮询次数
Integer result = BigDecimal.valueOf(connectWeight.getWeightValue() * 100).divide(BigDecimal.valueOf(sum), 0, RoundingMode.DOWN).intValue();
// 如果计算出的轮询次数大于当前最大值,更新最大值
@ -168,4 +112,57 @@ public class InitConnectWeight implements ApplicationRunner {
redisTemplate.opsForList().rightPush("ipList",ip);
}
}
private void processIp(String ip, List<ConnectWeight> connectWeightList) throws Exception {
String username = "fluxmq";
String password = "fluxmq";
String loginUrl = "http://" + ip + ":8080/public/login";
String getInfoUrl = "http://" + ip + ":8080/public/cluster";
Map< String, Object > loginRequest = new HashMap<> ( );
loginRequest.put ( "username", username );
loginRequest.put ( "password", password );
HttpHeaders headers = new HttpHeaders ( );
headers.setContentType ( MediaType.APPLICATION_JSON );
HttpEntity< Map< String, Object > > loginEntity = new HttpEntity<> ( loginRequest, headers );
// 登录操作
String loginResult = restTemplate.postForObject ( loginUrl, loginEntity, String.class );
// 使用登录cookie获取集群信息
HttpHeaders headersGetInfo = new HttpHeaders ( );
headersGetInfo.setContentType ( MediaType.APPLICATION_JSON );
headersGetInfo.setAccept ( Collections.singletonList ( MediaType.APPLICATION_JSON ) );
headersGetInfo.set ( "Cookie", loginResult );
HttpEntity< String > getInfoEntity = new HttpEntity<> ( headersGetInfo );
ResponseEntity< String > responseInfo = restTemplate.exchange ( getInfoUrl, HttpMethod.GET, getInfoEntity, String.class );
if (responseInfo.getStatusCodeValue ( ) == 200) {
JSONArray jsonArray = JSON.parseArray ( responseInfo.getBody ( ) );
if (jsonArray.size ( ) > 0) {
JSONObject jsonObject = jsonArray.getJSONObject ( 0 );
log.info ( "响应 :{}", jsonObject );
Integer connectSize = jsonObject.getJSONObject("mqttInfo").getJSONObject("runtimes").getInteger("mqtt.connect");
ConnectWeight connectWeight = new ConnectWeight ( ip, 100 - connectSize );
log.info ( "IP: {}, 连接数: {}, 权重值: {}", ip, connectSize, connectWeight.getWeightValue ( ) );
log.info ( "Full Response: {}", responseInfo.getBody ( ) );
connectWeightList.add ( connectWeight );
} else {
// 处理得到的响应数据为空的情况
log.warn ( "响应数据为空" );
}
} else {
// 处理HTTP请求失败的情况
int statusCode = responseInfo.getStatusCodeValue ( );
String responseBody = responseInfo.getBody ( );
log.error ( "Failed to fetch data from {}: Status code {} with response: {}", ip, statusCode, responseBody );
}
}
}

View File

@ -26,8 +26,8 @@ public class ReceiveHandler {
@Autowired
private InitConnectWeight initConnectWeight;
//监听sms队列 ADD_LOG_AAA
@RabbitListener(queues = {"ADD_LOG_AAA"})
//监听SMS队列 ADD_LOG_AAA
@RabbitListener(queues = {"test"})
public void receiveSms(Message message) {
try {
initConnectWeight.run(new DefaultApplicationArguments());

View File

@ -1,32 +0,0 @@
package com.load.mqtt;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.stereotype.Service;
/**
* ()
* @author YunFei.Du
* @date 22:37 2024/5/30
*/
@Service
public class MessageCallbackService implements MqttCallback {
@Override
public void connectionLost(Throwable cause) {
System.out.println("connectionLost: " + cause.getMessage());
}
@Override
public void messageArrived(String topic, MqttMessage message) {
System.out.println("topic: " + topic);
System.out.println("Qos: " + message.getQos());
System.out.println("message content: " + new String(message.getPayload()));
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
System.out.println("deliveryComplete---------" + token.isComplete());
}
}

View File

@ -1,46 +0,0 @@
package com.load.mqtt;
import lombok.AllArgsConstructor;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.stereotype.Service;
/**
* Mqtt
* @author YunFei.Du
* @date 22:38 2024/5/30
*/
@Service
@AllArgsConstructor
public class MqttFactory {
private final MessageCallbackService messageCallbackService;
// 连接参数
/**
* Mqtt
* @param mqttProperties
* @return
*/
public MqttClient createClient(MqttProperties mqttProperties){
MqttClient mqttClient =null;
try {
mqttClient=new MqttClient ( mqttProperties.getBroker() , mqttProperties.getClientId() , new MemoryPersistence() );
} catch (MqttException e) {
throw new RuntimeException ( e );
}
MqttConnectOptions options = new MqttConnectOptions ( );
if (mqttProperties.isLogin()){
options.setUserName ( mqttProperties.getUsername() );
options.setPassword ( mqttProperties.getPassword().toCharArray() );
}
options.setConnectionTimeout(60);
options.setKeepAliveInterval(60);
mqttClient.setCallback ( messageCallbackService );
return mqttClient;
}
}

View File

@ -1,51 +0,0 @@
package com.load.mqtt;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.commons.lang3.StringUtils;
/**
*
* @author YunFei.Du
* @date 8:53 2024/5/30
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class MqttProperties {
private String broker;
private String topic;
private String username;
private String password;
private String clientId;
/**
* mqtt
* @param ip
* @param topic
* @return
*/
public static MqttProperties configBuild(String ip, String topic){
return MqttProperties.builder()
.broker("tcp://"+ip+":1883")
.topic(topic)
.username("admin")
.password("public")
.clientId("protocol-parsing") //协议解析 定值 --> 配置
.build();
}
/**
*
* @return
*/
public boolean isLogin(){
// commons-lang3
return StringUtils.isBlank ( username ) && !StringUtils.isBlank ( password );
}
}

View File

@ -1,28 +0,0 @@
package com.load.rebbitmq;
import com.load.mqtt.MqttFactory;
import lombok.extern.log4j.Log4j2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
*
* @author YunFei.Du
* @date 9:21 2024/5/30
*/
@Component
@Log4j2
public class MsgHandle {
@Autowired
private MqttFactory mqttFactory;
// @RabbitListener(queues = {"create.topic"})
// private void msg(String msg){
// log.info ( "接收到消息:{}" , msg );
// MqttProperties mqttProperties = MqttProperties.configBuild ( "39.100.87.192", "mqtt/test" );
// mqttFactory.createClient ( mqttProperties );
//
// }
}

View File

@ -1,36 +0,0 @@
package com.load.rebbitmq;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* rabbit
* @author YunFei.Du
* @date 8:39 2024/5/31
*/
@Configuration
public class RabbitConfig {
@Bean
public Queue initQueue() {
return new Queue ("create.totic", true);
}
@Bean
public DirectExchange direct(){
return new DirectExchange ("topic.direct");
}
@Bean
public Binding bindingla(DirectExchange direct, Queue initQueue){
return BindingBuilder.bind(initQueue)
.to(direct)
.with("protocol-parsing");
}
}

View File

@ -33,9 +33,7 @@ public class LoadBalanceServiceImpl implements LoadBalanceService {
@Override
public Result<MqttServerModel> getConnect(VehicleConnectionReq vehicleConnectionReq) {
log.error(vehicleConnectionReq.toString());
log.error("车辆连接请求:{}",vehicleConnectionReq.toString());
//判断是否有游标key --count
@ -65,8 +63,12 @@ public class LoadBalanceServiceImpl implements LoadBalanceService {
}
}
/**
* ipredis
* @param vinIp
*/
private void insertVinIp(VinIp vinIp) {
redisTemplate.opsForHash().put("vinIp",vinIp.getVin(),vinIp.getIp());
}
@SneakyThrows

View File

@ -126,7 +126,7 @@ public class ECSTool {
}
System.out.println ( "------------------------" );
}
log.info ( "ipList: " + ipList ); // [39.100.89.218, 39.100.87.192]
log.info ( "ipList: " + ipList ); // [39.100.89.218, 39.100.113.246]
return ipList;
} catch (TeaException error) {
// 此处仅做打印展示,请谨慎对待异常处理,在工程项目中切勿直接忽略异常。

View File

@ -4,9 +4,6 @@ server:
port: 82
spring:
application:
# 协议解析
name: protocol-parsing
rabbitmq:
host: 111.229.102.61
port: 5672