fix: 修改负载中心

master
rouchen 2024-06-07 22:22:48 +08:00
parent 7bff86090a
commit fec04652bb
15 changed files with 400 additions and 109 deletions

View File

@ -5,7 +5,6 @@ import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
@MapperScan
public class ZnCarApplication {
public static void main(String[] args) {

View File

@ -12,6 +12,7 @@ import com.car.demos.ConnectWeight;
import com.car.demos.loadenter.Auth;
import com.car.demos.loadenter.Content;
import com.car.demos.loadenter.LoadEnterNumber;
import com.car.demos.loadenter.Node;
import com.car.service.impl.ConnectServiceImpl;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpResponse;
@ -20,6 +21,7 @@ import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
@ -51,11 +53,14 @@ public class InitConnectWeight implements ApplicationRunner {
@Autowired
private ConnectServiceImpl connectService;
@Autowired
private RabbitTemplate rabbitTemplate;
@Override
public void run(ApplicationArguments args) {
connectService.loadCenterDel();
connectService.delNode();
ArrayList<ConnectWeight> connectWeightList = new ArrayList<>();
List<ConnectWeight> connectWeightList = new ArrayList<>();
ArrayList<String> ipList = new ArrayList<>();
@ -87,7 +92,6 @@ public class InitConnectWeight implements ApplicationRunner {
}
System.out.println("------------------------");
}
log.info("ip ip ip ip i p:{}", ipList);
} catch (TeaException error) {
// 此处仅做打印展示,请谨慎对待异常处理,在工程项目中切勿直接忽略异常。
@ -105,19 +109,24 @@ public class InitConnectWeight implements ApplicationRunner {
System.out.println(error.getData().get("Recommend"));
com.aliyun.teautil.Common.assertAsString(error.message);
}
log.info("ipList:{}", ipList);
rabbitTemplate.convertAndSend("ip",ipList);
//网关收集节点
int gatewayNum = ipList.size();
//数据解析结点数量
int dataNum=0;
//整体负载率
String overallLoad="";
double overallLoad=0;
//总车辆
int connectEventSize =0;
//内存使用率
String memoryUseRate = null;
Double memoryUseRate = null;
//cpu使用率
String cpuUsage = null;
Double cpuUsage = null;
//权重使用率
Double nodeWeights =null;
//遍历所有ip,获取每一个服务的连接数
for (String ip : ipList) {
@ -132,21 +141,8 @@ public class InitConnectWeight implements ApplicationRunner {
httpHeaders.setContentType(MediaType.APPLICATION_JSON);
HttpEntity<Map<String, Object>> r = new HttpEntity<Map<String, Object>>(request, httpHeaders);
String result = restTemplate.postForObject(url, r, String.class);
//整体负载率
int total = 0;
// try {
// JSONObject jsonObject = JSON.parseObject(result);
// String data = jsonObject.getString("data");
// JSONObject jsonObject1 = JSON.parseObject(data);
// String load = jsonObject1.getString("load");
// overallLoad = load;
// log.info("overallLoad:{}", overallLoad);
// //数据解析结点数量
// dataNum = load.length();
// log.info("dataNum:{}", dataNum);
// //获取FluxMq运行时详情信息
// }
//http://fluxmq.muyu.icu/public/cluster
//获取FluxMq运行时详情信息
@ -163,11 +159,12 @@ public class InitConnectWeight implements ApplicationRunner {
log.info("响应是:{}", responseInfo.getBody());
JSONArray jsonArray = JSON.parseArray(responseInfo.getBody());
System.out.println("json"+jsonArray);
if (jsonArray.size() > 0) {
JSONObject jsonObject = jsonArray.getJSONObject(0);
// Integer connectSize = jsonObject.getJSONObject("mqttInfo").getJSONObject("runtimes").getInteger("mqtt.connect");
//获取节点名称
String nodeName = jsonObject.getString("nodeName");
//获取内存使用率
JSONObject flowInfo = jsonObject.getJSONObject("jvmInfo");
//内存大小
@ -184,14 +181,14 @@ public class InitConnectWeight implements ApplicationRunner {
double usedHeapBytes = Double.parseDouble(heapUsedSize.substring(0, heapUsedSize.length() - 3)) * MB_TO_BYTES; // 去掉"MB"并转换
// 计算使用率(百分比)
double usagePercentage = (usedHeapBytes / maxHeapBytes) * 100;
BigDecimal divide = BigDecimal.valueOf((usedHeapBytes / maxHeapBytes) * 100).divide(BigDecimal.valueOf(100), 2, RoundingMode.HALF_UP);
//内存率
memoryUseRate=usagePercentage+"%";
memoryUseRate=divide.doubleValue();
//cpu
JSONObject cpuInfo = jsonObject.getJSONObject("cpuInfo");
String string = cpuInfo.getString("user");
cpuUsage =string;
cpuUsage = Double.valueOf(string.split("%")[0]);
//获取mqtt
@ -205,47 +202,26 @@ public class InitConnectWeight implements ApplicationRunner {
//权重值
Integer weightValue = 100 - connectSize;
nodeWeights = Double.valueOf(weightValue/100);
Node build = Node.builder()
.nodeName(ip)
.memoryUseRate(memoryUseRate)
.cpuUsage(cpuUsage)
.nodeWeights(nodeWeights)
.build();
connectService.nodeAdd(build);
log.info("链接数量:{}", connectSize);
} else {
log.info("得到的相应数据为null");
}
// //获取车辆列表
// String postList = "http://"+ip+":8080/mqtt/connection";
//
// HttpClient closeableHttpClient = HttpClients.createDefault();
// HttpPost httpPost = new HttpPost(postList);
// JSONObject object = JSONObject.parseObject(result);
// httpPost.addHeader("accessToken",object.getString("accessToken") ); // 设置请求头
// try {
// HttpResponse response = closeableHttpClient.execute(httpPost);
// Object o = restTemplate.postForObject(postList, response, Object.class);
// log.info("响应是:{}", o);
// JSONObject jsonObject = JSONObject.parseObject(response.toString());
// JSONArray content = jsonObject.getJSONArray("content");
// for (int i = 0; i < content.size(); i++) {
// // 获取每个元素JSONObject
// JSONObject contentItem = content.getJSONObject(i);
// // 获取state字段的值
// int state = contentItem.getIntValue("state");
// if (state == 1) {
// // 如果state为1则将当前对象添加到列表中
// filteredContent.add(contentItem);
// }
// }
// log.info("车辆列表:{}", response);
// } catch (IOException e) {
// throw new RuntimeException(e);
// }
}
log.info("车辆在线是",connectEventSize);
//每个连接数的权重总和
Integer sum =0;
for (ConnectWeight connectWeight : connectWeightList) {
sum =sum+connectWeight.getWeightValue();
sum = (int) (sum+connectWeight.getWeightValue());
}
System.out.println("sum"+sum);
@ -262,7 +238,7 @@ public class InitConnectWeight implements ApplicationRunner {
System.out.println("100次轮询次数:{}"+result);
sumOfWeights+=connectWeight.getWeightValue();
}
overallLoad =sumOfWeights/ipList.size()+"%";
overallLoad =sumOfWeights/ipList.size();
log.info("总负载",overallLoad);
LoadEnterNumber build = LoadEnterNumber.builder()
.carSum(connectEventSize)
@ -272,39 +248,36 @@ public class InitConnectWeight implements ApplicationRunner {
.build();
connectService.LoadCenterAdd(build);
ArrayList<String> weightIpList = new ArrayList<>();
redisTemplate.delete("ip");
for (int i = 0; i <= max; i++) {
for (ConnectWeight connectWeight : connectWeightList) {
redisTemplate.opsForList().rightPush("ip", String.valueOf(i));
log.info("权重值:{}",connectWeight.getWeightValue());
if (connectWeight.getWeightValue() > i) {
weightIpList.add(connectWeight.getCarServerIp());
log.info("轮询结果:{}",connectWeight.getCarServerIp());
return;
}else if (connectWeight.getWeightValue() == max ){
weightIpList.add(connectWeight.getCarServerIp());
log.info("轮询结果:{}",connectWeight.getCarServerIp());
}
}
}
// 存入redis
redisTemplate.delete("ips");
for (String ip : weightIpList) {
redisTemplate.opsForList().rightPush("ips",ip);
}
// List<Content> contents = new ArrayList<>();
// for (JSONObject jsonObject : filteredContent) {
// Content build = Content.builder()
// .cleanSession(jsonObject.getInteger("cleanSession"))
// .id(jsonObject.getInteger("id"))
// .clientPort(jsonObject.getInteger("clientPort"))
// .protocolType(jsonObject.getString("protocolType"))
// .version(jsonObject.getString("version"))
// .state(jsonObject.getInteger("state"))
// .connectTime(jsonObject.getDate("connectTime"))
// .auth(jsonObject.getJSONArray("auth").toJavaList(Auth.class))
// .build();
// contents.add(build);
// //每台服务器百分之八十
// HashMap<String, Double> hashMap = new HashMap<>(){{
// for (String s : ipList) {
// put(s,0.8-connectWeightList.get(ipList.indexOf(s)).getWeightValue()/100);
// }
// }
// };
// List<ConnectWeight> list = hashMap.entrySet().stream()
// .map(entry -> new ConnectWeight())
// .collect(Collectors.toList());
}
}

View File

@ -0,0 +1,70 @@
package com.car.consumer;
import com.alibaba.fastjson.JSONObject;
import com.car.config.InitConnectWeight;
import com.car.config.RabbitmqConfig;
import com.car.controller.ConnectController;
import com.car.demos.car.Car;
import com.car.service.ConnectService;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.DefaultApplicationArguments;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import java.nio.channels.Channel;
import java.util.Date;
/**
* ConnectHandler
*
* @author Yangle
* Date 2024/6/3 19:33
*/
@Component
@Slf4j
public class ConnectHandler {
@Autowired
private ConnectController connectController;
@Autowired
private InitConnectWeight initConnectWeight;
@RabbitListener(queues = {RabbitmqConfig.QUEUE_INFORM_EMAIL})
public void receive_email(Object msg, Message message, Channel channel){
System.out.println("QUEUE_INFORM_EMAIL msg"+msg);
}
//监听sms队列
@RabbitListener(queues = "connect")
public void receiveSms(Message message) {
try {
// 假设消息体是UTF-8编码的字符串
String jsonString = new String(message.getBody(), "UTF-8");
log.info("接收到的车辆信息: {}", jsonString);
// 解析JSON字符串
JSONObject jsonObject = JSONObject.parseObject(jsonString);
log.info("jsonObject: {}", jsonObject);
// 提取timestamp的值
long timestamp = jsonObject.getLong("timestamp");
log.info("提取到的timestamp值: {}", timestamp);
log.info("车辆信息",message.getBody());
String ip = jsonObject.getString("clientIp");
String vin = jsonObject.getString("clientId");
Car build = Car.builder()
.vin(vin.split("-")[0])
.timestamp(new Date(timestamp))
.ip(ip)
.build();
connectController.carAdd(build);
} catch (Exception e) {
throw new RuntimeException(e);
}
log.info("消费者得到的消息: {}" , new String(message.getBody()));
}
}

View File

@ -1,8 +1,13 @@
package com.car.controller;
import com.alibaba.fastjson.JSONObject;
import com.car.demos.MqttServerModel;
import com.car.demos.Result;
import com.car.demos.car.Car;
import com.car.demos.car.CarReq;
import com.car.demos.loadenter.LoadEnterNumber;
import com.car.demos.loadenter.Node;
import com.car.demos.loadenter.NodeReq;
import com.car.demos.req.VehicleConnectionReq;
import com.car.service.ConnectService;
import org.springframework.beans.factory.annotation.Autowired;
@ -11,6 +16,8 @@ import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.List;
/**
* ConnectController
*
@ -25,7 +32,7 @@ public class ConnectController {
private ConnectService connectService;
@PostMapping("/getConnect")
public Result<MqttServerModel>getConnect(@RequestBody VehicleConnectionReq vehicleConnectionReq){
return connectService.getConnect();
return connectService.getConnect(vehicleConnectionReq);
}
/**
@ -47,4 +54,37 @@ public class ConnectController {
connectService.loadCenterDel();
return Result.success("删除成功");
}
@PostMapping("/select")
public Result<LoadEnterNumber> select(){
return Result.success(connectService.select());
}
@PostMapping("/nodeAdd")
public Result nodeAdd(@RequestBody Node node){
connectService.nodeAdd(node);
return Result.success("添加成功");
}
@PostMapping("/carAdd")
public Result carAdd(@RequestBody Car car){
connectService.carAdd(car);
return Result.success("添加成功");
}
@PostMapping("/selectCar")
public Result<List<Car>> carAdd( ) {
List<Car> list= connectService.selectCar();
return Result.success(list);
}
@PostMapping("/delNode")
public Result delNode(){
connectService.delNode();
return Result.success("删除成功");
}
@PostMapping("/selectNode")
public Result<List<Node>> selectNode(){
return Result.success(connectService.selectNode());
}
}

View File

@ -3,6 +3,9 @@ package com.car.demos;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import java.util.Map;
/**
* ConnectWeight
@ -23,5 +26,24 @@ public class ConnectWeight {
*
*/
private Integer weightValue;
//
// public void ConnectWeightInfo(Map.Entry<String, Double> entry){
// this.carServerIp = entry.getKey();
// this.weightValue = entry.getValue()*100;
// }
// public String getCarServerIp() {
// return carServerIp;
// }
//
// public void setCarServerIp(String carServerIp) {
// this.carServerIp = carServerIp;
// }
//
// public Double getWeightValue() {
// return weightValue;
// }
//
// public void setWeightValue(Double weightValue) {
// this.weightValue = weightValue;
// }
}

View File

@ -0,0 +1,27 @@
package com.car.demos.car;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import java.util.Date;
/**
* Car
*
* @author Yangle
* Date 2024/6/3 19:36
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
@SuperBuilder
public class Car {
private Integer id;
private String vin;
private Date timestamp;
private String ip;
}

View File

@ -0,0 +1,18 @@
package com.car.demos.car;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* CarReq
*
* @author Yangle
* Date 2024/6/3 20:28
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class CarReq {
private String sorting;
}

View File

@ -34,7 +34,7 @@ public class LoadEnterNumber {
/*
*/
private String overallLoad;
private Double overallLoad;
/*
线
*/

View File

@ -0,0 +1,46 @@
package com.car.demos.loadenter;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
/**
* Node
*
* @author Yangle
* Date 2024/6/3 8:44
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
@SuperBuilder
public class Node {
private Integer id;
/**
*
*/
private String nodeName;
/**
*
*/
private Double memoryUseRate;
/**
* cpu使
*/
private Double cpuUsage;
/**
*
*/
private Double nodeWeights;
}

View File

@ -0,0 +1,21 @@
package com.car.demos.loadenter;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* NodeReq
*
* @author Yangle
* Date 2024/6/4 9:43
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class NodeReq {
public String sorting;
}

View File

@ -1,8 +1,12 @@
package com.car.mapper;
import com.car.demos.car.Car;
import com.car.demos.loadenter.LoadEnterNumber;
import com.car.demos.loadenter.Node;
import org.apache.ibatis.annotations.Mapper;
import java.util.List;
/**
* ConnerMapper
*
@ -15,4 +19,17 @@ public interface ConnerMapper {
void loadCenterDel();
LoadEnterNumber select();
void nodeAdd(Node node);
void carAdd(Car car);
List<Car> selectCar();
void delNode();
List<Node> selectNode( );
}

View File

@ -2,7 +2,12 @@ package com.car.service;
import com.car.demos.MqttServerModel;
import com.car.demos.Result;
import com.car.demos.car.Car;
import com.car.demos.loadenter.LoadEnterNumber;
import com.car.demos.loadenter.Node;
import com.car.demos.req.VehicleConnectionReq;
import java.util.List;
/**
* ConnectImpl
@ -11,10 +16,21 @@ import com.car.demos.loadenter.LoadEnterNumber;
* Date 2024/5/28 21:49
*/
public interface ConnectService {
Result<MqttServerModel> getConnect();
Result<MqttServerModel> getConnect(VehicleConnectionReq vehicleConnectionReq);
void LoadCenterAdd(LoadEnterNumber loadEnterNumber);
void loadCenterDel();
LoadEnterNumber select();
void nodeAdd(Node node);
void carAdd(Car car);
List<Car> selectCar();
void delNode();
List<Node> selectNode( );
}

View File

@ -5,13 +5,19 @@ import com.aliyun.teaopenapi.models.Config;
import com.car.demos.MqttServerModel;
import com.car.demos.Result;
import com.car.demos.car.Car;
import com.car.demos.loadenter.LoadEnterNumber;
import com.car.demos.loadenter.Node;
import com.car.demos.req.VehicleConnectionReq;
import com.car.mapper.ConnerMapper;
import com.car.service.ConnectService;
import lombok.extern.log4j.Log4j2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
import java.util.List;
/**
* ConnectImplImpl
*
@ -19,16 +25,16 @@ import org.springframework.stereotype.Service;
* Date 2024/5/28 21:50
*/
@Service
@Log4j2
public class ConnectServiceImpl implements ConnectService {
@Autowired
private ConnerMapper connerMapper;
@Autowired
private StringRedisTemplate redisTemplate;
@Override
public Result<MqttServerModel> getConnect() {
public Result<MqttServerModel> getConnect(VehicleConnectionReq vehicleConnectionReq) {
if (redisTemplate.hasKey("Redis")) {
Integer count = Integer.valueOf(redisTemplate.opsForValue().get("Redis"));
Integer count = Integer.valueOf(redisTemplate.opsForValue().get("ip"));
if (count == 100){
redisTemplate.opsForValue().set("Redis",String.valueOf(0));
}else {
@ -42,8 +48,8 @@ public class ConnectServiceImpl implements ConnectService {
String ip = redisTemplate.opsForList().index("ips", 0);
return Result.success(new MqttServerModel("tcp://"+ip+":1883","test1"));
}
}
}
/**
*
* @param loadEnterNumber
@ -61,6 +67,42 @@ public class ConnectServiceImpl implements ConnectService {
connerMapper.loadCenterDel();
}
@Override
public LoadEnterNumber select() {
return connerMapper.select();
}
@Override
public void nodeAdd(Node node) {
connerMapper.nodeAdd(node);
}
@Override
public void carAdd(Car car) {
connerMapper.carAdd(car);
}
@Override
public List<Car> selectCar( ) {
return connerMapper.selectCar();
}
@Override
public void delNode() {
connerMapper.delNode();
}
/**
*
* @param
* @return
*/
@Override
public List<Node> selectNode( ) {
return connerMapper.selectNode();
}
public static Client createClient() throws Exception {
// 工程代码泄露可能会导致 AccessKey 泄露,并威胁账号下所有资源的安全性。以下代码示例仅供参考。

View File

@ -14,33 +14,9 @@ spring:
matching-strategy: ant_path_matcher
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://115.159.211.196:3306/zncar?serverTimezone=UTC&useUnicode=true&characterEncoding=utf-8&allowMultiQueries=true&useSSL=false
url: jdbc:mysql://115.159.211.196:3306/zncar?useUnicode=true&characterEncoding=utf8&allowMultiQueries=true&useSSL=false
username: root
password: yl@123
druid:
# 下面为连接池的补充设置,应用到上面所有数据源中
# 初始化大小,最小,最大
initial-size: 5
min-idle: 5
max-active: 20
# 配置获取连接等待超时的时间
max-wait: 60000
# 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒
time-between-eviction-runs-millis: 60000
# 配置一个连接在池中最小生存的时间,单位是毫秒
min-evictable-idle-time-millis: 300000
validation-query: SELECT 1 FROM DUAL
test-while-idle: true
test-on-borrow: false
test-on-return: false
# 打开PSCache并且指定每个连接上PSCache的大小
pool-prepared-statements: true
# 配置监控统计拦截的filters去掉后监控界面sql无法统计'wall'用于防火墙
max-pool-prepared-statement-per-connection-size: 20
filters: stat,wall
use-global-data-source-stat: true
# 通过connectProperties属性来打开mergeSql功能慢SQL记录
connect-properties: druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000
application:
name: shop-server
redis:

View File

@ -1,7 +1,6 @@
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper
PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.car.mapper.ConnerMapper">
<insert id="lodaCenterAdd">
@ -9,7 +8,32 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
( `gateway_num`, `data_num`, `overall_load`, `car_sum`)
VALUES ( #{gatewayNum}, #{dataNum}, #{overallLoad}, #{carSum});
</insert>
<insert id="nodeAdd">
INSERT INTO `zncar`.`node`
( `node_name`, `memory_use_rate`, `cpu_usage`, `node_weights`)
VALUES
( #{nodeName}, #{memoryUseRate}, #{cpuUsage}, #{nodeWeights});
</insert>
<insert id="carAdd">
INSERT INTO `zncar`.`car` ( `vin`, `timestamp`, `ip`) VALUES ( #{vin}, #{timestamp}, #{ip});
</insert>
<delete id="loadCenterDel">
DELETE FROM `zncar`.`load_enter_number`
</delete>
<delete id="delNode">
DELETE FROM `zncar`.`node`
</delete>
<select id="select" resultType="com.car.demos.loadenter.LoadEnterNumber">
select * from load_enter_number
</select>
<select id="selectCar" resultType="com.car.demos.car.Car">
select * from car
order by timestamp desc
</select>
<select id="selectNode" resultType="com.car.demos.loadenter.Node">
select * from node order by node_weights desc
</select>
</mapper>