commit 2f90c610b5890ec585076591f4ced5dcc5e1761f Author: rouchen <3133657697@qq.com> Date: Thu May 30 21:51:57 2024 +0800 feat mqtt监听 diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..549e00a --- /dev/null +++ b/.gitignore @@ -0,0 +1,33 @@ +HELP.md +target/ +!.mvn/wrapper/maven-wrapper.jar +!**/src/main/**/target/ +!**/src/test/**/target/ + +### STS ### +.apt_generated +.classpath +.factorypath +.project +.settings +.springBeans +.sts4-cache + +### IntelliJ IDEA ### +.idea +*.iws +*.iml +*.ipr + +### NetBeans ### +/nbproject/private/ +/nbbuild/ +/dist/ +/nbdist/ +/.nb-gradle/ +build/ +!**/src/main/**/build/ +!**/src/test/**/build/ + +### VS Code ### +.vscode/ diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..c932398 --- /dev/null +++ b/pom.xml @@ -0,0 +1,193 @@ + + + 4.0.0 + com.car + zn-car + 0.0.1-SNAPSHOT + zn-car + Demo project for Spring Boot + + 17 + UTF-8 + UTF-8 + 2.6.13 + + + + + spring-boot-starter-parent + org.springframework.boot + 2.6.2 + + + + + + org.springframework.boot + spring-boot-starter-data-redis + + + io.lettuce + lettuce-core + + + + + redis.clients + jedis + + + org.redisson + redisson + 3.16.0 + + + + org.apache.httpcomponents + httpcore + 4.4.12 + + + org.apache.httpcomponents + httpclient + 4.5.13 + + + + org.apache.commons + commons-lang3 + 3.12.0 + + + + + org.springframework.boot + spring-boot-starter-web + + + + org.springframework.boot + spring-boot-starter-test + test + + + + + com.alibaba + druid-spring-boot-starter + 1.2.8 + + + + + javax.servlet + javax.servlet-api + provided + + + + mysql + mysql-connector-java + 8.0.11 + runtime + + + + org.mybatis.spring.boot + mybatis-spring-boot-starter + 2.2.2 + + + + com.github.pagehelper + pagehelper-spring-boot-starter + 1.4.1 + + + + + org.projectlombok + lombok + true + + + commons-lang + commons-lang + 2.6 + + + + com.alibaba + fastjson + 1.2.79 + + + + + org.springframework.boot + spring-boot-starter-quartz + + + + com.fasterxml.jackson.core + jackson-annotations + + + + com.aliyun + ecs20140526 + 5.1.8 + + + org.springframework.boot + spring-boot-starter-amqp + 2.3.7.RELEASE + + + + + org.eclipse.paho + org.eclipse.paho.client.mqttv3 + 1.2.5 + + + org.apache.commons + commons-lang3 + 3.12.0 + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.8.1 + + 11 + 11 + UTF-8 + + + + org.springframework.boot + spring-boot-maven-plugin + 2.3.7.RELEASE + + com.bwie.BootStudentApplication + + + + repackage + + repackage + + + + + + + + + \ No newline at end of file diff --git a/src/main/java/com/car/ZnCarApplication.java b/src/main/java/com/car/ZnCarApplication.java new file mode 100644 index 0000000..27c3eec --- /dev/null +++ b/src/main/java/com/car/ZnCarApplication.java @@ -0,0 +1,13 @@ +package com.car; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@SpringBootApplication +public class ZnCarApplication { + + public static void main(String[] args) { + SpringApplication.run(ZnCarApplication.class, args); + } + +} diff --git a/src/main/java/com/car/config/InitConnectWeight.java b/src/main/java/com/car/config/InitConnectWeight.java new file mode 100644 index 0000000..bcdc01b --- /dev/null +++ b/src/main/java/com/car/config/InitConnectWeight.java @@ -0,0 +1,209 @@ +package com.car.config; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONArray; +import com.alibaba.fastjson.JSONObject; +import com.aliyun.ecs20140526.Client; +import com.aliyun.ecs20140526.models.DescribeInstancesRequest; +import com.aliyun.ecs20140526.models.DescribeInstancesResponse; +import com.aliyun.tea.TeaException; +import com.aliyun.teautil.models.RuntimeOptions; +import com.car.demos.ConnectWeight; +import com.car.service.impl.ConnectServiceImpl; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.ApplicationArguments; +import org.springframework.boot.ApplicationRunner; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.http.*; +import org.springframework.stereotype.Component; +import org.springframework.web.client.RestTemplate; + +import java.math.BigDecimal; +import java.math.RoundingMode; +import java.util.*; +import java.util.stream.Collectors; + +/** + * 初始化连接权重 InitConnectWeight + * + * @author Yangle + * Date 2024/5/28 21:41 + */ +@Component +@Slf4j +public class InitConnectWeight implements ApplicationRunner { + + @Autowired + private RedisTemplate redisTemplate; + + @Autowired + private RestTemplate restTemplate; + @Autowired + private ConnectServiceImpl connectService; + + @Override + public void run(ApplicationArguments args) { + + ArrayList connectWeightList = new ArrayList<>(); + + ArrayList ipList = new ArrayList<>(); + + Client client = null; + //获取阿里云客户端 + try { + client = ConnectServiceImpl.createClient(); + } catch (Exception e) { + throw new RuntimeException(e); + } + //获取所有实例 + DescribeInstancesRequest describeInstancesRequest = new DescribeInstancesRequest() + .setRegionId("cn-shanghai") + .setStatus("Running");; + RuntimeOptions runtime = new RuntimeOptions(); + try { + // 复制代码运行请自行打印 API 的返回值 + DescribeInstancesResponse describeInstancesResponse = client.describeInstancesWithOptions(describeInstancesRequest, runtime); + + // 获取实例ip + List> ipListList = describeInstancesResponse.getBody().instances.getInstance().stream().map(instance -> instance.publicIpAddress.ipAddress).collect(Collectors.toList()); + for (List strings : ipListList) { + for (String ip : strings) { + ipList.add(ip); + } + System.out.println("------------------------"); + } + + } catch (TeaException error) { + // 此处仅做打印展示,请谨慎对待异常处理,在工程项目中切勿直接忽略异常。 + // 错误 message + System.out.println(error.getMessage()); + // 诊断地址 + System.out.println(error.getData().get("Recommend")); + com.aliyun.teautil.Common.assertAsString(error.message); + } catch (Exception _error) { + TeaException error = new TeaException(_error.getMessage(), _error); + // 此处仅做打印展示,请谨慎对待异常处理,在工程项目中切勿直接忽略异常。 + // 错误 message + System.out.println(error.getMessage()); + // 诊断地址 + System.out.println(error.getData().get("Recommend")); + com.aliyun.teautil.Common.assertAsString(error.message); + } + + + //遍历所有ip,获取每一个服务的连接数 + for (String ip : ipList) { + //fluxMq连接 + String url = "http://" + ip + ":8080/public/login"; + Map request = new HashMap<>(); + request.put("username", "fluxmq"); + request.put("password", "fluxmq"); + HttpHeaders httpHeaders = new HttpHeaders(); + httpHeaders.setContentType(MediaType.APPLICATION_JSON); + HttpEntity> r = new HttpEntity>(request, httpHeaders); + String result = restTemplate.postForObject(url, r, String.class); + + + //http://fluxmq.muyu.icu/public/cluster + //获取FluxMq运行时详情信息 + int nextInt = new Random().nextInt(1000); + String getInfoUrl = "http://" + ip + ":8080/public/cluster?random=" + nextInt; + + HttpHeaders httpHeadersGetInfo = new HttpHeaders(); + httpHeadersGetInfo.setContentType(MediaType.APPLICATION_JSON); + httpHeadersGetInfo.setAccept(Collections.singletonList(MediaType.APPLICATION_JSON)); + httpHeadersGetInfo.set("Cookie", result); + HttpEntity getInfoRequest = new HttpEntity(httpHeadersGetInfo); + ResponseEntity responseInfo = restTemplate.exchange(getInfoUrl, HttpMethod.GET, getInfoRequest, String.class, 1); + + log.info("响应是:{}", responseInfo.getBody()); + + JSONArray jsonArray = JSON.parseArray(responseInfo.getBody()); + if (jsonArray.size() > 0) { + JSONObject jsonObject = jsonArray.getJSONObject(0); + Integer connectSize = Integer.valueOf(jsonObject.getJSONObject("mqttInfo").getString("connectSize")); + connectWeightList.add(new ConnectWeight(ip,100-connectSize)); + log.info("链接数量:{}", connectSize); + } else { + log.info("得到的相应数据为null"); + } + } +// Integer sum =0; +// for (ConnectWeight connectWeight : connectWeightList) { +// sum = sum + connectWeight.getWeightValue(); +// } +// +// int max = 0; +// for (ConnectWeight connectWeight : connectWeightList) { +// log.info("权重值:{}",connectWeight.getWeightValue()); +// Integer result = BigDecimal.valueOf(connectWeight.getWeightValue() * 100).divide(BigDecimal.valueOf(sum), 0, RoundingMode.DOWN).intValue(); +// if (result > max){ +// max = result; +// } +// connectWeight.setWeightValue(result); +// log.info("100次轮询次数:{}",result); +// } +// +// ArrayList weightIpList = new ArrayList<>(); +// +// //轮询出现次数 +// for (int i = 0; i <= max; i++) { +// for (ConnectWeight connectWeight : connectWeightList) { +// if (connectWeight.getWeightValue() > i){ +// weightIpList.add(connectWeight.getCarServerIp()); +// }else if (connectWeight.getWeightValue() == max ){ +// weightIpList.add(connectWeight.getCarServerIp()); +// } +// } +// } +// +// //存入redis +// redisTemplate.delete("ips"); +// for (String ip : weightIpList) { +// redisTemplate.opsForList().rightPush("ips",ip); +// } + + + //每个连接数的权重总和 + Integer sum =0; + for (ConnectWeight connectWeight : connectWeightList) { + sum =sum+connectWeight.getWeightValue(); + } + System.out.println("sum"+sum); + + int max=0; + for (ConnectWeight connectWeight : connectWeightList) { + Integer result = BigDecimal.valueOf(connectWeight.getWeightValue() * 100).divide(BigDecimal.valueOf(sum), 0, RoundingMode.DOWN).intValue(); + if (result > max){ + max = result; + } + connectWeight.setWeightValue(result); + System.out.println("100次轮询次数:{}"+result); + } + + ArrayList weightIpList = new ArrayList<>(); + + for (int i = 0; i <= max; i++) { + for (ConnectWeight connectWeight : connectWeightList) { + log.info("权重值:{}",connectWeight.getWeightValue()); + if (connectWeight.getWeightValue() > i) { + weightIpList.add(connectWeight.getCarServerIp()); + log.info("轮询结果:{}",connectWeight.getCarServerIp()); + return; + }else if (connectWeight.getWeightValue() == max ){ + weightIpList.add(connectWeight.getCarServerIp()); + log.info("轮询结果:{}",connectWeight.getCarServerIp()); + } + } + } + + // 存入redis + redisTemplate.delete("ips"); + for (String ip : weightIpList) { + redisTemplate.opsForList().rightPush("ips",ip); + } + + + } +} diff --git a/src/main/java/com/car/config/RabbitmqConfig.java b/src/main/java/com/car/config/RabbitmqConfig.java new file mode 100644 index 0000000..9c47f15 --- /dev/null +++ b/src/main/java/com/car/config/RabbitmqConfig.java @@ -0,0 +1,52 @@ +package com.car.config; + +import org.springframework.amqp.core.*; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * rabbitMQ配置 RabbitmqConfig + * + * @author Yangle + * Date 2024/5/28 21:42 + */ +@Configuration +public class RabbitmqConfig { + public static final String QUEUE_INFORM_EMAIL = "queue_inform_email"; + public static final String QUEUE_INFORM_SMS = "disconnect_connect"; + public static final String EXCHANGE_TOPICS_INFORM="exchange_topics_inform"; + public static final String ROUTINGKEY_EMAIL="inform.#.email.#"; + public static final String ROUTINGKEY_SMS="inform.#.sms.#"; + + + @Bean(EXCHANGE_TOPICS_INFORM) + public Exchange EXCHANGE_TOPICS_INFORM(){ + //durable(true) 持久化,mq重启之后交换机还在 + return ExchangeBuilder.topicExchange(EXCHANGE_TOPICS_INFORM).durable(true).build(); + } + + //声明QUEUE_INFORM_EMAIL队列 + @Bean(QUEUE_INFORM_EMAIL) + public Queue QUEUE_INFORM_EMAIL(){ + return new Queue(QUEUE_INFORM_EMAIL); + } + //声明QUEUE_INFORM_SMS队列 + @Bean(QUEUE_INFORM_SMS) + public Queue QUEUE_INFORM_SMS(){ + return new Queue(QUEUE_INFORM_SMS); + } + + //ROUTINGKEY_EMAIL队列绑定交换机,指定routingKey + @Bean + public Binding BINDING_QUEUE_INFORM_EMAIL(@Qualifier(QUEUE_INFORM_EMAIL) Queue queue, + @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange){ + return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_EMAIL).noargs(); + } + //ROUTINGKEY_SMS队列绑定交换机,指定routingKey + @Bean + public Binding BINDING_ROUTINGKEY_SMS(@Qualifier(QUEUE_INFORM_SMS) Queue queue, + @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange){ + return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_SMS).noargs(); + } +} diff --git a/src/main/java/com/car/config/RedisConfig.java b/src/main/java/com/car/config/RedisConfig.java new file mode 100644 index 0000000..88b0979 --- /dev/null +++ b/src/main/java/com/car/config/RedisConfig.java @@ -0,0 +1,40 @@ +package com.car.config; + +import com.fasterxml.jackson.annotation.JsonAutoDetect; +import com.fasterxml.jackson.annotation.PropertyAccessor; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.data.redis.connection.RedisConnectionFactory; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer; +import org.springframework.data.redis.serializer.StringRedisSerializer; + +@Configuration +public class RedisConfig { + + @Bean + public RedisTemplate redisTemplate(RedisConnectionFactory factory) { + RedisTemplate template = new RedisTemplate<>(); + template.setConnectionFactory(factory); + Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new + Jackson2JsonRedisSerializer(Object.class); + ObjectMapper om = new ObjectMapper(); + om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); + om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL); + jackson2JsonRedisSerializer.setObjectMapper(om); + + StringRedisSerializer stringRedisSerializer = new StringRedisSerializer(); + // key采用String的序列化方式 + template.setKeySerializer(stringRedisSerializer); + // hash的key也采用String的序列化方式 + template.setHashKeySerializer(stringRedisSerializer); + // value序列化方式采用jackson + template.setValueSerializer(jackson2JsonRedisSerializer); + // hash的value序列化方式采用jackson + template.setHashValueSerializer(jackson2JsonRedisSerializer); + template.afterPropertiesSet(); + + return template; + } +} diff --git a/src/main/java/com/car/config/RestTemplateConfig.java b/src/main/java/com/car/config/RestTemplateConfig.java new file mode 100644 index 0000000..16dcefc --- /dev/null +++ b/src/main/java/com/car/config/RestTemplateConfig.java @@ -0,0 +1,20 @@ +package com.car.config; + +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.web.client.RestTemplate; + +@Configuration +public class RestTemplateConfig { + /** + * 没有实例化RestTemplate时,初始化RestTemplate + * @return + */ + @ConditionalOnMissingBean(RestTemplate.class) + @Bean + public RestTemplate restTemplate(){ + RestTemplate restTemplate = new RestTemplate(); + return restTemplate; + } +} diff --git a/src/main/java/com/car/consumer/ReceiveHandler.java b/src/main/java/com/car/consumer/ReceiveHandler.java new file mode 100644 index 0000000..65f2771 --- /dev/null +++ b/src/main/java/com/car/consumer/ReceiveHandler.java @@ -0,0 +1,45 @@ +package com.car.consumer; + +import com.car.config.InitConnectWeight; +import com.car.config.RabbitmqConfig; +import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.core.Message; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.DefaultApplicationArguments; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.stereotype.Component; + +import java.nio.channels.Channel; + +/** + * 断开连接消费者 ReceiveHandler + * + * @author Yangle + * Date 2024/5/28 21:43 + */ +@Component +@Slf4j +public class ReceiveHandler { + @Autowired + private RedisTemplate redisTemplate; + + @Autowired + private InitConnectWeight initConnectWeight; + @RabbitListener(queues = {RabbitmqConfig.QUEUE_INFORM_EMAIL}) + public void receive_email(Object msg, Message message, Channel channel){ + System.out.println("QUEUE_INFORM_EMAIL msg"+msg); + } + //监听sms队列 + @RabbitListener(queues = {RabbitmqConfig.QUEUE_INFORM_SMS}) + public void receiveSms(Message message) { + try { + initConnectWeight.run(new DefaultApplicationArguments()); + redisTemplate.delete("RabbitMQ"); + log.info("ip权重重新分配成功!"); + } catch (Exception e) { + throw new RuntimeException(e); + } + log.info("消费者得到的消息: {}" , new String(message.getBody())); + } +} diff --git a/src/main/java/com/car/controller/ConnectController.java b/src/main/java/com/car/controller/ConnectController.java new file mode 100644 index 0000000..a1b00f0 --- /dev/null +++ b/src/main/java/com/car/controller/ConnectController.java @@ -0,0 +1,29 @@ +package com.car.controller; + +import com.car.demos.MqttServerModel; +import com.car.demos.Result; +import com.car.demos.req.VehicleConnectionReq; +import com.car.service.ConnectService; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +/** + * 车辆连接 ConnectController + * + * @author Yangle + * Date 2024/5/28 21:46 + */ +@RestController +@RequestMapping("/connect") +public class ConnectController { + + @Autowired + private ConnectService connectService; + @PostMapping("/getConnect") + public ResultgetConnect(@RequestBody VehicleConnectionReq vehicleConnectionReq){ + return connectService.getConnect(); + } +} diff --git a/src/main/java/com/car/demos/ConnectUseInfo.java b/src/main/java/com/car/demos/ConnectUseInfo.java new file mode 100644 index 0000000..e2011f2 --- /dev/null +++ b/src/main/java/com/car/demos/ConnectUseInfo.java @@ -0,0 +1,31 @@ +package com.car.demos; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * 内存使用情况 ConnerctUseInfo + * + * @author Yangle + * Date 2024/5/28 21:55 + */ +@Data +@AllArgsConstructor +@NoArgsConstructor +public class ConnectUseInfo { + /** + * 节点ID + */ + private String clusterId; + + /** + * 所属id + */ + private String ipAddress; + + /** + * 剩余可连接数量 + */ + private Integer remainingNum; +} diff --git a/src/main/java/com/car/demos/ConnectWeight.java b/src/main/java/com/car/demos/ConnectWeight.java new file mode 100644 index 0000000..9cda5d4 --- /dev/null +++ b/src/main/java/com/car/demos/ConnectWeight.java @@ -0,0 +1,27 @@ +package com.car.demos; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * 连接权重 ConnectWeight + * + * @author Yangle + * Date 2024/5/28 21:56 + */ +@Data +@AllArgsConstructor +@NoArgsConstructor +public class ConnectWeight { + /** + * 服务器ip + */ + private String carServerIp; + + /** + * 权重值 + */ + private Integer weightValue; + +} diff --git a/src/main/java/com/car/demos/HttpStatus.java b/src/main/java/com/car/demos/HttpStatus.java new file mode 100644 index 0000000..73efe32 --- /dev/null +++ b/src/main/java/com/car/demos/HttpStatus.java @@ -0,0 +1,95 @@ +package com.car.demos; + +/** + * 返回状态码 HttpStatus + * + * @author Yangle + * Date 2024/5/28 21:58 + */ + +public class HttpStatus { + /** + * 操作成功 + */ + public static final int SUCCESS = 200; + + /** + * 对象创建成功 + */ + public static final int CREATED = 201; + + /** + * 请求已经被接受 + */ + public static final int ACCEPTED = 202; + + /** + * 操作已经执行成功,但是没有返回数据 + */ + public static final int NO_CONTENT = 204; + + /** + * 资源已被移除 + */ + public static final int MOVED_PERM = 301; + + /** + * 重定向 + */ + public static final int SEE_OTHER = 303; + + /** + * 资源没有被修改 + */ + public static final int NOT_MODIFIED = 304; + + /** + * 参数列表错误(缺少,格式不匹配) + */ + public static final int BAD_REQUEST = 400; + + /** + * 未授权 + */ + public static final int UNAUTHORIZED = 401; + + /** + * 访问受限,授权过期 + */ + public static final int FORBIDDEN = 403; + + /** + * 资源,服务未找到 + */ + public static final int NOT_FOUND = 404; + + /** + * 不允许的http方法 + */ + public static final int BAD_METHOD = 405; + + /** + * 资源冲突,或者资源被锁 + */ + public static final int CONFLICT = 409; + + /** + * 不支持的数据,媒体类型 + */ + public static final int UNSUPPORTED_TYPE = 415; + + /** + * 系统内部错误 + */ + public static final int ERROR = 500; + + /** + * 接口未实现 + */ + public static final int NOT_IMPLEMENTED = 501; + + /** + * 系统警告消息 + */ + public static final int WARN = 601; +} diff --git a/src/main/java/com/car/demos/MqttServerModel.java b/src/main/java/com/car/demos/MqttServerModel.java new file mode 100644 index 0000000..2ab564b --- /dev/null +++ b/src/main/java/com/car/demos/MqttServerModel.java @@ -0,0 +1,28 @@ +package com.car.demos; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.experimental.SuperBuilder; + +/** + * Mqtt服务器模型 MqttServerModel + * + * @author Yangle + * Date 2024/5/28 21:58 + */ +@Data +@AllArgsConstructor +@NoArgsConstructor +@SuperBuilder +public class MqttServerModel { + /** + * MQTT服务节点 + */ + private String broker; + + /** + * MQTT订阅主题 + */ + private String topic; +} diff --git a/src/main/java/com/car/demos/Result.java b/src/main/java/com/car/demos/Result.java new file mode 100644 index 0000000..b812ab5 --- /dev/null +++ b/src/main/java/com/car/demos/Result.java @@ -0,0 +1,102 @@ +package com.car.demos; + +import lombok.Data; + +import java.io.Serializable; + +/** + * Result + * + * @author Yangle + * Date 2024/5/28 21:59 + */ +@Data +public class Result implements Serializable { + /** + * 成功 + */ + public static final int SUCCESS = HttpStatus.SUCCESS; + /** + * 失败 + */ + public static final int FAIL = HttpStatus.ERROR; + private static final long serialVersionUID = 1L; + /** + * 系统警告消息 + */ + private static final int WARN = HttpStatus.WARN; + + private int code; + + private String msg; + + private T data; + + public static Result success () { + return restResult(null, SUCCESS, "操作成功"); + } + + public static Result success (T data) { + return restResult(data, SUCCESS, "操作成功"); + } + + public static Result success (T data, String msg) { + return restResult(data, SUCCESS, msg); + } + + public static Result error () { + return restResult(null, FAIL, "操作失败"); + } + + public static Result error (String msg) { + return restResult(null, FAIL, msg); + } + + public static Result error (T data) { + return restResult(data, FAIL, "操作失败"); + } + + public static Result error (T data, String msg) { + return restResult(data, FAIL, msg); + } + + public static Result error (int code, String msg) { + return restResult(null, code, msg); + } + + public static Result warn () { + return restResult(null, WARN, "操作失败"); + } + + public static Result warn (String msg) { + return restResult(null, WARN, msg); + } + + public static Result warn (T data) { + return restResult(data, WARN, "操作失败"); + } + + public static Result warn (T data, String msg) { + return restResult(data, WARN, msg); + } + + public static Result warn (int code, String msg) { + return restResult(null, code, msg); + } + + private static Result restResult (T data, int code, String msg) { + Result apiResult = new Result<>(); + apiResult.setCode(code); + apiResult.setData(data); + apiResult.setMsg(msg); + return apiResult; + } + + public static Boolean isError (Result ret) { + return !isSuccess(ret); + } + + public static Boolean isSuccess (Result ret) { + return Result.SUCCESS == ret.getCode(); + } +} diff --git a/src/main/java/com/car/demos/req/VehicleConnectionReq.java b/src/main/java/com/car/demos/req/VehicleConnectionReq.java new file mode 100644 index 0000000..feefc99 --- /dev/null +++ b/src/main/java/com/car/demos/req/VehicleConnectionReq.java @@ -0,0 +1,51 @@ +package com.car.demos.req; + +import com.alibaba.fastjson.annotation.JSONField; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * @author DongZl + * @description: 车辆获取连接地址 + * @Date 2023-11-28 上午 10:32 + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class VehicleConnectionReq { + + /** + * { + * "vehicleVin": "VIN1234567894", + * "timestamp": "11111", + * "username": "你好", + * "nonce": "33" + * } + */ + + /** + * vin + */ + @JSONField(name = "vehicleVin") + private String vin; + + /** + * 时间戳 + */ + private String timestamp; + + /** + * 用户名 + */ + @JSONField(name = "username") + private String userName; + + /** + * 随机字符串 + */ + private String nonce; + +} diff --git a/src/main/java/com/car/demos/web/BasicController.java b/src/main/java/com/car/demos/web/BasicController.java new file mode 100644 index 0000000..6d1f9cb --- /dev/null +++ b/src/main/java/com/car/demos/web/BasicController.java @@ -0,0 +1,67 @@ +/* + * Copyright 2013-2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.car.demos.web; + +import org.springframework.stereotype.Controller; +import org.springframework.web.bind.annotation.ModelAttribute; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.ResponseBody; + +/** + * @author theonefx + */ +@Controller +public class BasicController { + + // http://127.0.0.1:8080/hello?name=lisi + @RequestMapping("/hello") + @ResponseBody + public String hello(@RequestParam(name = "name", defaultValue = "unknown user") String name) { + return "Hello " + name; + } + + // http://127.0.0.1:8080/user + @RequestMapping("/user") + @ResponseBody + public User user() { + User user = new User(); + user.setName("theonefx"); + user.setAge(666); + return user; + } + + // http://127.0.0.1:8080/save_user?name=newName&age=11 + @RequestMapping("/save_user") + @ResponseBody + public String saveUser(User u) { + return "user will save: name=" + u.getName() + ", age=" + u.getAge(); + } + + // http://127.0.0.1:8080/html + @RequestMapping("/html") + public String html(){ + return "index.html"; + } + + @ModelAttribute + public void parseUser(@RequestParam(name = "name", defaultValue = "unknown user") String name + , @RequestParam(name = "age", defaultValue = "12") Integer age, User user) { + user.setName("zhangsan"); + user.setAge(18); + } +} diff --git a/src/main/java/com/car/demos/web/PathVariableController.java b/src/main/java/com/car/demos/web/PathVariableController.java new file mode 100644 index 0000000..ce61138 --- /dev/null +++ b/src/main/java/com/car/demos/web/PathVariableController.java @@ -0,0 +1,44 @@ +/* + * Copyright 2013-2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.car.demos.web; + +import org.springframework.stereotype.Controller; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestMethod; +import org.springframework.web.bind.annotation.ResponseBody; + +/** + * @author theonefx + */ +@Controller +public class PathVariableController { + + // http://127.0.0.1:8080/user/123/roles/222 + @RequestMapping(value = "/user/{userId}/roles/{roleId}", method = RequestMethod.GET) + @ResponseBody + public String getLogin(@PathVariable("userId") String userId, @PathVariable("roleId") String roleId) { + return "User Id : " + userId + " Role Id : " + roleId; + } + + // http://127.0.0.1:8080/javabeat/somewords + @RequestMapping(value = "/javabeat/{regexp1:[a-z-]+}", method = RequestMethod.GET) + @ResponseBody + public String getRegExp(@PathVariable("regexp1") String regexp1) { + return "URI Part : " + regexp1; + } +} diff --git a/src/main/java/com/car/demos/web/User.java b/src/main/java/com/car/demos/web/User.java new file mode 100644 index 0000000..5015d9b --- /dev/null +++ b/src/main/java/com/car/demos/web/User.java @@ -0,0 +1,43 @@ +/* + * Copyright 2013-2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.car.demos.web; + +/** + * @author theonefx + */ +public class User { + + private String name; + + private Integer age; + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public Integer getAge() { + return age; + } + + public void setAge(Integer age) { + this.age = age; + } +} diff --git a/src/main/java/com/car/mqtt/MessageCallbackService.java b/src/main/java/com/car/mqtt/MessageCallbackService.java new file mode 100644 index 0000000..ec9ce52 --- /dev/null +++ b/src/main/java/com/car/mqtt/MessageCallbackService.java @@ -0,0 +1,35 @@ +package com.car.mqtt; + +import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; +import org.eclipse.paho.client.mqttv3.MqttCallback; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.springframework.stereotype.Service; + +/** + * 回执消息类 MessageCallbackService + * + * @author Yangle + * Date 2024/5/29 20:24 + */ +@Service +public class MessageCallbackService implements MqttCallback { + + @Override + public void connectionLost(Throwable cause) { + System.out.println("connectionLost: " + cause.getMessage()); + } + @Override + public void messageArrived(String topic, MqttMessage message) { + System.out.println("topic: " + topic); + System.out.println("Qos: " + message.getQos()); + System.out.println("message content: " + new String(message.getPayload())); + + } + + + @Override + public void deliveryComplete(IMqttDeliveryToken token) { + System.out.println("deliveryComplete---------" + token.isComplete()); + } + +} diff --git a/src/main/java/com/car/mqtt/MqttFactory.java b/src/main/java/com/car/mqtt/MqttFactory.java new file mode 100644 index 0000000..ed41ec0 --- /dev/null +++ b/src/main/java/com/car/mqtt/MqttFactory.java @@ -0,0 +1,47 @@ +package com.car.mqtt; + +import lombok.AllArgsConstructor; +import org.eclipse.paho.client.mqttv3.MqttClient; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.eclipse.paho.client.mqttv3.MqttException; +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; +import org.springframework.stereotype.Service; + +/** + * mqtt配置 MqttConfig + * + * @author Yangle + * Date 2024/5/29 20:26 + */ +@Service +@AllArgsConstructor +public class MqttFactory { + + private final MessageCallbackService messageCallbackService; + public MqttClient creatClient(MqttProperties mqttProperties) { + MqttClient client = null; + try { + client = new MqttClient(mqttProperties.getBroker(), mqttProperties.getClientid(), new MemoryPersistence()); + MqttConnectOptions options = new MqttConnectOptions(); + + // 连接参数 + if (mqttProperties.isLong()) + { + options.setUserName(mqttProperties.getUsername()); + options.setPassword(mqttProperties.getPassword().toCharArray()); + } + + + options.setConnectionTimeout(60); + options.setKeepAliveInterval(60); + client.connect(options); + client.setCallback(messageCallbackService); + client.subscribe(mqttProperties.getTopic(),0); + + } catch (MqttException e) { + throw new RuntimeException(e); + } + + return client; + } +} diff --git a/src/main/java/com/car/mqtt/MqttProperties.java b/src/main/java/com/car/mqtt/MqttProperties.java new file mode 100644 index 0000000..9e8cc35 --- /dev/null +++ b/src/main/java/com/car/mqtt/MqttProperties.java @@ -0,0 +1,40 @@ +package com.car.mqtt; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.experimental.SuperBuilder; +import org.apache.commons.lang3.StringUtils; + +/** + * 配置文件 MqttProperties + * + * @author Yangle + * Date 2024/5/29 20:06 + */ +@Data +@AllArgsConstructor +@NoArgsConstructor +@SuperBuilder +public class MqttProperties { + + private String broker; + private String topic ; + private String username; + private String password; + private String clientid; + + public static MqttProperties configBuild(String ip,String topic){ + return MqttProperties.builder() + .broker("tcp://"+ip+":1883") + .topic(topic) + .clientid("protocol-parsing") + .build(); + } + + public boolean isLong(){ + return !StringUtils.isBlank(this.username) && !StringUtils.isBlank(this.password); + } + + +} diff --git a/src/main/java/com/car/mqtt/MsgHandler.java b/src/main/java/com/car/mqtt/MsgHandler.java new file mode 100644 index 0000000..10ec7fc --- /dev/null +++ b/src/main/java/com/car/mqtt/MsgHandler.java @@ -0,0 +1,33 @@ +package com.car.mqtt; + +import lombok.extern.log4j.Log4j2; +import org.eclipse.paho.client.mqttv3.MqttClient; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +/** + * 消息处理器 MsgHandler + * + * @author Yangle + * Date 2024/5/29 20:44 + */ +@Log4j2 +@Component +public class MsgHandler { + + @Autowired + private MqttFactory mqttFactory; + @RabbitListener(queues = "create.topic") + public void msg(String msg){ + System.out.println("接收到消息:" + msg); + MqttProperties mqttProperties = MqttProperties.configBuild( + "47.102.133.88", + "mqtt/test" + ); + log.error("接收到消息初始化信息:{}",mqttProperties); + MqttClient mqttClient = mqttFactory.creatClient(mqttProperties); + log.error("client创建成功:{}",mqttClient.getClientId()); + + } +} diff --git a/src/main/java/com/car/mqtt/RabbitConfig.java b/src/main/java/com/car/mqtt/RabbitConfig.java new file mode 100644 index 0000000..780ddb2 --- /dev/null +++ b/src/main/java/com/car/mqtt/RabbitConfig.java @@ -0,0 +1,37 @@ +package com.car.mqtt; + +import org.springframework.amqp.core.Binding; +import org.springframework.amqp.core.BindingBuilder; +import org.springframework.amqp.core.DirectExchange; +import org.springframework.amqp.core.Queue; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * RabbitConfig + * + * @author Yangle + * Date 2024/5/29 20:44 + */ +@Configuration +public class RabbitConfig { + + + @Bean + public Queue autoDeleteQueue1() { + return new Queue("create.topic", true); + } + + @Bean + public DirectExchange directExchange() { + return new DirectExchange("topic.direct"); + } + + @Bean + public Binding binding(DirectExchange directExchange, + Queue autoDeleteQueue1 ) { + return BindingBuilder.bind(autoDeleteQueue1) + .to(directExchange) + .with("protocol-parsing"); + } +} \ No newline at end of file diff --git a/src/main/java/com/car/service/ConnectService.java b/src/main/java/com/car/service/ConnectService.java new file mode 100644 index 0000000..0fdf18c --- /dev/null +++ b/src/main/java/com/car/service/ConnectService.java @@ -0,0 +1,15 @@ +package com.car.service; + +import com.car.demos.MqttServerModel; +import com.car.demos.Result; + +/** + * 车辆连接业务层 ConnectImpl + * + * @author Yangle + * Date 2024/5/28 21:49 + */ +public interface ConnectService { + Result getConnect(); + +} diff --git a/src/main/java/com/car/service/impl/ConnectServiceImpl.java b/src/main/java/com/car/service/impl/ConnectServiceImpl.java new file mode 100644 index 0000000..31eabab --- /dev/null +++ b/src/main/java/com/car/service/impl/ConnectServiceImpl.java @@ -0,0 +1,57 @@ +package com.car.service.impl; + +import com.aliyun.ecs20140526.Client; +import com.aliyun.teaopenapi.models.Config; +import com.car.demos.MqttServerModel; +import com.car.demos.Result; + +import com.car.service.ConnectService; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.core.StringRedisTemplate; +import org.springframework.stereotype.Service; + +/** + * 业务实现层 ConnectImplImpl + * + * @author Yangle + * Date 2024/5/28 21:50 + */ +@Service +public class ConnectServiceImpl implements ConnectService { +// @Autowired +// private ConnerMapper connerMapper; + @Autowired + private StringRedisTemplate redisTemplate; + @Override + public Result getConnect() { + if (redisTemplate.hasKey("Redis")){ + + Integer count = Integer.valueOf(redisTemplate.opsForValue().get("Redis")); + if (count == 100){ + redisTemplate.opsForValue().set("Redis",String.valueOf(0)); + }else { + redisTemplate.opsForValue().set("Redis",String.valueOf(count + 1)); + } + + String ip = redisTemplate.opsForList().index("ips", count); + return Result.success(new MqttServerModel("tcp://"+ip+":1883","test1")); + }else { + redisTemplate.opsForValue().set("count",String.valueOf(1)); + String ip = redisTemplate.opsForList().index("ips", 0); + return Result.success(new MqttServerModel("tcp://"+ip+":1883","test1")); + } + } + + public static Client createClient() throws Exception { + // 工程代码泄露可能会导致 AccessKey 泄露,并威胁账号下所有资源的安全性。以下代码示例仅供参考。 + // 建议使用更安全的 STS 方式,更多鉴权访问方式请参见:https://help.aliyun.com/document_detail/378657.html。 + Config config = new Config() + // 必填,请确保代码运行环境设置了环境变量 ALIBABA_CLOUD_ACCESS_KEY_ID。 + .setAccessKeyId("LTAI5tFVx9F12e5f4EuJzyZj") + // 必填,请确保代码运行环境设置了环境变量 ALIBABA_CLOUD_ACCESS_KEY_SECRET。 + .setAccessKeySecret("mn06SdxTmdmCjmaEGBq95bVF6e3Sa9"); + // Endpoint 请参考 https://api.aliyun.com/product/Ecs + config.endpoint = "ecs.cn-shanghai.aliyuncs.com"; + return new Client(config); + } +} diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml new file mode 100644 index 0000000..65ce8e3 --- /dev/null +++ b/src/main/resources/application.yml @@ -0,0 +1,60 @@ +# 应用服务 WEB 访问端口 +server: + port: 8082 + +spring: + rabbitmq: + host: 115.159.211.196 + port: 5672 + username: guest + password: guest + virtualHost: / + mvc: + pathmatch: + matching-strategy: ant_path_matcher + datasource: + driver-class-name: com.mysql.cj.jdbc.Driver + url: jdbc:mysql://115.159.211.196:3306/data_basete?serverTimezone=UTC&useUnicode=true&characterEncoding=utf-8&allowMultiQueries=true&useSSL=false + username: root + password: yl@123 + druid: + # 下面为连接池的补充设置,应用到上面所有数据源中 + # 初始化大小,最小,最大 + initial-size: 5 + min-idle: 5 + max-active: 20 + # 配置获取连接等待超时的时间 + max-wait: 60000 + # 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒 + time-between-eviction-runs-millis: 60000 + # 配置一个连接在池中最小生存的时间,单位是毫秒 + min-evictable-idle-time-millis: 300000 + validation-query: SELECT 1 FROM DUAL + test-while-idle: true + test-on-borrow: false + test-on-return: false + # 打开PSCache,并且指定每个连接上PSCache的大小 + pool-prepared-statements: true + # 配置监控统计拦截的filters,去掉后监控界面sql无法统计,'wall'用于防火墙 + max-pool-prepared-statement-per-connection-size: 20 + filters: stat,wall + use-global-data-source-stat: true + # 通过connectProperties属性来打开mergeSql功能;慢SQL记录 + connect-properties: druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000 + application: + name: shop-server + redis: + host: 115.159.211.196 + port: 6379 + password: yl030509 + +## mybatis +#mybatis: +# configuration: +# map-underscore-to-camel-case: true +# log-impl: org.apache.ibatis.logging.stdout.StdOutImpl +# mapper-locations: classpath*:mapper/*Mapper.xml +# global-config: +# db-config: +# id-type: auto + diff --git a/src/main/resources/static/index.html b/src/main/resources/static/index.html new file mode 100644 index 0000000..e2d94a2 --- /dev/null +++ b/src/main/resources/static/index.html @@ -0,0 +1,6 @@ + + +

hello word!!!

+

this is a html page

+ + \ No newline at end of file diff --git a/src/test/java/com/car/ZnCarApplicationTests.java b/src/test/java/com/car/ZnCarApplicationTests.java new file mode 100644 index 0000000..32262ab --- /dev/null +++ b/src/test/java/com/car/ZnCarApplicationTests.java @@ -0,0 +1,13 @@ +//package com.car; +// +//import org.junit.jupiter.api.Test; +//import org.springframework.boot.test.context.SpringBootTest; +// +//@SpringBootTest +//class ZnCarApplicationTests { +// +// @Test +// void contextLoads() { +// } +// +//}