commit 77ade6288e7ea423a9d109c09f059a62f8746ef9 Author: Yunfei Du <278774021@qq.com> Date: Fri May 31 11:03:46 2024 +0800 feat:负载中心 diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..5ff6309 --- /dev/null +++ b/.gitignore @@ -0,0 +1,38 @@ +target/ +!.mvn/wrapper/maven-wrapper.jar +!**/src/main/**/target/ +!**/src/test/**/target/ + +### IntelliJ IDEA ### +.idea/modules.xml +.idea/jarRepositories.xml +.idea/compiler.xml +.idea/libraries/ +*.iws +*.iml +*.ipr + +### Eclipse ### +.apt_generated +.classpath +.factorypath +.project +.settings +.springBeans +.sts4-cache + +### NetBeans ### +/nbproject/private/ +/nbbuild/ +/dist/ +/nbdist/ +/.nb-gradle/ +build/ +!**/src/main/**/build/ +!**/src/test/**/build/ + +### VS Code ### +.vscode/ + +### Mac OS ### +.DS_Store \ No newline at end of file diff --git a/.idea/.gitignore b/.idea/.gitignore new file mode 100644 index 0000000..35410ca --- /dev/null +++ b/.idea/.gitignore @@ -0,0 +1,8 @@ +# 默认忽略的文件 +/shelf/ +/workspace.xml +# 基于编辑器的 HTTP 客户端请求 +/httpRequests/ +# Datasource local storage ignored files +/dataSources/ +/dataSources.local.xml diff --git a/.idea/encodings.xml b/.idea/encodings.xml new file mode 100644 index 0000000..63574ec --- /dev/null +++ b/.idea/encodings.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/.idea/inspectionProfiles/Project_Default.xml b/.idea/inspectionProfiles/Project_Default.xml new file mode 100644 index 0000000..8d66637 --- /dev/null +++ b/.idea/inspectionProfiles/Project_Default.xml @@ -0,0 +1,5 @@ + + + + \ No newline at end of file diff --git a/.idea/misc.xml b/.idea/misc.xml new file mode 100644 index 0000000..82dbec8 --- /dev/null +++ b/.idea/misc.xml @@ -0,0 +1,14 @@ + + + + + + + + + + \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml new file mode 100644 index 0000000..35eb1dd --- /dev/null +++ b/.idea/vcs.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..d095040 --- /dev/null +++ b/pom.xml @@ -0,0 +1,171 @@ + + + 4.0.0 + + com.load + Load_center + 1.0-SNAPSHOT + + + 17 + 17 + UTF-8 + + + + spring-boot-starter-parent + org.springframework.boot + 2.6.2 + + + + + org.eclipse.paho + org.eclipse.paho.client.mqttv3 + 1.2.5 + + + + org.springframework.boot + spring-boot-starter-amqp + + + + org.apache.commons + commons-lang3 + + + + + + + + org.springframework.boot + spring-boot-starter-data-redis + + + io.lettuce + lettuce-core + + + + + redis.clients + jedis + + + org.redisson + redisson + 3.16.0 + + + + org.apache.httpcomponents + httpcore + 4.4.12 + + + org.apache.httpcomponents + httpclient + 4.5.13 + + + + org.apache.commons + commons-lang3 + 3.12.0 + + + + + org.springframework.boot + spring-boot-starter-web + + + + org.springframework.boot + spring-boot-starter-test + test + + + + + com.alibaba + druid-spring-boot-starter + 1.2.8 + + + + + javax.servlet + javax.servlet-api + provided + + + + mysql + mysql-connector-java + 8.0.11 + runtime + + + + org.mybatis.spring.boot + mybatis-spring-boot-starter + 2.2.2 + + + + com.github.pagehelper + pagehelper-spring-boot-starter + 1.4.1 + + + + + org.projectlombok + lombok + true + + + commons-lang + commons-lang + 2.6 + + + + com.alibaba + fastjson + 1.2.79 + + + + + org.springframework.boot + spring-boot-starter-quartz + + + + org.projectlombok + lombok + + + com.fasterxml.jackson.core + jackson-annotations + + + + org.springframework.boot + spring-boot-starter-amqp + + + + com.aliyun + ecs20140526 + 5.1.8 + + + + diff --git a/src/main/java/com/load/LoadApplication.java b/src/main/java/com/load/LoadApplication.java new file mode 100644 index 0000000..b50678f --- /dev/null +++ b/src/main/java/com/load/LoadApplication.java @@ -0,0 +1,19 @@ +package com.load; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +/** + * 启动类 + * + * @author YunFei·Du + * @ClassName: LoadApplication + * @Description: 启动类 + * @CreateTime: 2024/5/27 09:50 + */ +@SpringBootApplication +public class LoadApplication { + public static void main(String[] args) { + SpringApplication.run(LoadApplication.class,args); + } +} diff --git a/src/main/java/com/load/MqttApplication.java b/src/main/java/com/load/MqttApplication.java new file mode 100644 index 0000000..cd1def0 --- /dev/null +++ b/src/main/java/com/load/MqttApplication.java @@ -0,0 +1,10 @@ +package com.load; + +/** + * @ClassName MqttApplication + * @Description 描述 + * @Author YunFei.Du + * @Date 2024/5/30 9:19 + */ +public class MqttApplication { +} diff --git a/src/main/java/com/load/SubscribeSample.java b/src/main/java/com/load/SubscribeSample.java new file mode 100644 index 0000000..c58e6c0 --- /dev/null +++ b/src/main/java/com/load/SubscribeSample.java @@ -0,0 +1,61 @@ +package com.load; + +import org.eclipse.paho.client.mqttv3.*; +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; + +/** + * 订阅 MQTT 主题 + */ +public class SubscribeSample { + public static void main(String[] args) { + /** + * 代理地址 + */ + String broker = "tcp://39.100.87.192:1883"; + /** + * 主题 + */ + String topic = "mqtt/test"; + String username = "emqx"; + String password = "public"; + /** + * 客户端ID(随机) + */ + String clientid = "subscribe_client"; + int qos = 0; + + try { + MqttClient client = new MqttClient(broker, clientid, new MemoryPersistence()); + // 连接参数 + MqttConnectOptions options = new MqttConnectOptions(); + options.setUserName(username); + options.setPassword(password.toCharArray()); + options.setConnectionTimeout(60); +// 堵塞60S + options.setKeepAliveInterval(60); + // 设置回调 + client.setCallback(new MqttCallback() { + + public void connectionLost(Throwable cause) { + System.out.println("connectionLost: " + cause.getMessage()); + } + + public void messageArrived(String topic, MqttMessage message) { + System.out.println("topic: " + topic); + System.out.println("Qos: " + message.getQos()); + System.out.println("message content: " + new String(message.getPayload())); + + } + + public void deliveryComplete(IMqttDeliveryToken token) { + System.out.println("deliveryComplete---------" + token.isComplete()); + } + + }); + client.connect(options); + client.subscribe(topic, qos); + } catch (Exception e) { + e.printStackTrace(); + } + } +} diff --git a/src/main/java/com/load/config/InitConnectWeight.java b/src/main/java/com/load/config/InitConnectWeight.java new file mode 100644 index 0000000..962be25 --- /dev/null +++ b/src/main/java/com/load/config/InitConnectWeight.java @@ -0,0 +1,171 @@ +package com.load.config; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONArray; +import com.alibaba.fastjson.JSONObject; +import com.aliyun.ecs20140526.Client; +import com.aliyun.ecs20140526.models.DescribeInstancesRequest; +import com.aliyun.ecs20140526.models.DescribeInstancesResponse; +import com.aliyun.tea.TeaException; +import com.aliyun.teautil.models.RuntimeOptions; +import com.load.domain.ConnectWeight; +import com.load.util.ECSTool; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.ApplicationArguments; +import org.springframework.boot.ApplicationRunner; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.http.*; +import org.springframework.stereotype.Component; +import org.springframework.web.client.RestTemplate; + +import java.math.BigDecimal; +import java.math.RoundingMode; +import java.util.*; +import java.util.stream.Collectors; + +/** + * 初始化连接权重 + * + * @author YunFei·Du + * @ClassName: InitConnectWeight + * @Description: 初始化连接权重 + * @CreateTime: 2024/5/27 21:31 + */ +@Component +@Slf4j +public class InitConnectWeight implements ApplicationRunner { + + @Autowired + private RedisTemplate redisTemplate; + + @Autowired + private RestTemplate restTemplate; + + @Override + public void run(ApplicationArguments args) throws Exception { + + ArrayList connectWeightList = new ArrayList<>(); + + ArrayList ipList = new ArrayList<>(); + + Client client = null; + //获取阿里云客户端 + try { + client = ECSTool.createClient(); + } catch (Exception e) { + throw new RuntimeException(e); + } + //获取所有实例 + DescribeInstancesRequest describeInstancesRequest = new DescribeInstancesRequest() + .setRegionId("cn-zhangjiakou"); +// .setInstanceType("ecs.e-c1m2.xlarge"); + RuntimeOptions runtime = new RuntimeOptions(); + try { + // 复制代码运行请自行打印 API 的返回值 + DescribeInstancesResponse describeInstancesResponse = client.describeInstancesWithOptions(describeInstancesRequest, runtime); + List> ipListList = describeInstancesResponse.getBody().instances.getInstance().stream().map(instance -> instance.publicIpAddress.ipAddress).collect(Collectors.toList()); + for (List strings : ipListList) { + for (String ip : strings) { + ipList.add(ip); + } + } + } catch (TeaException error) { + // 此处仅做打印展示,请谨慎对待异常处理,在工程项目中切勿直接忽略异常。 + // 错误 message + System.out.println(error.getMessage()); + // 诊断地址 + System.out.println(error.getData().get("Recommend")); + com.aliyun.teautil.Common.assertAsString(error.message); + } catch (Exception _error) { + TeaException error = new TeaException(_error.getMessage(), _error); + // 此处仅做打印展示,请谨慎对待异常处理,在工程项目中切勿直接忽略异常。 + // 错误 message + System.out.println(error.getMessage()); + // 诊断地址 + System.out.println(error.getData().get("Recommend")); + com.aliyun.teautil.Common.assertAsString(error.message); + } + + + //http://fluxmq.muyu.icu/public/login + + //遍历所有ip,获取每一个服务的连接数 + for (String ip : ipList) { + String url = "http://" + ip + ":8080/public/login"; + Map request = new HashMap<>(); + request.put("username", "fluxmq"); + request.put("password", "fluxmq"); + HttpHeaders httpHeaders = new HttpHeaders(); + httpHeaders.setContentType(MediaType.APPLICATION_JSON); + HttpEntity> r = new HttpEntity>(request, httpHeaders); + String result = restTemplate.postForObject(url, r, String.class); + + + //http://fluxmq.muyu.icu/public/cluster + + int nextInt = new Random().nextInt(1000); + String getInfoUrl = "http://" + ip + ":8080/public/cluster?random=" + nextInt; + + HttpHeaders httpHeadersGetInfo = new HttpHeaders(); + httpHeadersGetInfo.setContentType(MediaType.APPLICATION_JSON); + httpHeadersGetInfo.setAccept(Collections.singletonList(MediaType.APPLICATION_JSON)); + httpHeadersGetInfo.set("Cookie", result); + HttpEntity getInfoRequest = new HttpEntity(httpHeadersGetInfo); + ResponseEntity responseInfo = restTemplate.exchange(getInfoUrl, HttpMethod.GET, getInfoRequest, String.class, 1); + + log.error("响应是:{}", responseInfo.getBody()); + + JSONArray jsonArray = JSON.parseArray(responseInfo.getBody()); + if (jsonArray.size() > 0) { + JSONObject jsonObject = jsonArray.getJSONObject(0); + Integer connectSize = Integer.valueOf(jsonObject.getJSONObject("mqttInfo").getString("connectSize")); + connectWeightList.add(new ConnectWeight(ip,100-connectSize)); + log.error("链接数量:{}", connectSize); + } else { + log.error("得到的相应数据为null"); + } + } + // 初始化权重求和变量 + Integer sum =0; + // 遍历ConnectWeight列表,计算所有权重值的总和 + for (ConnectWeight connectWeight : connectWeightList) { + sum = sum + connectWeight.getWeightValue(); + } + + // 初始化最大轮询次数为0 + int max = 0; + for (ConnectWeight connectWeight : connectWeightList) { + log.error("权重值:{}",connectWeight.getWeightValue()); + // 将权重值转换为百分比并向下取整,得到每一轮的轮询次数 + Integer result = BigDecimal.valueOf(connectWeight.getWeightValue() * 100).divide(BigDecimal.valueOf(sum), 0, RoundingMode.DOWN).intValue(); + // 如果计算出的轮询次数大于当前最大值,更新最大值 + if (result > max){ + max = result; + } + // 更新ConnectWeight对象的权重值为计算出的轮询次数 + connectWeight.setWeightValue(result); + // 输出100次轮询次数 + log.error("100次轮询次数:{}",result); + } + + ArrayList weightIpList = new ArrayList<>(); + + //轮询出现次数 + for (int i = 0; i <= max; i++) { + for (ConnectWeight connectWeight : connectWeightList) { + if (connectWeight.getWeightValue() > i){ + weightIpList.add(connectWeight.getCarServerIp()); + }else if (connectWeight.getWeightValue() == max){ + weightIpList.add(connectWeight.getCarServerIp()); + } + } + } + + //存入redis + redisTemplate.delete("ipList"); + for (String ip : weightIpList) { + redisTemplate.opsForList().rightPush("ipList",ip); + } + } +} diff --git a/src/main/java/com/load/config/RabbitmqConfig.java b/src/main/java/com/load/config/RabbitmqConfig.java new file mode 100644 index 0000000..97e27f2 --- /dev/null +++ b/src/main/java/com/load/config/RabbitmqConfig.java @@ -0,0 +1,54 @@ +package com.load.config; + +import org.springframework.amqp.core.*; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * rabbitMq配置类 + * + * @author YunFei·Du + * @ClassName: RabbitmqConfig + * @Description: rabbitMq配置类 + * @CreateTime: 2024/5/27 16:56 + */ +@Configuration +public class RabbitmqConfig { + public static final String QUEUE_INFORM_EMAIL = "queue_inform_email"; + public static final String QUEUE_INFORM_SMS = "disconnect_connect"; + public static final String EXCHANGE_TOPICS_INFORM="exchange_topics_inform"; + public static final String ROUTINGKEY_EMAIL="inform.#.email.#"; + public static final String ROUTINGKEY_SMS="inform.#.sms.#"; + + + @Bean(EXCHANGE_TOPICS_INFORM) + public Exchange EXCHANGE_TOPICS_INFORM(){ + //durable(true) 持久化,mq重启之后交换机还在 + return ExchangeBuilder.topicExchange(EXCHANGE_TOPICS_INFORM).durable(true).build(); + } + + //声明QUEUE_INFORM_EMAIL队列 + @Bean(QUEUE_INFORM_EMAIL) + public Queue QUEUE_INFORM_EMAIL(){ + return new Queue(QUEUE_INFORM_EMAIL); + } + //声明QUEUE_INFORM_SMS队列 + @Bean(QUEUE_INFORM_SMS) + public Queue QUEUE_INFORM_SMS(){ + return new Queue(QUEUE_INFORM_SMS); + } + + //ROUTINGKEY_EMAIL队列绑定交换机,指定routingKey + @Bean + public Binding BINDING_QUEUE_INFORM_EMAIL(@Qualifier(QUEUE_INFORM_EMAIL) Queue queue, + @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange){ + return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_EMAIL).noargs(); + } + //ROUTINGKEY_SMS队列绑定交换机,指定routingKey + @Bean + public Binding BINDING_ROUTINGKEY_SMS(@Qualifier(QUEUE_INFORM_SMS) Queue queue, + @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange){ + return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_SMS).noargs(); + } +} diff --git a/src/main/java/com/load/config/RedisConfig.java b/src/main/java/com/load/config/RedisConfig.java new file mode 100644 index 0000000..c18c5e7 --- /dev/null +++ b/src/main/java/com/load/config/RedisConfig.java @@ -0,0 +1,33 @@ +package com.load.config; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.data.redis.connection.RedisConnectionFactory; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer; +import org.springframework.data.redis.serializer.StringRedisSerializer; + +/** + * redsi配置类 + * + * @author YunFei·Du + * @ClassName: RedisConfoig + * @Description: redsi配置类 + * @CreateTime: 2024/5/27 14:16 + */ +@Configuration +public class RedisConfig { + @Bean + public RedisTemplate redisTemplate(RedisConnectionFactory redisConnectionFactory){ + RedisTemplate redisTemplate = new RedisTemplate<>(); + redisTemplate.setConnectionFactory(redisConnectionFactory); + + redisTemplate.setKeySerializer(new StringRedisSerializer()); + redisTemplate.setValueSerializer(new Jackson2JsonRedisSerializer(Object.class)); + + redisTemplate.setHashKeySerializer(new StringRedisSerializer()); + redisTemplate.setHashValueSerializer(new StringRedisSerializer()); + return redisTemplate; + + } +} diff --git a/src/main/java/com/load/config/RestTemplateConfig.java b/src/main/java/com/load/config/RestTemplateConfig.java new file mode 100644 index 0000000..b06b8c0 --- /dev/null +++ b/src/main/java/com/load/config/RestTemplateConfig.java @@ -0,0 +1,33 @@ +package com.load.config; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.http.client.ClientHttpRequestFactory; +import org.springframework.http.client.SimpleClientHttpRequestFactory; +import org.springframework.web.client.RestTemplate; + +/** + * rest配置类 + * + * @author YunFei·Du + * @ClassName: RestTemplateConfig + * @Description: rest配置类 + * @CreateTime: 2024/5/27 10:01 + */ +@Configuration +public class RestTemplateConfig { + @Bean + public RestTemplate restTemplate(ClientHttpRequestFactory factory) { + return new RestTemplate(factory); + } + + @Bean + public ClientHttpRequestFactory simpleClientHttpRequestFactory() { + SimpleClientHttpRequestFactory factory = new SimpleClientHttpRequestFactory(); + //超时设置 + factory.setReadTimeout(5000);//ms + factory.setConnectTimeout(15000);//ms + return factory; + } +} + diff --git a/src/main/java/com/load/consumer/ReceiveHandler.java b/src/main/java/com/load/consumer/ReceiveHandler.java new file mode 100644 index 0000000..91ef4ce --- /dev/null +++ b/src/main/java/com/load/consumer/ReceiveHandler.java @@ -0,0 +1,42 @@ +package com.load.consumer; + +import com.load.config.InitConnectWeight; +import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.core.Message; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.DefaultApplicationArguments; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.stereotype.Component; + + +/** + * 断开连接消费者 + * + * @author YunFei·Du + * @ClassName: ReceiveHandler + * @Description: 断开连接消费者 + * @CreateTime: 2024/5/27 17:00 + */ +@Component +@Slf4j +public class ReceiveHandler { + @Autowired + private RedisTemplate redisTemplate; + @Autowired + private InitConnectWeight initConnectWeight; + + //监听sms队列 ADD_LOG_AAA + @RabbitListener(queues = {"ADD_LOG_AAA"}) + public void receiveSms(Message message) { + try { + initConnectWeight.run(new DefaultApplicationArguments()); + redisTemplate.delete("count"); + log.warn("ip权重重新分配成功!"); + } catch (Exception e) { + throw new RuntimeException(e); + } + + System.out.println("消费者得到的消息: " + new String(message.getBody())); + } +} diff --git a/src/main/java/com/load/controller/LoadBalanceController.java b/src/main/java/com/load/controller/LoadBalanceController.java new file mode 100644 index 0000000..f7e6e99 --- /dev/null +++ b/src/main/java/com/load/controller/LoadBalanceController.java @@ -0,0 +1,47 @@ +package com.load.controller; + +import com.load.domain.MqttServerModel; +import com.load.domain.Result; +import com.load.domain.req.VehicleConnectionReq; +import com.load.service.LoadBalanceService; +import lombok.extern.log4j.Log4j2; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.*; + + +/** + * 车辆连接 + * + * @author YunFei·Du + * @ClassName: ConnectController + * @Description: 车辆连接 + * @CreateTime: 2024/5/27 09:00 + */ +@RestController +@RequestMapping("loadBalance") +@Log4j2 +public class LoadBalanceController { + + + @Autowired + private LoadBalanceService loadBalanceService; + + + /** + * 获取连接 + */ + @PostMapping("/getConnect") + public Result getConnect(@RequestBody VehicleConnectionReq vehicleConnectionReq){ + return loadBalanceService.getConnect(vehicleConnectionReq); + } + + /** + * 创建实例 + */ + @GetMapping("/createConnect") + public void createConnect() { + loadBalanceService.createConnect ( ); + log.info ( "创建实例成功" ); + } + +} diff --git a/src/main/java/com/load/domain/ConnectUseInfo.java b/src/main/java/com/load/domain/ConnectUseInfo.java new file mode 100644 index 0000000..6afffc5 --- /dev/null +++ b/src/main/java/com/load/domain/ConnectUseInfo.java @@ -0,0 +1,32 @@ +package com.load.domain; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * 内存使用情况 + * + * @author YunFei·Du + * @ClassName: MemoryUserInfo + * @Description: 内存使用情况 + * @CreateTime: 2024/5/27 15:53 + */ +@Data +@AllArgsConstructor +@NoArgsConstructor +public class ConnectUseInfo { + + /** + * 节点ID + */ + private String clusterId; + /** + * 所属IP + */ + private String ipAddress; + /** + * 剩余可连接数量 + */ + private Integer remainingNum; +} diff --git a/src/main/java/com/load/domain/ConnectWeight.java b/src/main/java/com/load/domain/ConnectWeight.java new file mode 100644 index 0000000..35ffcda --- /dev/null +++ b/src/main/java/com/load/domain/ConnectWeight.java @@ -0,0 +1,28 @@ +package com.load.domain; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * 连接权重 + * + * @author YunFei·Du + * @ClassName: CarServer + * @Description: 车子服务器 + * @CreateTime: 2024/5/27 09:04 + */ +@Data +@AllArgsConstructor +@NoArgsConstructor +public class ConnectWeight { + + /** + * 服务器ip + */ + private String carServerIp; + /** + * 权重值 + */ + private Integer weightValue; +} diff --git a/src/main/java/com/load/domain/HttpStatus.java b/src/main/java/com/load/domain/HttpStatus.java new file mode 100644 index 0000000..22de578 --- /dev/null +++ b/src/main/java/com/load/domain/HttpStatus.java @@ -0,0 +1,93 @@ +package com.load.domain; + +/** + * 返回状态码 + * + * @author ruoyi + */ +public class HttpStatus { + /** + * 操作成功 + */ + public static final int SUCCESS = 200; + + /** + * 对象创建成功 + */ + public static final int CREATED = 201; + + /** + * 请求已经被接受 + */ + public static final int ACCEPTED = 202; + + /** + * 操作已经执行成功,但是没有返回数据 + */ + public static final int NO_CONTENT = 204; + + /** + * 资源已被移除 + */ + public static final int MOVED_PERM = 301; + + /** + * 重定向 + */ + public static final int SEE_OTHER = 303; + + /** + * 资源没有被修改 + */ + public static final int NOT_MODIFIED = 304; + + /** + * 参数列表错误(缺少,格式不匹配) + */ + public static final int BAD_REQUEST = 400; + + /** + * 未授权 + */ + public static final int UNAUTHORIZED = 401; + + /** + * 访问受限,授权过期 + */ + public static final int FORBIDDEN = 403; + + /** + * 资源,服务未找到 + */ + public static final int NOT_FOUND = 404; + + /** + * 不允许的http方法 + */ + public static final int BAD_METHOD = 405; + + /** + * 资源冲突,或者资源被锁 + */ + public static final int CONFLICT = 409; + + /** + * 不支持的数据,媒体类型 + */ + public static final int UNSUPPORTED_TYPE = 415; + + /** + * 系统内部错误 + */ + public static final int ERROR = 500; + + /** + * 接口未实现 + */ + public static final int NOT_IMPLEMENTED = 501; + + /** + * 系统警告消息 + */ + public static final int WARN = 601; +} diff --git a/src/main/java/com/load/domain/MqttServerModel.java b/src/main/java/com/load/domain/MqttServerModel.java new file mode 100644 index 0000000..b88f78a --- /dev/null +++ b/src/main/java/com/load/domain/MqttServerModel.java @@ -0,0 +1,28 @@ +package com.load.domain; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * Mqtt服务器模型 + * @author YunFei.Du + * @date 22:08 2024/5/29 + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class MqttServerModel { + + /** + * MQTT服务节点 + */ + private String broker; + + /** + * MQTT订阅主题 + */ + private String topic; +} diff --git a/src/main/java/com/load/domain/Result.java b/src/main/java/com/load/domain/Result.java new file mode 100644 index 0000000..8c7d46c --- /dev/null +++ b/src/main/java/com/load/domain/Result.java @@ -0,0 +1,101 @@ +package com.load.domain; + +import lombok.Data; + +import java.io.Serializable; + +/** + * 响应信息主体 + * + * @author ruoyi + */ +@Data +public class Result implements Serializable { + /** + * 成功 + */ + public static final int SUCCESS = HttpStatus.SUCCESS; + /** + * 失败 + */ + public static final int FAIL = HttpStatus.ERROR; + private static final long serialVersionUID = 1L; + /** + * 系统警告消息 + */ + private static final int WARN = HttpStatus.WARN; + + private int code; + + private String msg; + + private T data; + + public static Result success () { + return restResult(null, SUCCESS, "操作成功"); + } + + public static Result success (T data) { + return restResult(data, SUCCESS, "操作成功"); + } + + public static Result success (T data, String msg) { + return restResult(data, SUCCESS, msg); + } + + public static Result error () { + return restResult(null, FAIL, "操作失败"); + } + + public static Result error (String msg) { + return restResult(null, FAIL, msg); + } + + public static Result error (T data) { + return restResult(data, FAIL, "操作失败"); + } + + public static Result error (T data, String msg) { + return restResult(data, FAIL, msg); + } + + public static Result error (int code, String msg) { + return restResult(null, code, msg); + } + + public static Result warn () { + return restResult(null, WARN, "操作失败"); + } + + public static Result warn (String msg) { + return restResult(null, WARN, msg); + } + + public static Result warn (T data) { + return restResult(data, WARN, "操作失败"); + } + + public static Result warn (T data, String msg) { + return restResult(data, WARN, msg); + } + + public static Result warn (int code, String msg) { + return restResult(null, code, msg); + } + + private static Result restResult (T data, int code, String msg) { + Result apiResult = new Result<>(); + apiResult.setCode(code); + apiResult.setData(data); + apiResult.setMsg(msg); + return apiResult; + } + + public static Boolean isError (Result ret) { + return !isSuccess(ret); + } + + public static Boolean isSuccess (Result ret) { + return Result.SUCCESS == ret.getCode(); + } +} diff --git a/src/main/java/com/load/domain/VinIp.java b/src/main/java/com/load/domain/VinIp.java new file mode 100644 index 0000000..aecebe6 --- /dev/null +++ b/src/main/java/com/load/domain/VinIp.java @@ -0,0 +1,19 @@ +package com.load.domain; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * @ClassName VinIp + * @Description 描述 + * @Author YunFei.Du + * @Date 2024/5/30 20:34 + */ +@Data +@AllArgsConstructor +@NoArgsConstructor +public class VinIp { + String vin; + String ip; +} diff --git a/src/main/java/com/load/domain/req/VehicleConnectionReq.java b/src/main/java/com/load/domain/req/VehicleConnectionReq.java new file mode 100644 index 0000000..f6139ad --- /dev/null +++ b/src/main/java/com/load/domain/req/VehicleConnectionReq.java @@ -0,0 +1,51 @@ +package com.load.domain.req; + +import com.alibaba.fastjson.annotation.JSONField; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * @author DongZl + * @description: 车辆获取连接地址 + * @Date 2023-11-28 上午 10:32 + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class VehicleConnectionReq { + + /** + * { + * "vehicleVin": "VIN1234567894", + * "timestamp": "11111", + * "username": "你好", + * "nonce": "33" + * } + */ + + /** + * vin + */ + @JSONField(name = "vehicleVin") + private String vehicleVin; + + /** + * 时间戳 + */ + private String timestamp; + + /** + * 用户名 + */ + @JSONField(name = "username") + private String userName; + + /** + * 随机字符串 + */ + private String nonce; + +} diff --git a/src/main/java/com/load/mqtt/MessageCallbackService.java b/src/main/java/com/load/mqtt/MessageCallbackService.java new file mode 100644 index 0000000..cf77090 --- /dev/null +++ b/src/main/java/com/load/mqtt/MessageCallbackService.java @@ -0,0 +1,32 @@ +package com.load.mqtt; + +import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; +import org.eclipse.paho.client.mqttv3.MqttCallback; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.springframework.stereotype.Service; + +/** + * 消息回调服务 (回执消息类) + * @author YunFei.Du + * @date 22:37 2024/5/30 + */ +@Service +public class MessageCallbackService implements MqttCallback { + + @Override + public void connectionLost(Throwable cause) { + System.out.println("connectionLost: " + cause.getMessage()); + } + + @Override + public void messageArrived(String topic, MqttMessage message) { + System.out.println("topic: " + topic); + System.out.println("Qos: " + message.getQos()); + System.out.println("message content: " + new String(message.getPayload())); + } + + @Override + public void deliveryComplete(IMqttDeliveryToken token) { + System.out.println("deliveryComplete---------" + token.isComplete()); + } +} diff --git a/src/main/java/com/load/mqtt/MqttFactory.java b/src/main/java/com/load/mqtt/MqttFactory.java new file mode 100644 index 0000000..86692d7 --- /dev/null +++ b/src/main/java/com/load/mqtt/MqttFactory.java @@ -0,0 +1,46 @@ +package com.load.mqtt; + +import lombok.AllArgsConstructor; +import org.eclipse.paho.client.mqttv3.MqttClient; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.eclipse.paho.client.mqttv3.MqttException; +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; +import org.springframework.stereotype.Service; + +/** + * Mqtt工厂 + * @author YunFei.Du + * @date 22:38 2024/5/30 + */ +@Service +@AllArgsConstructor +public class MqttFactory { + + private final MessageCallbackService messageCallbackService; + // 连接参数 + + /** + * 创建Mqtt 客户端 + * @param mqttProperties + * @return + */ + public MqttClient createClient(MqttProperties mqttProperties){ + MqttClient mqttClient =null; + try { + mqttClient=new MqttClient ( mqttProperties.getBroker() , mqttProperties.getClientId() , new MemoryPersistence() ); + } catch (MqttException e) { + throw new RuntimeException ( e ); + } + MqttConnectOptions options = new MqttConnectOptions ( ); + + if (mqttProperties.isLogin()){ + options.setUserName ( mqttProperties.getUsername() ); + options.setPassword ( mqttProperties.getPassword().toCharArray() ); + } + options.setConnectionTimeout(60); + options.setKeepAliveInterval(60); + mqttClient.setCallback ( messageCallbackService ); + + return mqttClient; + } +} diff --git a/src/main/java/com/load/mqtt/MqttProperties.java b/src/main/java/com/load/mqtt/MqttProperties.java new file mode 100644 index 0000000..ee5376c --- /dev/null +++ b/src/main/java/com/load/mqtt/MqttProperties.java @@ -0,0 +1,51 @@ +package com.load.mqtt; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; +import org.apache.commons.lang3.StringUtils; + +/** + * 配置中心 + * @author YunFei.Du + * @date 8:53 2024/5/30 + */ +@Data +@AllArgsConstructor +@NoArgsConstructor +@Builder +public class MqttProperties { + + private String broker; + private String topic; + private String username; + private String password; + private String clientId; + + /** + * 构建mqtt配置 + * @param ip + * @param topic + * @return + */ + public static MqttProperties configBuild(String ip, String topic){ + return MqttProperties.builder() + .broker("tcp://"+ip+":1883") + .topic(topic) + .username("admin") + .password("public") + .clientId("protocol-parsing") //协议解析 定值 --> 配置 + .build(); + } + + /** + * 判断是否可以登录 + * @return + */ + public boolean isLogin(){ +// commons-lang3 + return StringUtils.isBlank ( username ) && !StringUtils.isBlank ( password ); + } + +} diff --git a/src/main/java/com/load/rebbitmq/MsgHandle.java b/src/main/java/com/load/rebbitmq/MsgHandle.java new file mode 100644 index 0000000..d48999d --- /dev/null +++ b/src/main/java/com/load/rebbitmq/MsgHandle.java @@ -0,0 +1,28 @@ +package com.load.rebbitmq; + +import com.load.mqtt.MqttFactory; +import lombok.extern.log4j.Log4j2; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +/** + * 信息处理器 + * @author YunFei.Du + * @date 9:21 2024/5/30 + */ +@Component +@Log4j2 +public class MsgHandle { + + @Autowired + private MqttFactory mqttFactory; +// @RabbitListener(queues = {"create.topic"}) +// private void msg(String msg){ +// log.info ( "接收到消息:{}" , msg ); +// MqttProperties mqttProperties = MqttProperties.configBuild ( "39.100.87.192", "mqtt/test" ); +// mqttFactory.createClient ( mqttProperties ); +// +// } + + +} diff --git a/src/main/java/com/load/rebbitmq/RabbitConfig.java b/src/main/java/com/load/rebbitmq/RabbitConfig.java new file mode 100644 index 0000000..32b5beb --- /dev/null +++ b/src/main/java/com/load/rebbitmq/RabbitConfig.java @@ -0,0 +1,36 @@ +package com.load.rebbitmq; + +import org.springframework.amqp.core.Binding; +import org.springframework.amqp.core.BindingBuilder; +import org.springframework.amqp.core.DirectExchange; +import org.springframework.amqp.core.Queue; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + + + +/** + * rabbit 配置类 + * @author YunFei.Du + * @date 8:39 2024/5/31 + */ +@Configuration +public class RabbitConfig { + @Bean + public Queue initQueue() { + return new Queue ("create.totic", true); + } + + @Bean + public DirectExchange direct(){ + return new DirectExchange ("topic.direct"); + } + + @Bean + public Binding bindingla(DirectExchange direct, Queue initQueue){ + return BindingBuilder.bind(initQueue) + .to(direct) + .with("protocol-parsing"); + } + +} diff --git a/src/main/java/com/load/service/LoadBalanceService.java b/src/main/java/com/load/service/LoadBalanceService.java new file mode 100644 index 0000000..0615b27 --- /dev/null +++ b/src/main/java/com/load/service/LoadBalanceService.java @@ -0,0 +1,27 @@ +package com.load.service; +/** + * 连接服务接口 + * + * @author YunFei·Du + * @ClassName: ConnectServer + * @Description: 连接服务接口 + * @CreateTime: 2024/5/27 09:22 + */ + +import com.load.domain.MqttServerModel; +import com.load.domain.Result; +import com.load.domain.req.VehicleConnectionReq; + +/** + *@ClassName ConnectServer + *@Description 描述 + *@Author YunFei·Du + *@Date 2024/5/27 09:22 + */ +public interface LoadBalanceService { + Result getConnect(VehicleConnectionReq vehicleConnectionReq); + + void createConnect(); + + +} diff --git a/src/main/java/com/load/service/impl/LoadBalanceServiceImpl.java b/src/main/java/com/load/service/impl/LoadBalanceServiceImpl.java new file mode 100644 index 0000000..2e3c999 --- /dev/null +++ b/src/main/java/com/load/service/impl/LoadBalanceServiceImpl.java @@ -0,0 +1,81 @@ +package com.load.service.impl; + +import com.load.domain.MqttServerModel; +import com.load.domain.Result; +import com.load.domain.VinIp; +import com.load.domain.req.VehicleConnectionReq; +import com.load.service.LoadBalanceService; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.stereotype.Service; + +import static com.load.util.ECSTool.runEcsInstance; + +/** + * 服务实现类 + * + * @author YunFei·Du + * @ClassName: ConnectServiceImpl + * @Description: 服务实现类 + * @CreateTime: 2024/5/27 09:22 + */ +@Service +@Slf4j +public class LoadBalanceServiceImpl implements LoadBalanceService { + + + @Autowired + private RedisTemplate redisTemplate; + + + @Override + public Result getConnect(VehicleConnectionReq vehicleConnectionReq) { + + + log.error(vehicleConnectionReq.toString()); + + + + //判断是否有游标key --count + if (redisTemplate.hasKey("count")){ + + Integer count = Integer.valueOf(redisTemplate.opsForValue().get("count")); + if (count == 100){ + redisTemplate.opsForValue().set("count",String.valueOf(0)); + }else { + redisTemplate.opsForValue().set("count",String.valueOf(count + 1)); + } + + //根据游标count获取服务IP + String ip = redisTemplate.opsForList().index("ipList", count); + //关联车辆和服务 + this.insertVinIp(new VinIp (vehicleConnectionReq.getVehicleVin(),ip)); + //响应信息 + return Result.success(new MqttServerModel("tcp://"+ip+":1883","test1")); + }else { + redisTemplate.opsForValue().set("count",String.valueOf(1)); + //根据游标count获取服务IP + String ip = redisTemplate.opsForList().index("ipList", 0); + //关联车辆和服务 + this.insertVinIp(new VinIp(vehicleConnectionReq.getVehicleVin(),ip)); + //响应信息 + return Result.success(new MqttServerModel("tcp://"+ip+":1883","test1")); + } + } + + private void insertVinIp(VinIp vinIp) { + + } + + @SneakyThrows + @Override + public void createConnect() { + runEcsInstance("cn-zhangjiakou", "lt-8vbepqjihmawbkqcwkcm"); + } + + + + +} diff --git a/src/main/java/com/load/util/ECSTool.java b/src/main/java/com/load/util/ECSTool.java new file mode 100644 index 0000000..45233c4 --- /dev/null +++ b/src/main/java/com/load/util/ECSTool.java @@ -0,0 +1,150 @@ +package com.load.util; + +import com.aliyun.ecs20140526.Client; +import com.aliyun.ecs20140526.models.*; +import com.aliyun.tea.TeaException; +import com.aliyun.teaopenapi.models.Config; +import com.aliyun.teautil.models.RuntimeOptions; +import lombok.extern.log4j.Log4j2; + +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; + +/** + + * ECS实例工具类 + * @author YunFei.Du + * @date 9:30 2024/5/28 + */ +@Log4j2 +public class ECSTool { + + + /** + * 创建ECS客户端 + * @return ECS客户端实例 + * @throws Exception 如果配置信息不正确或网络问题,将抛出异常 + * 注意:此方法用于演示,实际使用时请替换为安全的鉴权方式,如STS + * 更多鉴权访问方式参考:https://help.aliyun.com/document_detail/378657.html + */ + public static Client createClient() throws Exception { + // 工程代码泄露可能会导致 AccessKey 泄露,并威胁账号下所有资源的安全性。以下代码示例仅供参考。 + Config config = new Config() + .setAccessKeyId("LTAI5tPTk3MFkmCGBbnQgmrM") + .setAccessKeySecret("q7rLjxrI0SLBXlvNT4VmYcHCNCY2p6"); + // Endpoint 请参考 https://api.aliyun.com/product/Ecs + config.endpoint = "ecs.cn-zhangjiakou.aliyuncs.com"; + return new Client(config); + } + + /** + * 创建并运行ECS实例 + * @param regionId 地域ID + * @param launchTemplateId 启动模板ID + * @throws Exception 如果调用API时发生错误,将抛出异常 + */ + public static void runEcsInstance(String regionId, String launchTemplateId) throws Exception { + // 创建ECS客户端 + Client client = ECSTool.createClient(); + // 设置运行实例的请求参数 + RunInstancesRequest runInstancesRequest = new RunInstancesRequest() + .setRegionId(regionId) + .setLaunchTemplateId(launchTemplateId); + // 创建运行选项 + RuntimeOptions runtime = new RuntimeOptions(); + + try { + // 调用API运行实例 + client.runInstancesWithOptions(runInstancesRequest, runtime); + } catch (Exception error) { + // 处理API调用过程中出现的异常 + System.out.println(error.getMessage()); + if (error instanceof TeaException) { + // 处理特定类型的异常,如TeaException + TeaException teaError = (TeaException) error; + System.out.println(teaError.getData().get("Recommend")); // 打印诊断推荐链接 + com.aliyun.teautil.Common.assertAsString(teaError.getMessage()); // 断言错误信息 + } else { + // 处理其他类型的异常 + System.out.println(error.getMessage()); + } + } + } + + /** + * 销毁实例 + */ + public static void runEcsRemove(String instanceId) throws Exception { + // 创建ECS客户端 + Client client = ECSTool.createClient(); + DeleteInstanceRequest deleteInstanceRequest = new DeleteInstanceRequest () + .setInstanceId(instanceId); + RuntimeOptions runtime = new RuntimeOptions(); + try { + // 复制代码运行请自行打印 API 的返回值 + client.deleteInstanceWithOptions(deleteInstanceRequest, runtime); + } catch (TeaException error) { + // 此处仅做打印展示,请谨慎对待异常处理,在工程项目中切勿直接忽略异常。 + // 错误 message + System.out.println(error.getMessage()); + // 诊断地址 + System.out.println(error.getData().get("Recommend")); + com.aliyun.teautil.Common.assertAsString(error.message); + } catch (Exception _error) { + TeaException error = new TeaException(_error.getMessage(), _error); + // 此处仅做打印展示,请谨慎对待异常处理,在工程项目中切勿直接忽略异常。 + // 错误 message + System.out.println(error.getMessage()); + // 诊断地址 + System.out.println(error.getData().get("Recommend")); + com.aliyun.teautil.Common.assertAsString(error.message); + } + } + + /** + * 查询实例列表 + * @param regionId + * @return + * @throws Exception + */ + public static List FindInstance( String regionId) throws Exception { + // 创建ECS客户端 + Client client = ECSTool.createClient ( ); + DescribeInstancesRequest describeInstancesRequest = new DescribeInstancesRequest ( ) + .setRegionId ( regionId ); + RuntimeOptions runtime = new RuntimeOptions ( ); + List< String > ipList = new ArrayList<> ( ); + try { + // 复制代码运行请自行打印 API 的返回值 + DescribeInstancesResponse response = client.describeInstancesWithOptions ( describeInstancesRequest, runtime ); + + List< List< String > > ipListList = response.getBody ( ).instances.getInstance ( ).stream ( ).map ( instance -> instance.publicIpAddress.ipAddress ).collect ( Collectors.toList ( ) ); + for (List< String > strings : ipListList) { + for (String ip : strings) { + ipList.add ( ip ); + } + System.out.println ( "------------------------" ); + } + log.info ( "ipList: " + ipList ); // [39.100.89.218, 39.100.87.192] + return ipList; + } catch (TeaException error) { + // 此处仅做打印展示,请谨慎对待异常处理,在工程项目中切勿直接忽略异常。 + // 错误 message + System.out.println ( error.getMessage ( ) ); + // 诊断地址 + System.out.println ( error.getData ( ).get ( "Recommend" ) ); + com.aliyun.teautil.Common.assertAsString ( error.message ); + } catch (Exception _error) { + TeaException error = new TeaException ( _error.getMessage ( ), _error ); + // 此处仅做打印展示,请谨慎对待异常处理,在工程项目中切勿直接忽略异常。 + // 错误 message + System.out.println ( error.getMessage ( ) ); + // 诊断地址 + System.out.println ( error.getData ( ).get ( "Recommend" ) ); + com.aliyun.teautil.Common.assertAsString ( error.message ); + } + return null; + } + +} diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml new file mode 100644 index 0000000..0d1238b --- /dev/null +++ b/src/main/resources/application.yml @@ -0,0 +1,24 @@ +# 服务器相关 + +server: + port: 82 + +spring: + application: +# 协议解析 + name: protocol-parsing + rabbitmq: + host: 111.229.102.61 + port: 5672 + username: guest + password: guest + virtualHost: / + redis: + host: 127.0.0.1 + port: 6379 + password: dyf@123 + datasource: + driver-class-name: com.mysql.cj.jdbc.Driver + url: jdbc:mysql://111.229.102.61/car?serverTimezone=UTC&useUnicode=true&characterEncoding=utf-8&allowMultiQueries=true&useSSL=false + username: root + password: dyf@123