From c080668c5e703379e7f67e5e94ccab1e71208381 Mon Sep 17 00:00:00 2001
From: JangCan <2862008188@qq.com>
Date: Sun, 21 Apr 2024 16:49:35 +0800
Subject: [PATCH] feat 4.21
---
pom.xml | 7 ++
.../common/aliyun/config/RabbitMqConfig.java | 86 +++++++++++++
.../aliyun/config/RestClientConfig.java | 18 +++
.../com/loadcenter/common/domain/CPUInfo.java | 30 +++++
.../loadcenter/common/domain/FlowInfo.java | 28 +++++
.../common/domain/InstancesInformation.java | 11 --
.../com/loadcenter/common/domain/JVMInfo.java | 58 +++++++++
.../common/redis/service/RedisService.java | 19 +--
.../gateway/cache/GatewayNodeScoreCache.java | 50 --------
.../gateway/cache/GatewayZSetNodeCache.java | 70 ++++++++---
.../gateway/cache/VehicleOnlineCache.java | 47 -------
.../gateway/model/WorkGatewayNode.java | 2 +-
.../gateway/model/WorkGatewayNodeSource.java | 22 ----
.../com/loadcenter/rabbitMq/Consumer.java | 117 ++++++++++++++++++
.../service/impl/GatewayLoadServiceImpl.java | 84 ++++++++-----
.../impl/GatewayVehicleServiceImpl.java | 21 ++--
src/main/resources/application.yml | 12 ++
17 files changed, 485 insertions(+), 197 deletions(-)
create mode 100644 src/main/java/com/loadcenter/common/aliyun/config/RabbitMqConfig.java
create mode 100644 src/main/java/com/loadcenter/common/aliyun/config/RestClientConfig.java
create mode 100644 src/main/java/com/loadcenter/common/domain/CPUInfo.java
create mode 100644 src/main/java/com/loadcenter/common/domain/FlowInfo.java
create mode 100644 src/main/java/com/loadcenter/common/domain/JVMInfo.java
delete mode 100644 src/main/java/com/loadcenter/gateway/cache/GatewayNodeScoreCache.java
delete mode 100644 src/main/java/com/loadcenter/gateway/cache/VehicleOnlineCache.java
delete mode 100644 src/main/java/com/loadcenter/gateway/model/WorkGatewayNodeSource.java
create mode 100644 src/main/java/com/loadcenter/rabbitMq/Consumer.java
diff --git a/pom.xml b/pom.xml
index 6edd979..deda98c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -31,6 +31,12 @@
+
+
+
+ org.springframework.boot
+ spring-boot-starter-amqp
+
org.springframework.boot
@@ -102,6 +108,7 @@
0.0.3
+
diff --git a/src/main/java/com/loadcenter/common/aliyun/config/RabbitMqConfig.java b/src/main/java/com/loadcenter/common/aliyun/config/RabbitMqConfig.java
new file mode 100644
index 0000000..a0d8f3d
--- /dev/null
+++ b/src/main/java/com/loadcenter/common/aliyun/config/RabbitMqConfig.java
@@ -0,0 +1,86 @@
+package com.loadcenter.common.aliyun.config;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.amqp.core.*;
+import org.springframework.amqp.rabbit.connection.ConnectionFactory;
+import org.springframework.amqp.rabbit.connection.CorrelationData;
+import org.springframework.amqp.rabbit.core.RabbitTemplate;
+import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
+import org.springframework.amqp.support.converter.MessageConverter;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.Primary;
+
+
+@Configuration
+public class RabbitMqConfig implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback {
+ public static final Logger logger= LoggerFactory.getLogger(RabbitMqConfig.class);
+
+ private RabbitTemplate rabbitTemplate;
+
+
+ @Bean
+ public MessageConverter messageConverter(){
+ return new Jackson2JsonMessageConverter();
+ }
+
+ public static final String QUEUE="queue";
+ public static final String EXCHANGE="exchange";
+ public static final String KEY="Key";
+
+
+ @Primary
+ @Bean
+ public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
+
+ RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
+
+ this.rabbitTemplate=rabbitTemplate;
+
+ rabbitTemplate.setMessageConverter(messageConverter());
+ rabbitTemplate();
+ return rabbitTemplate;
+ }
+
+ public void rabbitTemplate(){
+ rabbitTemplate.setConfirmCallback(this);
+ rabbitTemplate.setReturnsCallback(this);
+ }
+
+
+ @Bean
+ public Queue queue(){
+ return new Queue(QUEUE,true);
+ }
+
+
+ @Bean
+ public DirectExchange directExchange(){
+ return new DirectExchange(EXCHANGE);
+ }
+
+
+
+ @Bean
+ public Binding binding(){
+ return BindingBuilder.bind(queue()).to(directExchange()).with(KEY);
+ }
+
+
+
+ @Override
+ public void confirm(CorrelationData correlationData, boolean ack, String s) {
+
+ if (ack){
+ logger.info("{}消息到达交换机",correlationData.getId());
+ }else{
+ logger.error("{}消息丢失",correlationData.getId());
+ }
+ }
+
+ @Override
+ public void returnedMessage(ReturnedMessage returnedMessage) {
+ logger.error("{}消息未到达队列",returnedMessage.getMessage().getMessageProperties().getMessageId());
+ }
+}
diff --git a/src/main/java/com/loadcenter/common/aliyun/config/RestClientConfig.java b/src/main/java/com/loadcenter/common/aliyun/config/RestClientConfig.java
new file mode 100644
index 0000000..3e51c08
--- /dev/null
+++ b/src/main/java/com/loadcenter/common/aliyun/config/RestClientConfig.java
@@ -0,0 +1,18 @@
+package com.loadcenter.common.aliyun.config;
+
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.web.client.RestTemplate;
+/**
+ * @ClassName RestClientConfig
+ * @Description RestClientConfig
+ * @Author Can.J
+ * @Date 2024/4/21 16:18
+ */
+@Configuration
+public class RestClientConfig {
+ @Bean
+ public RestTemplate restTemplate(){
+ return new RestTemplate();
+ }
+}
diff --git a/src/main/java/com/loadcenter/common/domain/CPUInfo.java b/src/main/java/com/loadcenter/common/domain/CPUInfo.java
new file mode 100644
index 0000000..b9b200e
--- /dev/null
+++ b/src/main/java/com/loadcenter/common/domain/CPUInfo.java
@@ -0,0 +1,30 @@
+package com.loadcenter.common.domain;
+
+import lombok.Data;
+
+/**
+ * CPU使用信息
+ */
+@Data
+public class CPUInfo {
+ /**
+ * CPU核数
+ */
+ private long cpuNum;
+ /**
+ * 内核态使用率
+ */
+ private String cSys;
+ /**
+ * 空闲率
+ */
+ private String idle;
+ /**
+ * I/O等待
+ */
+ private String iowait;
+ /**
+ * 用户态使用率
+ */
+ private String user;
+}
diff --git a/src/main/java/com/loadcenter/common/domain/FlowInfo.java b/src/main/java/com/loadcenter/common/domain/FlowInfo.java
new file mode 100644
index 0000000..1f339cf
--- /dev/null
+++ b/src/main/java/com/loadcenter/common/domain/FlowInfo.java
@@ -0,0 +1,28 @@
+package com.loadcenter.common.domain;
+
+import lombok.Data; /**
+ * 节点状态
+ */
+@Data
+public class FlowInfo {
+ /**
+ * 上次读取吞吐量
+ */
+ private String lastReadThroughput;
+ /**
+ * 上次写入吞吐量
+ */
+ private String lastWriteThroughput;
+ /**
+ * 读取总吞吐量
+ */
+ private String readBytesHistory;
+ /**
+ * 实写字节
+ */
+ private String realWriteBytes;
+ /**
+ * 写入总吞吐量
+ */
+ private String writeBytesHistory;
+}
diff --git a/src/main/java/com/loadcenter/common/domain/InstancesInformation.java b/src/main/java/com/loadcenter/common/domain/InstancesInformation.java
index 2009d14..008f795 100644
--- a/src/main/java/com/loadcenter/common/domain/InstancesInformation.java
+++ b/src/main/java/com/loadcenter/common/domain/InstancesInformation.java
@@ -14,17 +14,6 @@ import lombok.NoArgsConstructor;
@NoArgsConstructor
@Data
public class InstancesInformation {
- /*
- * log.info("查询第{" + count + "}个实例的ID:" + item.getInstanceId());
- log.info("名称:" + item.getInstanceName());
- log.info("状态:" + item.getStatus());
- log.info("公网IP:" + UserUtil.removeBrackets(item.getPublicIpAddress().getIpAddress().toString()));
- log.info("私网IP:" + UserUtil.removeBrackets(item.getVpcAttributes().getPrivateIpAddress().ipAddress.toString()));
- log.info("创建时间:" + item.getCreationTime());
- log.info("到期时间:" + item.getExpiredTime());
- log.info("是否可以回收:" + (item.getRecyclable() ? "是" : "否") + "\n\n");
- * */
-
/*
* 实例的ID
* */
diff --git a/src/main/java/com/loadcenter/common/domain/JVMInfo.java b/src/main/java/com/loadcenter/common/domain/JVMInfo.java
new file mode 100644
index 0000000..5952cca
--- /dev/null
+++ b/src/main/java/com/loadcenter/common/domain/JVMInfo.java
@@ -0,0 +1,58 @@
+package com.loadcenter.common.domain;
+
+import lombok.Data;
+
+/**
+ * JVM使用信息
+ */
+@Data
+public class JVMInfo {
+ /**
+ * 文件描述(句柄)
+ */
+ private String fileDescriptors;
+ /**
+ * 堆内存
+ */
+ private String heapCommit;
+ /**
+ * 堆初始化空间
+ */
+ private String heapInit;
+ /**
+ * 堆最大内存
+ */
+ private String heapMax;
+ /**
+ * 堆使用空间
+ */
+ private String heapUsed;
+ /**
+ * JAVA目录
+ */
+ private String jdkHome;
+ /**
+ * JDK版本
+ */
+ private String jdkVersion;
+ /**
+ * 非堆空间
+ */
+ private String noHeapCommit;
+ /**
+ * 非堆初始化空间
+ */
+ private String noHeapInit;
+ /**
+ * 非堆最大空间
+ */
+ private String noHeapMax;
+ /**
+ * 非堆使用空间
+ */
+ private String noHeapUsed;
+ /**
+ * 线程数量
+ */
+ private long threadCount;
+}
diff --git a/src/main/java/com/loadcenter/common/redis/service/RedisService.java b/src/main/java/com/loadcenter/common/redis/service/RedisService.java
index 9f86582..7137fe7 100644
--- a/src/main/java/com/loadcenter/common/redis/service/RedisService.java
+++ b/src/main/java/com/loadcenter/common/redis/service/RedisService.java
@@ -187,9 +187,9 @@ public class RedisService {
return setOperation;
}
- public BoundZSetOperations setCacheZSet(final String key,final T setValue,double score){
+ public BoundZSetOperations setCacheZSet(final String key, final T setValue, double score) {
BoundZSetOperations boundZSetOperations = redisTemplate.boundZSetOps(key);
- boundZSetOperations.add(setValue,score);
+ boundZSetOperations.add(setValue, score);
return boundZSetOperations;
}
@@ -218,6 +218,10 @@ public class RedisService {
return memberScores;
}
+ public Double incrementScore(final String key, final T value, final Double score) {
+ return redisTemplate.opsForZSet().incrementScore(key, value, score);
+ }
+
/**
* 缓存Set
*
@@ -231,8 +235,8 @@ public class RedisService {
setOperations.remove(setValue);
}
- public void deleteCacheSet(String key,T setValue){
- BoundSetOperations setOperation = redisTemplate.boundSetOps(key);
+ public void deleteCacheSet(String key, T setValue) {
+ BoundSetOperations setOperation = redisTemplate.boundSetOps(key);
setOperation.remove(setValue);
}
@@ -248,12 +252,13 @@ public class RedisService {
/**
- * 删除缓存zset的元素
+ * 删除缓存zset的元素
+ *
* @param key
* @param value
*/
- public void deleteCacheZset(final String key ,final String value){
- redisTemplate.opsForZSet().remove(key,value);
+ public void deleteCacheZset(final String key, final String value) {
+ redisTemplate.opsForZSet().remove(key, value);
}
/**
diff --git a/src/main/java/com/loadcenter/gateway/cache/GatewayNodeScoreCache.java b/src/main/java/com/loadcenter/gateway/cache/GatewayNodeScoreCache.java
deleted file mode 100644
index e5803fb..0000000
--- a/src/main/java/com/loadcenter/gateway/cache/GatewayNodeScoreCache.java
+++ /dev/null
@@ -1,50 +0,0 @@
-package com.loadcenter.gateway.cache;
-
-import com.loadcenter.gateway.cache.abs.GatewayCacheAbs;
-import com.loadcenter.gateway.model.WorkGatewayNode;
-import org.springframework.data.redis.core.ZSetOperations;
-import org.springframework.stereotype.Service;
-
-import java.util.List;
-import java.util.Set;
-
-/**
- * @ClassName GatewayNodeScoreCache
- * @Description 网关节点分数
- * @Author Can.J
- * @Date 2024/4/19 14:47
- */
-@Service
-public class GatewayNodeScoreCache extends GatewayCacheAbs {
- private final static String gatewayNodeScoreCacheKey = "score";
-
- @Override
- public String getPre() {
- return "gateway:node:";
- }
-
- public List get() {
- Set> range = redisService.redisTemplate.opsForZSet().rangeWithScores(encode(gatewayNodeScoreCacheKey), 0, -1);
- return range.stream()
- .map(zSet -> WorkGatewayNode.builder()
- .nodeId(zSet.getValue()).source(zSet.getScore()).build())
- .toList();
- }
-
- public Long getNodeNowNum() {
- List workGatewayNodes = get();
- Long vehicleNowOnlineNum = Long.valueOf(String.valueOf(workGatewayNodes.stream().mapToDouble(WorkGatewayNode::getSource).sum()));
-
- return vehicleNowOnlineNum;
- }
-
- /**
- * 获取节点最大上线数量
- * @return
- */
- public Long getNodeMaxOnlineNum() {
- List workGatewayNodes = get();
- return workGatewayNodes.size() * 80L;
- }
-
-}
diff --git a/src/main/java/com/loadcenter/gateway/cache/GatewayZSetNodeCache.java b/src/main/java/com/loadcenter/gateway/cache/GatewayZSetNodeCache.java
index 1661072..1d853ad 100644
--- a/src/main/java/com/loadcenter/gateway/cache/GatewayZSetNodeCache.java
+++ b/src/main/java/com/loadcenter/gateway/cache/GatewayZSetNodeCache.java
@@ -5,18 +5,19 @@ import com.loadcenter.gateway.model.WorkGatewayNode;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
/**
- * @ClassName GatewayZSetNodeCache
- * @Description 服务器节点和车辆连接数
- * @Author Can.J
- * @Date 2024/4/19 9:00
+ * @ClassName GatewayZSetNodeCache
+ * @Description 服务器节点和车辆连接数
+ * @Author Can.J
+ * @Date 2024/4/19 9:00
*/
@Component
public class GatewayZSetNodeCache extends GatewayCacheAbs {
- private final static String gatewayZSetCount ="count";
+ private final static String gatewayZSetCount = "count";
@Override
public String getPre() {
@@ -24,44 +25,75 @@ public class GatewayZSetNodeCache extends GatewayCacheAbs {
}
/**
- * 获取所有zset数据
+ * 获取所有zset数据
+ *
* @return 负载节点集合
*/
- public Map