feat():更新reids存储节点ip

master
Saisai Liu 2024-06-04 21:25:47 +08:00
parent bd973ab220
commit c997b75228
26 changed files with 101 additions and 69 deletions

View File

@ -11,7 +11,7 @@ import org.springframework.web.bind.annotation.*;
/** /**
* @ClassName FluxGetInfo * @ClassName FluxGetInfo
* @Description * @Description
* @Author SaiSai.Liu * @Author Mobai
* @Date 2024/5/28 22:00 * @Date 2024/5/28 22:00
*/ */
@RestController @RestController

View File

@ -7,7 +7,7 @@ import org.springframework.web.bind.annotation.RestController;
/** /**
* @ClassName StayTimeController * @ClassName StayTimeController
* @Description * @Description
* @Author SaiSai.Liu * @Author Mobai
* @Date 2024/6/4 9:54 * @Date 2024/6/4 9:54
*/ */
@RestController @RestController

View File

@ -5,7 +5,7 @@ import lombok.Data;
/** /**
* @ClassName AcceptToken * @ClassName AcceptToken
* @Description * @Description
* @Author SaiSai.Liu * @Author Mobai
* @Date 2024/5/29 9:18 * @Date 2024/5/29 9:18
*/ */
@Data @Data

View File

@ -3,7 +3,7 @@ package com.mobai.domain;
/** /**
* @ClassName Constants * @ClassName Constants
* @Description * @Description
* @Author SaiSai.Liu * @Author Mobai
* @Date 2024/5/28 22:05 * @Date 2024/5/28 22:05
*/ */
public class Constants { public class Constants {

View File

@ -5,7 +5,7 @@ import lombok.Data;
/** /**
* @ClassName MqttConnectState * @ClassName MqttConnectState
* @Description * @Description
* @Author SaiSai.Liu * @Author Mobai
* @Date 2024/5/29 10:39 * @Date 2024/5/29 10:39
*/ */
@Data @Data

View File

@ -12,7 +12,7 @@ import java.util.Date;
/** /**
* @ClassName StayTime * @ClassName StayTime
* @Description * @Description
* @Author SaiSai.Liu * @Author Mobai
* @Date 2024/6/4 9:28 * @Date 2024/6/4 9:28
*/ */
@Data @Data

View File

@ -7,7 +7,7 @@ import lombok.NoArgsConstructor;
/** /**
* @ClassName User * @ClassName User
* @Description * @Description
* @Author SaiSai.Liu * @Author Mobai
* @Date 2024/5/29 9:05 * @Date 2024/5/29 9:05
*/ */
@Data @Data

View File

@ -10,7 +10,7 @@ import lombok.NoArgsConstructor;
* *
* @ClassName Endpoints * @ClassName Endpoints
* @Description * @Description
* @Author SaiSai.Liu * @Author Mobai
* @Date 2024/6/2 15:16 * @Date 2024/6/2 15:16
*/ */
@Data @Data

View File

@ -8,7 +8,7 @@ import lombok.NoArgsConstructor;
/** /**
* @ClassName Events * @ClassName Events
* @Description * @Description
* @Author SaiSai.Liu * @Author Mobai
* @Date 2024/6/2 15:28 * @Date 2024/6/2 15:28
*/ */
@Data @Data

View File

@ -9,7 +9,7 @@ import lombok.NoArgsConstructor;
* *
* @ClassName RunTimes * @ClassName RunTimes
* @Description * @Description
* @Author SaiSai.Liu * @Author Mobai
* @Date 2024/6/2 15:22 * @Date 2024/6/2 15:22
*/ */
@Data @Data

View File

@ -7,7 +7,7 @@ import org.apache.ibatis.annotations.Mapper;
/** /**
* @ClassName StayTimeMapper * @ClassName StayTimeMapper
* @Description * @Description
* @Author SaiSai.Liu * @Author Mobai
* @Date 2024/6/4 9:57 * @Date 2024/6/4 9:57
*/ */
@Mapper @Mapper

View File

@ -7,7 +7,7 @@ import java.io.IOException;
/** /**
* @ClassName FluxMqInit * @ClassName FluxMqInit
* @Description * @Description
* @Author SaiSai.Liu * @Author Mobai
* @Date 2024/5/27 9:57 * @Date 2024/5/27 9:57
*/ */
public class FluxMqInit { public class FluxMqInit {

View File

@ -12,7 +12,7 @@ import org.springframework.stereotype.Service;
/** /**
* @ClassName MqttFactory * @ClassName MqttFactory
* @Description * @Description
* @Author SaiSai.Liu * @Author Mobai
* @Date 2024/5/31 11:35 * @Date 2024/5/31 11:35
*/ */

View File

@ -8,7 +8,7 @@ import lombok.NoArgsConstructor;
/** /**
* @ClassName MqttProperties * @ClassName MqttProperties
* @Description * @Description
* @Author SaiSai.Liu * @Author Mobai
* @Date 2024/5/30 20:05 * @Date 2024/5/30 20:05
*/ */
@Data @Data

View File

@ -10,7 +10,7 @@ import org.springframework.context.annotation.Configuration;
/** /**
* @ClassName RabbitConfig * @ClassName RabbitConfig
* @Description * @Description
* @Author SaiSai.Liu * @Author Mobai
* @Date 2024/5/31 21:47 * @Date 2024/5/31 21:47
*/ */
@Configuration @Configuration

View File

@ -7,7 +7,7 @@ import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
/** /**
* @ClassName GetOptions * @ClassName GetOptions
* @Description * @Description
* @Author SaiSai.Liu * @Author Mobai
* @Date 2024/5/31 14:26 * @Date 2024/5/31 14:26
*/ */

View File

@ -9,7 +9,7 @@ import org.springframework.stereotype.Service;
* *
* @ClassName MqttCallBackServiceImpl * @ClassName MqttCallBackServiceImpl
* @Description * @Description
* @Author SaiSai.Liu * @Author Mobai
* @Date 2024/5/30 20:02 * @Date 2024/5/30 20:02
*/ */
@Service @Service

View File

@ -27,7 +27,7 @@ import java.util.Date;
* *
* @ClassName MessageHandler * @ClassName MessageHandler
* @Description * @Description
* @Author SaiSai.Liu * @Author Mobai
* @Date 2024/5/31 14:37 * @Date 2024/5/31 14:37
*/ */
@Log4j2 @Log4j2
@ -64,22 +64,7 @@ public class MessageHandler {
@RabbitListener(queues = {"event"}) @RabbitListener(queues = {"event"})
private void trainUp(String msg, Message message, Channel channel) { private void trainUp(String msg, Message message, Channel channel) {
log.info("event:{}", msg); log.info("event:{}", msg);
// 链接事件 // 事件内容
// event:{
// "protocol":"MQTT",
// "clientIp":"39.144.107.165",
// "nodeIp":"127.0.0.1",
// "clientId":"VIN123456789DIJE4",
// "version":"MQTT_3_1_1",
// "keepalive":20,
// "cleanSession":true,
// "timestamp":1717466764797,
// "auth":{
// "username":"6D7A546314155D43A339EE4C0410613D86C821299316ADECDB871E08",
// "password":"VklOMTIzNDU2Nzg5RElKRTQxNzE3NDY2NzY1MDg3NTgyNDI4QThEQjA0RkU2OTkzNTM5NDIyNTQ2ODIwQzFFNzc3NDUzQTA4NzIzRTU4NUQyNDRBNjY="
// },
// "messageId":0}
JSONObject jsonObject = JSON.parseObject(msg); JSONObject jsonObject = JSON.parseObject(msg);
String vin = JSON.to(String.class, jsonObject.get("clientId")); String vin = JSON.to(String.class, jsonObject.get("clientId"));
long timestamp = JSON.to(Long.class, jsonObject.get("timestamp")); long timestamp = JSON.to(Long.class, jsonObject.get("timestamp"));
@ -111,16 +96,6 @@ public class MessageHandler {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
} else { } else {
// event:{
// "protocol":"MQTT",
// "messageId":0,
// "timestamp":1717466777367,
// "reason":"normal",
// "clientId":"VIN123456789DIJE4",
// "nodeIp":"127.0.0.1",
// "id":354,
// "clientIp":"39.144.107.165"
// }
log.info("下线事件"); log.info("下线事件");
String ip = redisService.getValue(vin); String ip = redisService.getValue(vin);
try { try {
@ -135,7 +110,11 @@ public class MessageHandler {
eq(StayTime::getVin, vin); eq(StayTime::getVin, vin);
eq(StayTime::getDownTime, 0); eq(StayTime::getDownTime, 0);
}}); }});
String format = new SimpleDateFormat("HH时mm分ss秒").format(new Date(vinStayTime.getUpTime() - timestamp)); // 为空抛异常
if (vinStayTime==null){
throw new ServletException("{"+vin+"}数据不存在");
}
String format = new SimpleDateFormat("HH时mm分ss秒").format(new Date(timestamp - vinStayTime.getUpTime()));
vinStayTime.setStayLongTime(format); vinStayTime.setStayLongTime(format);
vinStayTime.setDownTime(timestamp); vinStayTime.setDownTime(timestamp);
// 修改下线时间 // 修改下线时间
@ -150,7 +129,6 @@ public class MessageHandler {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (ServletException | IOException e) { } catch (ServletException | IOException e) {
log.error("下线失败"); log.error("下线失败");
throw new RuntimeException(e); throw new RuntimeException(e);
} finally { } finally {
try { try {

View File

@ -21,7 +21,6 @@ import org.springframework.stereotype.Component;
import java.math.BigDecimal; import java.math.BigDecimal;
import java.util.*; import java.util.*;
import java.util.concurrent.TimeUnit;
@Log4j2 @Log4j2
@Component @Component
@ -33,6 +32,7 @@ public class SelectInstances {
@Autowired @Autowired
private RedisTemplate<String, String> redisTemplate; private RedisTemplate<String, String> redisTemplate;
/** /**
* 使AK&SKClient * 使AK&SKClient
* *
@ -70,22 +70,14 @@ public class SelectInstances {
return describeInstancesResponse; return describeInstancesResponse;
} }
//10分钟 //10分钟
// @Scheduled(cron = "0 0/5 * * * ? ") @Scheduled(cron = "0 0/1 * * * ? ")
//10秒 //10秒
@Scheduled(cron = "0/10 * * * * ? ") // @Scheduled(cron = "0/10 * * * * ? ")
public void saveIps() throws Exception { public void saveIps() throws Exception {
List<String> ips = new ArrayList<>(); List<String> ips = new ArrayList<>();
// com.aliyun.ecs20140526.Client client = SelectInstances.createClient();
// com.aliyun.ecs20140526.models.DescribeInstancesRequest describeInstancesRequest = new com.aliyun.ecs20140526.models.DescribeInstancesRequest()
//// .setImageId("m-8vb8qnidv34yj3nbirhc")
// .setRegionId("cn-zhangjiakou");
// com.aliyun.teautil.models.RuntimeOptions runtime = new com.aliyun.teautil.models.RuntimeOptions();
DescribeInstancesResponse response = this.getInfo(); DescribeInstancesResponse response = this.getInfo();
try { try {
// 复制代码运行请自行打印 API 的返回值
// DescribeInstancesResponse describeInstancesResponse = client.describeInstancesWithOptions(describeInstancesRequest, runtime);
DescribeInstancesResponseBody.DescribeInstancesResponseBodyInstances instances = response.getBody().getInstances(); DescribeInstancesResponseBody.DescribeInstancesResponseBodyInstances instances = response.getBody().getInstances();
List<DescribeInstancesResponseBody.DescribeInstancesResponseBodyInstancesInstance> instance = List<DescribeInstancesResponseBody.DescribeInstancesResponseBodyInstancesInstance> instance =
instances.getInstance(); instances.getInstance();
@ -181,6 +173,7 @@ public class SelectInstances {
if (node.ip.equals(ip)) mqttServerModel = new MqttServerModel(ip, "topic" + nodes.indexOf(node)); if (node.ip.equals(ip)) mqttServerModel = new MqttServerModel(ip, "topic" + nodes.indexOf(node));
} }
ips.add(mqttServerModel); ips.add(mqttServerModel);
int i1 = map.get(ip) - 1; int i1 = map.get(ip) - 1;
map.put(ip, i1); map.put(ip, i1);
if (i1 == 0) { if (i1 == 0) {
@ -195,6 +188,9 @@ public class SelectInstances {
redisTemplate.opsForList().leftPush("fluxMq", JSON.toJSONString(ips)); redisTemplate.opsForList().leftPush("fluxMq", JSON.toJSONString(ips));
// 可负载IP轮询排列 // 可负载IP轮询排列
log.info("排列ip,{}", ips); log.info("排列ip,{}", ips);
Boolean mqttIp = redisTemplate.delete("mqttIp");
ips.forEach(mqtt->redisTemplate.opsForList().leftPush("mqttIp", JSON.toJSONString(mqtt)));
} }

View File

@ -0,0 +1,55 @@
//package com.mobai.runner;
//
//import com.aliyun.ecs20140526.models.DescribeInstancesResponse;
//import com.mobai.mq.rabbitmq.cofig.MqttFactory;
//import com.mobai.mq.rabbitmq.cofig.MqttProperties;
//import com.mobai.mq.rabbitmq.domian.GetOptions;
//import com.mobai.mq.rabbitmq.domian.MqttCallBackServiceImpl;
//import com.mobai.openApi.SelectInstances;
//import org.eclipse.paho.client.mqttv3.MqttClient;
//import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
//import org.springframework.beans.factory.annotation.Autowired;
//import org.springframework.boot.ApplicationArguments;
//import org.springframework.boot.ApplicationRunner;
//import org.springframework.data.redis.connection.RedisServer;
//import org.springframework.stereotype.Component;
//
///**
// * @ClassName MqttRunner
// * @Description 描述
// * @Author Mobai
// * @Date 2024/6/4 20:03
// */
//@Component
//public class MqttRunner implements ApplicationRunner {
//
// @Autowired
// private SelectInstances selectInstances;
// @Override
// public void run(ApplicationArguments args) throws Exception {
// String string = redisServer.get("fluxMq");
//
// MqttProperties mqttProperties = MqttProperties.configBuild("39.98.69.92", "topic0");
//// MqttProperties mqttProperties = new MqttProperties();
//// mqttProperties.setBroker("tcp://39.98.69.92:1883");
//// mqttProperties.setTopic("mqtt/test");
// mqttProperties.setUsername("emqx");
// mqttProperties.setPassword("public");
//// mqttProperties.setClientid("subscribe_client");
//
// int qos = 0;
// try {
// MqttClient client = new MqttFactory(new MqttCallBackServiceImpl()).buildOptions(mqttProperties);
// // 连接参数
// MqttConnectOptions options = GetOptions.getMqttOptionas(mqttProperties);
// // 设置回调
// client.setCallback(new MqttCallBackServiceImpl());
// // 进行连接
// client.connect(options);
// client.subscribe(mqttProperties.getTopic(), qos);
// } catch (Exception e) {
// e.printStackTrace();
// }
// }
//}

View File

@ -6,7 +6,7 @@ import com.mobai.domain.VehicleConnectionReq;
/** /**
* @ClassName FluxGetInfoService * @ClassName FluxGetInfoService
* @Description * @Description
* @Author SaiSai.Liu * @Author Mobai
* @Date 2024/5/28 22:01 * @Date 2024/5/28 22:01
*/ */
public interface FluxGetInfoService { public interface FluxGetInfoService {

View File

@ -7,7 +7,7 @@ import com.mobai.mapper.StayTimeMapper;
/** /**
* @ClassName StayTimeService * @ClassName StayTimeService
* @Description * @Description
* @Author SaiSai.Liu * @Author Mobai
* @Date 2024/6/4 9:54 * @Date 2024/6/4 9:54
*/ */
public interface StayTimeService extends IService<StayTime> { public interface StayTimeService extends IService<StayTime> {

View File

@ -20,7 +20,7 @@ import java.util.List;
/** /**
* @ClassName FluxGetInfoServiceImpl * @ClassName FluxGetInfoServiceImpl
* @Description * @Description
* @Author SaiSai.Liu * @Author Mobai
* @Date 2024/5/28 22:01 * @Date 2024/5/28 22:01
*/ */
@ -74,9 +74,9 @@ public class FluxGetInfoServiceImpl implements FluxGetInfoService {
// "username": "你好" Vin + timestamp // "username": "你好" Vin + timestamp
// "nonce": "33" 随机 // "nonce": "33" 随机
log.warn("参数为:{}", req); log.warn("参数为:{}", req);
String string = redis.opsForList().range("fluxMq", 0, -1).get(0); // String string = redis.opsForList().range("fluxMq", 0, -1).get(0);
List<MqttServerModel> mqtts = JSON.parseArray(string, MqttServerModel.class); // List<MqttServerModel> mqtts = JSON.parseArray(string, MqttServerModel.class);
log.info("集合:{}",mqtts); // log.info("集合:{}",mqtts);
// tcp://192.168.1.1:1883 // tcp://192.168.1.1:1883
if (redis.hasKey("fluxMqIndex")) { if (redis.hasKey("fluxMqIndex")) {
redis.opsForValue().increment("fluxMqIndex", 1); redis.opsForValue().increment("fluxMqIndex", 1);
@ -85,14 +85,17 @@ public class FluxGetInfoServiceImpl implements FluxGetInfoService {
} }
int index = Integer.valueOf(redis.opsForValue().get("fluxMqIndex")); int index = Integer.valueOf(redis.opsForValue().get("fluxMqIndex"));
log.info("下标:{}",index); log.info("下标:{}",index);
MqttServerModel mqttServerModel = mqtts.get(index); // List<String> fluxmq = redis.opsForList().range("mqttIp", 0, 5);
if (index + 1 == redis.opsForList().size("fluxmq")) { String mqttIp = redis.opsForList().index("mqttIp", index);
// log.info(fluxmq);
log.info(mqttIp);
MqttServerModel mqttServerModel = JSON.parseObject(mqttIp,MqttServerModel.class);
if (index + 1 == redis.opsForList().size("fluxMq")) {
redis.delete("fluxMqIndex"); redis.delete("fluxMqIndex");
} }
log.info("已获取到对象:{}",mqttServerModel); log.info("已获取到对象:{}",mqttServerModel);
return Result.success(mqttServerModel); return Result.success(mqttServerModel);
} }
} }
// 达到60%开启新服务30%关闭低实例 // 达到60%开启新服务30%关闭低实例

View File

@ -11,7 +11,7 @@ import org.springframework.stereotype.Service;
/** /**
* @ClassName StayTimeServiceImpl * @ClassName StayTimeServiceImpl
* @Description * @Description
* @Author SaiSai.Liu * @Author Mobai
* @Date 2024/6/4 9:54 * @Date 2024/6/4 9:54
*/ */

View File

@ -5,7 +5,7 @@ import java.io.IOException;
/** /**
* @ClassName LoginTest * @ClassName LoginTest
* @Description * @Description
* @Author SaiSai.Liu * @Author Mobai
* @Date 2024/5/28 15:37 * @Date 2024/5/28 15:37
*/ */
public class LoginTest { public class LoginTest {

View File

@ -5,7 +5,7 @@ import java.io.IOException;
/** /**
* @ClassName TestGet * @ClassName TestGet
* @Description * @Description
* @Author SaiSai.Liu * @Author Mobai
* @Date 2024/5/28 15:34 * @Date 2024/5/28 15:34
*/ */
public class TestGet { public class TestGet {