diff --git a/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/constants/RabbitConstants.java b/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/constants/RabbitConstants.java index 70c7620..6e390d4 100644 --- a/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/constants/RabbitConstants.java +++ b/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/constants/RabbitConstants.java @@ -2,6 +2,7 @@ package com.muyu.common.rabbit.constants; /** * rabbit常量 + * * @Author: 胡杨 * @date: 2024/7/10 * @Description: rabbit常量 @@ -9,7 +10,8 @@ package com.muyu.common.rabbit.constants; */ public class RabbitConstants { - public final static String GO_ONLINE_QUEUE= "GoOnline"; + public final static String GO_ONLINE_QUEUE = "GoOnline"; - public final static String DOWNLINE_QUEUE= "Downline"; + public final static String DOWNLINE_QUEUE = "Downline"; + public final static String FORM_QUEUE = "queue_inform_sms"; } diff --git a/cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/cargateway/Aliyun/AliYunConfig.java b/cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/car/gateway/Aliyun/AliYunConfig.java similarity index 93% rename from cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/cargateway/Aliyun/AliYunConfig.java rename to cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/car/gateway/Aliyun/AliYunConfig.java index 489a044..6848146 100644 --- a/cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/cargateway/Aliyun/AliYunConfig.java +++ b/cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/car/gateway/Aliyun/AliYunConfig.java @@ -1,8 +1,8 @@ -package com.muyu.cargateway.Aliyun; +package com.muyu.car.gateway.Aliyun; import com.aliyun.ecs20140526.Client; import com.aliyun.teaopenapi.models.Config; -import com.muyu.cargateway.config.AliProperties; +import com.muyu.car.gateway.config.AliProperties; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; diff --git a/cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/cargateway/instance/DeleteSample.java b/cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/car/gateway/Aliyun/instance/DeleteSample.java similarity index 86% rename from cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/cargateway/instance/DeleteSample.java rename to cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/car/gateway/Aliyun/instance/DeleteSample.java index d7cd031..2f00c00 100644 --- a/cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/cargateway/instance/DeleteSample.java +++ b/cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/car/gateway/Aliyun/instance/DeleteSample.java @@ -1,26 +1,27 @@ -package com.muyu.cargateway.instance; +package com.muyu.car.gateway.Aliyun.instance; -import com.muyu.cargateway.Aliyun.service.AliYunEcsService; +import com.muyu.car.gateway.Aliyun.service.AliYunEcsService; import lombok.extern.log4j.Log4j2; import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** + * @author Lenovo * @ Tool:IntelliJ IDEA * @ Author:CHX * @ Date:2024-10-07-21:51 * @ Version:1.0 * @ Description:删除实例方法 - * @author Lenovo */ @Log4j2 @Component public class DeleteSample implements DisposableBean { @Autowired private AliYunEcsService aliYunEcsService; + @Override - public void destroy() { + public void destroy() { try { log.info("===开始执行删除实例方法"); Thread.sleep(10000); diff --git a/cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/cargateway/instance/Sample.java b/cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/car/gateway/Aliyun/instance/Sample.java similarity index 70% rename from cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/cargateway/instance/Sample.java rename to cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/car/gateway/Aliyun/instance/Sample.java index ecf2176..fcb2222 100644 --- a/cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/cargateway/instance/Sample.java +++ b/cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/car/gateway/Aliyun/instance/Sample.java @@ -1,9 +1,8 @@ -package com.muyu.cargateway.instance; +package com.muyu.car.gateway.Aliyun.instance; -import com.muyu.cargateway.Aliyun.service.AliYunEcsService; -import com.muyu.cargateway.config.AliProperties; -import com.muyu.cargateway.domain.AliInstance; -import com.muyu.common.redis.service.RedisService; +import com.muyu.car.gateway.Aliyun.service.AliYunEcsService; +import com.muyu.car.gateway.config.AliProperties; +import com.muyu.car.gateway.domain.AliInstance; import lombok.extern.log4j.Log4j2; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.ApplicationArguments; @@ -22,14 +21,12 @@ import java.util.List; */ @Log4j2 @Component -public class Sample implements ApplicationRunner{ +public class Sample implements ApplicationRunner { @Autowired private AliYunEcsService aliYunEcsService; @Autowired private AliProperties aliProperties; - @Autowired - private RedisService redisService; @Override public void run(ApplicationArguments args) throws Exception { @@ -42,17 +39,14 @@ public class Sample implements ApplicationRunner{ throw new RuntimeException(e); } log.info("创建实例成功"); - redisService.setCacheList("instanceIds", list); +// redisService.setCacheList("instanceIds", list); try { - Thread.sleep(6000); + Thread.sleep(9000); } catch (InterruptedException e) { throw new RuntimeException(e); } List aliInstances = aliYunEcsService.selectInstance(list); - log.info("查询实例信息成功:{}",aliInstances); - // 将查询到的实例信息列表存储到Redis中 - redisService.setCacheList("instanceList", aliInstances); - log.info("redis存储成功:{}", aliInstances); + log.info("================查询实例信息成功:{}", aliInstances); } // @Override diff --git a/cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/cargateway/Aliyun/service/AliYunEcsService.java b/cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/car/gateway/Aliyun/service/AliYunEcsService.java similarity index 90% rename from cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/cargateway/Aliyun/service/AliYunEcsService.java rename to cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/car/gateway/Aliyun/service/AliYunEcsService.java index 753c3e3..8d704a4 100644 --- a/cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/cargateway/Aliyun/service/AliYunEcsService.java +++ b/cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/car/gateway/Aliyun/service/AliYunEcsService.java @@ -1,11 +1,11 @@ -package com.muyu.cargateway.Aliyun.service; +package com.muyu.car.gateway.Aliyun.service; import com.aliyun.ecs20140526.Client; import com.aliyun.ecs20140526.models.*; import com.aliyun.tea.TeaException; import com.aliyun.teautil.models.RuntimeOptions; -import com.muyu.cargateway.config.AliProperties; -import com.muyu.cargateway.domain.AliInstance; +import com.muyu.car.gateway.domain.AliInstance; +import com.muyu.car.gateway.config.AliProperties; import com.muyu.common.core.exception.ServiceException; import com.muyu.common.redis.service.RedisService; import lombok.extern.log4j.Log4j2; @@ -49,8 +49,9 @@ public class AliYunEcsService { * @return 实例id集合 */ public List generateInstance(Integer amount) { - redisService.deleteObject("instanceIds"); - redisService.deleteObject("instanceList"); + redisService.deleteObject("oneIpList"); + redisService.deleteObject("oneCount"); + redisService.deleteObject("oneVinIp"); // 检查生成实例的数量是否有效 if (amount == null || amount <= 0) { throw new ServiceException("生成数量不能小于1"); @@ -125,6 +126,7 @@ public class AliYunEcsService { // 创建运行时选项对象,用于配置请求的额外参数 RuntimeOptions runtimeOptions = new RuntimeOptions(); List aliInstances = new ArrayList<>(); + List stringArrayList = new ArrayList<>(); try { // 发送请求并获取响应对象 DescribeInstancesResponse describeInstancesResponse = client.describeInstancesWithOptions(request, runtimeOptions); @@ -136,16 +138,25 @@ public class AliYunEcsService { for (DescribeInstancesResponseBody.DescribeInstancesResponseBodyInstancesInstance bodyInstance : instance) { // 实例id String instanceId = bodyInstance.getInstanceId(); - log.info("实例id为:{}", instanceId); // ip地址 String ipAddress = bodyInstance.getPublicIpAddress().getIpAddress().get(0); - log.info("实例ip为:{}", ipAddress); // 实例状态 String status = bodyInstance.getStatus(); - log.info("实例状态为:{}", status); + + log.info("=======================实例id为:{}", instanceId); + log.info("=======================实例ip为:{}", ipAddress); + log.info("=======================实例状态为:{}", status); + + + stringArrayList.add(ipAddress); AliInstance aliInstance = new AliInstance(instanceId, ipAddress, status); aliInstances.add(aliInstance); + redisService.setCacheList(instanceId, aliInstances); + aliInstances.remove(aliInstance); + } + log.info("======================ipList:{}", stringArrayList); + redisService.setCacheList("oneIpList", stringArrayList); log.info("查询成功"); } catch (Exception e) { log.error("查询服务器实例错误:[{}]", e.getMessage(), e); diff --git a/cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/cargateway/CloudVehicleGatewayApplication.java b/cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/car/gateway/CarGatewayApplication.java similarity index 78% rename from cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/cargateway/CloudVehicleGatewayApplication.java rename to cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/car/gateway/CarGatewayApplication.java index 840f0ce..a7a7431 100644 --- a/cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/cargateway/CloudVehicleGatewayApplication.java +++ b/cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/car/gateway/CarGatewayApplication.java @@ -1,4 +1,4 @@ -package com.muyu.cargateway; +package com.muyu.car.gateway; import com.muyu.common.security.annotation.EnableCustomConfig; import lombok.extern.log4j.Log4j2; @@ -7,19 +7,19 @@ import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.openfeign.EnableFeignClients; /** + * @author Lenovo * @ Tool:IntelliJ IDEA * @ Author:CHX * @ Date:2024-09-17-15:00 * @ Version:1.0 * @ Description:故障启动类 - * @author Lenovo */ @Log4j2 @EnableCustomConfig @EnableFeignClients @SpringBootApplication -public class CloudVehicleGatewayApplication { +public class CarGatewayApplication { public static void main(String[] args) { - SpringApplication.run(CloudVehicleGatewayApplication.class, args); + SpringApplication.run(CarGatewayApplication.class, args); } } diff --git a/cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/cargateway/config/AliProperties.java b/cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/car/gateway/config/AliProperties.java similarity index 96% rename from cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/cargateway/config/AliProperties.java rename to cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/car/gateway/config/AliProperties.java index c0ff543..b1148a9 100644 --- a/cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/cargateway/config/AliProperties.java +++ b/cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/car/gateway/config/AliProperties.java @@ -1,4 +1,4 @@ -package com.muyu.cargateway.config; +package com.muyu.car.gateway.config; import lombok.Data; import org.springframework.boot.context.properties.ConfigurationProperties; diff --git a/cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/car/gateway/config/RabbitmqConfig.java b/cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/car/gateway/config/RabbitmqConfig.java new file mode 100644 index 0000000..b05e847 --- /dev/null +++ b/cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/car/gateway/config/RabbitmqConfig.java @@ -0,0 +1,189 @@ +package com.muyu.car.gateway.config; + +import lombok.extern.log4j.Log4j2; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.amqp.core.*; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * @author Lenovo + * @ Tool:IntelliJ IDEA + * @ Author:CHX + * @ Date:2024-10-04-15:13 + * @ Version:1.0 + * @ Description:rabbitmq配置类 + */ +@Log4j2 +@Configuration +public class RabbitmqConfig { + /** + * 日志 + */ + private static final Logger logger = LoggerFactory.getLogger(RabbitmqConfig.class); + + /** + * 交换机 + */ + public static final String EXCHANGE_TOPICS_INFORM = "exchange_topics_inform"; + + /** + * 队列 车辆上线给事件系统发送vin + */ + public static final String QUEUE_INFORM_EMAIL = "queue_inform_email"; + /** + * 队列 协议解析 + */ + public static final String QUEUE_INFORM_SMS = "queue_inform_sms"; + /** + * 队列 车辆下线给事件系统发送vin + */ + public static final String QUEUE_INFORM_SEND = "queue_inform_send"; + /** + * 队列 saas系统 + */ + public static final String QUEUE_INFORM_SAAS = "queue_inform_saas"; + /** + * 路由key 车辆上线给事件系统 + */ + public static final String ROUTINGKEY_EMAIL = "inform.#.email.#"; + /** + * 路由key 协议解析 + */ + public static final String ROUTINGKEY_SMS = "inform.#.sms.#"; + /** + * 路由key 车辆下线给事件系统 + */ + public static final String ROUTINGKEY_SEND = "inform.#.send.#"; + /** + * 路由key saas系统 + */ + public static final String ROUTINGKEY_SAAS = "inform.#.saas.#"; + + + /** + * 声明交换机,做持久化 + */ + @Bean(EXCHANGE_TOPICS_INFORM) + public Exchange exchangeTopicsInform() { + try { + Exchange exchange = ExchangeBuilder.topicExchange(EXCHANGE_TOPICS_INFORM).durable(true).build(); + log.info("创建的交换机为: {}", EXCHANGE_TOPICS_INFORM); + return exchange; + } catch (Exception e) { + log.error("创建该: {} 交换机失败", EXCHANGE_TOPICS_INFORM, e); + throw e; + } + } + + /** + * 声明QUEUE_INFORM_EMAIL 队列 + */ + @Bean(QUEUE_INFORM_EMAIL) + public Queue queueInformEmail() { + try { + Queue queue = new Queue(QUEUE_INFORM_EMAIL); + log.info("创建的队列为: {}", QUEUE_INFORM_EMAIL); + return queue; + } catch (Exception e) { + log.error("创建该: {} 队列失败", QUEUE_INFORM_EMAIL, e); + throw e; + } + } + + /** + * 声明QUEUE_INFORM_SMS 队列 + */ + @Bean(QUEUE_INFORM_SMS) + public Queue queueInformSms() { + try { + Queue queue = new Queue(QUEUE_INFORM_SMS); + log.info("创建的队列为: {}", QUEUE_INFORM_SMS); + return queue; + } catch (Exception e) { + log.error("创建该: {} 队列失败", QUEUE_INFORM_SMS, e); + throw e; + } + } + + /** + * QUEUE_INFORM_SEND 队列 + */ + @Bean(QUEUE_INFORM_SEND) + public Queue queueInformSend() { + try { + Queue queue = new Queue(QUEUE_INFORM_SEND); + log.info("创建的队列为: {}", QUEUE_INFORM_SEND); + return queue; + } catch (Exception e) { + log.error("创建该: {} 队列失败", QUEUE_INFORM_SEND, e); + throw e; + } + } + + /** + * QUEUE_INFORM_SAAS 队列 + */ + @Bean(QUEUE_INFORM_SAAS) + public Queue queueInformSaas() { + try { + Queue queue = new Queue(QUEUE_INFORM_SAAS); + log.info("创建的队列为: {}", QUEUE_INFORM_SAAS); + return queue; + } catch (Exception e) { + log.error("创建该: {} 队列失败", QUEUE_INFORM_SAAS, e); + throw e; + } + } + + + /** + * QUEUE_INFORM_EMAIL队列绑定交换机,指定routingKey ROUTINGKEY_EMAIL + * + * @param queue QUEUE_INFORM_EMAIL + * @param exchange EXCHANGE_TOPICS_INFORM + */ + @Bean + public Binding bindingQueueInformEmail(@Qualifier(QUEUE_INFORM_EMAIL) Queue queue, + @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange) { + return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_EMAIL).noargs(); + } + + /** + * QUEUE_INFORM_SMS 队列绑定交换机,指定routingKey ROUTINGKEY_SMS + * + * @param queue QUEUE_INFORM_SMS + * @param exchange EXCHANGE_TOPICS_INFORM + */ + @Bean + public Binding bindingRoutingKeySms(@Qualifier(QUEUE_INFORM_SMS) Queue queue, + @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange) { + return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_SMS).noargs(); + } + + /** + * QUEUE_INFORM_SEND队列绑定交换机,指定routingKey ROUTINGKEY_SEND + * + * @param queue QUEUE_INFORM_SEND + * @param exchange EXCHANGE_TOPICS_INFORM + */ + @Bean + public Binding bindingRoutingKeySend(@Qualifier(QUEUE_INFORM_SEND) Queue queue, + @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange) { + return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_SEND).noargs(); + } + + /** + * QUEUE_INFORM_SAAS队列绑定交换机,指定routingKey ROUTINGKEY_SAAS + * + * @param queue QUEUE_INFORM_SAAS + * @param exchange EXCHANGE_TOPICS_INFORM + */ + @Bean + public Binding bindingRoutingKeySaas(@Qualifier(QUEUE_INFORM_SAAS) Queue queue, + @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange) { + return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_SAAS).noargs(); + } +} diff --git a/cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/cargateway/controller/CarOneClickOperationController.java b/cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/car/gateway/controller/CarOneClickOperationController.java similarity index 68% rename from cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/cargateway/controller/CarOneClickOperationController.java rename to cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/car/gateway/controller/CarOneClickOperationController.java index 8173dc7..b954970 100644 --- a/cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/cargateway/controller/CarOneClickOperationController.java +++ b/cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/car/gateway/controller/CarOneClickOperationController.java @@ -1,8 +1,8 @@ -package com.muyu.cargateway.controller; +package com.muyu.car.gateway.controller; -import com.muyu.cargateway.domain.model.MqttServerModel; -import com.muyu.cargateway.domain.req.VehicleConnectionReq; -import com.muyu.cargateway.service.CarOneClickOperationService; +import com.muyu.car.gateway.domain.req.VehicleConnectionReq; +import com.muyu.car.gateway.service.CarOneClickOperationService; +import com.muyu.car.gateway.domain.model.MqttServerModel; import com.muyu.common.core.domain.Result; import io.swagger.v3.oas.annotations.tags.Tag; import lombok.extern.log4j.Log4j2; @@ -13,30 +13,31 @@ import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; /** + * @author Lenovo * @ Tool:IntelliJ IDEA * @ Author:CHX * @ Date:2024-09-26-20:14 * @ Version:1.0 * @ Description:车辆 - * @author Lenovo */ @Log4j2 @RestController @RequestMapping("/vehicleGateway") @Tag(name = "连接车辆控制层") public class CarOneClickOperationController { - @Autowired + @Autowired private CarOneClickOperationService carOneClickOperationService; /** * 获取http连接的参数 + * * @param vehicleConnectionReq * @return */ @PostMapping("/receiveMsg/connect") - public Result receiveMsg(@RequestBody VehicleConnectionReq vehicleConnectionReq){ - log.info(">"+vehicleConnectionReq); - MqttServerModel mqttServerModel =carOneClickOperationService.getConnect(vehicleConnectionReq); - return Result.success(mqttServerModel); + public Result receiveMsg(@RequestBody VehicleConnectionReq vehicleConnectionReq) { + log.info("======================" + vehicleConnectionReq); + return carOneClickOperationService.getConnect(vehicleConnectionReq); + } } diff --git a/cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/cargateway/domain/AliInstance.java b/cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/car/gateway/domain/AliInstance.java similarity index 90% rename from cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/cargateway/domain/AliInstance.java rename to cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/car/gateway/domain/AliInstance.java index eb1b4ae..e63f28f 100644 --- a/cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/cargateway/domain/AliInstance.java +++ b/cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/car/gateway/domain/AliInstance.java @@ -1,4 +1,4 @@ -package com.muyu.cargateway.domain; +package com.muyu.car.gateway.domain; import lombok.AllArgsConstructor; import lombok.Builder; @@ -6,12 +6,12 @@ import lombok.Data; import lombok.NoArgsConstructor; /** + * @author Lenovo * @ Tool:IntelliJ IDEA * @ Author:CHX * @ Date:2024-09-30-9:33 * @ Version:1.0 * @ Description: - * @author Lenovo */ @Data @Builder @@ -19,7 +19,7 @@ import lombok.NoArgsConstructor; @NoArgsConstructor public class AliInstance { /** - *实例ID + * 实例ID */ private String instanceId; /** diff --git a/cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/cargateway/domain/AliServerConfig.java b/cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/car/gateway/domain/AliServerConfig.java similarity index 95% rename from cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/cargateway/domain/AliServerConfig.java rename to cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/car/gateway/domain/AliServerConfig.java index b6d545a..2ab8ae6 100644 --- a/cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/cargateway/domain/AliServerConfig.java +++ b/cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/car/gateway/domain/AliServerConfig.java @@ -1,16 +1,16 @@ -package com.muyu.cargateway.domain; +package com.muyu.car.gateway.domain; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; /** + * @author Lenovo * @ Tool:IntelliJ IDEA * @ Author:CHX * @ Date:2024-09-28-16:37 * @ Version:1.0 * @ Description:创建实例的配置 - * @author Lenovo */ @Data @AllArgsConstructor diff --git a/cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/cargateway/domain/ConnectWeight.java b/cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/car/gateway/domain/ConnectWeight.java similarity index 92% rename from cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/cargateway/domain/ConnectWeight.java rename to cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/car/gateway/domain/ConnectWeight.java index efd0076..0268317 100644 --- a/cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/cargateway/domain/ConnectWeight.java +++ b/cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/car/gateway/domain/ConnectWeight.java @@ -1,16 +1,16 @@ -package com.muyu.cargateway.domain; +package com.muyu.car.gateway.domain; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; /** + * @author Lenovo * @ Tool:IntelliJ IDEA * @ Author:CHX * @ Date:2024-10-04-15:16 * @ Version:1.0 * @ Description:车辆服务器 - * @author Lenovo */ @Data @AllArgsConstructor diff --git a/cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/cargateway/domain/ServerConfig.java b/cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/car/gateway/domain/ServerConfig.java similarity index 89% rename from cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/cargateway/domain/ServerConfig.java rename to cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/car/gateway/domain/ServerConfig.java index 6628de0..0825df7 100644 --- a/cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/cargateway/domain/ServerConfig.java +++ b/cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/car/gateway/domain/ServerConfig.java @@ -1,4 +1,4 @@ -package com.muyu.cargateway.domain; +package com.muyu.car.gateway.domain; import com.baomidou.mybatisplus.annotation.IdType; import com.baomidou.mybatisplus.annotation.TableId; @@ -9,23 +9,23 @@ import lombok.Data; import lombok.NoArgsConstructor; /** + * @author Lenovo * @ Tool:IntelliJ IDEA * @ Author:CHX * @ Date:2024-09-27-20:56 * @ Version:1.0 * @ Description:服务器配置 - * @author Lenovo */ @Data @Builder @NoArgsConstructor @AllArgsConstructor -@TableName(value="server_config") +@TableName(value = "server_config") public class ServerConfig { /** * 主键 */ - @TableId(value = "id",type = IdType.AUTO) + @TableId(value = "id", type = IdType.AUTO) private Long id; /** * 租户id diff --git a/cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/cargateway/domain/VehicleConnection.java b/cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/car/gateway/domain/VehicleConnection.java similarity index 94% rename from cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/cargateway/domain/VehicleConnection.java rename to cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/car/gateway/domain/VehicleConnection.java index 819d48e..7a88961 100644 --- a/cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/cargateway/domain/VehicleConnection.java +++ b/cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/car/gateway/domain/VehicleConnection.java @@ -1,16 +1,16 @@ -package com.muyu.cargateway.domain; +package com.muyu.car.gateway.domain; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; /** + * @author Lenovo * @ Tool:IntelliJ IDEA * @ Author:CHX * @ Date:2024-10-06-11:05 * @ Version:1.0 * @ Description:车辆鉴权的参数 - * @author Lenovo */ @Data @AllArgsConstructor diff --git a/cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/cargateway/domain/VinIp.java b/cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/car/gateway/domain/VinIp.java similarity index 82% rename from cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/cargateway/domain/VinIp.java rename to cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/car/gateway/domain/VinIp.java index b6787f9..d8b9751 100644 --- a/cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/cargateway/domain/VinIp.java +++ b/cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/car/gateway/domain/VinIp.java @@ -1,16 +1,16 @@ -package com.muyu.cargateway.domain; +package com.muyu.car.gateway.domain; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; /** + * @author Lenovo * @ Tool:IntelliJ IDEA * @ Author:CHX * @ Date:2024-10-03-10:10 * @ Version:1.0 * @ Description:车辆vin - * @author Lenovo */ @Data @AllArgsConstructor @@ -19,9 +19,9 @@ public class VinIp { /** * 车辆的vin */ - String vehicleVin; + String vin; /** * 车辆的ip */ - String ipAddress; + String ip; } diff --git a/cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/cargateway/domain/model/MqttServerModel.java b/cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/car/gateway/domain/model/MqttServerModel.java similarity index 91% rename from cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/cargateway/domain/model/MqttServerModel.java rename to cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/car/gateway/domain/model/MqttServerModel.java index 2f29353..39a74d5 100644 --- a/cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/cargateway/domain/model/MqttServerModel.java +++ b/cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/car/gateway/domain/model/MqttServerModel.java @@ -1,4 +1,4 @@ -package com.muyu.cargateway.domain.model; +package com.muyu.car.gateway.domain.model; import lombok.AllArgsConstructor; import lombok.Builder; @@ -6,12 +6,12 @@ import lombok.Data; import lombok.NoArgsConstructor; /** + * @author Lenovo * @ Tool:IntelliJ IDEA * @ Author:CHX * @ Date:2024-10-03-10:12 * @ Version:1.0 * @ Description:Mqtt服务模型 - * @author Lenovo */ @Data @Builder @@ -26,4 +26,6 @@ public class MqttServerModel { * MQTT订阅主题 */ private String topic; + + } diff --git a/cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/cargateway/domain/properties/MqttProperties.java b/cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/car/gateway/domain/properties/MqttProperties.java similarity index 93% rename from cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/cargateway/domain/properties/MqttProperties.java rename to cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/car/gateway/domain/properties/MqttProperties.java index f848275..7af69a7 100644 --- a/cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/cargateway/domain/properties/MqttProperties.java +++ b/cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/car/gateway/domain/properties/MqttProperties.java @@ -1,4 +1,4 @@ -package com.muyu.cargateway.domain.properties; +package com.muyu.car.gateway.domain.properties; import lombok.AllArgsConstructor; import lombok.Builder; diff --git a/cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/cargateway/domain/req/VehicleConnectionReq.java b/cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/car/gateway/domain/req/VehicleConnectionReq.java similarity index 93% rename from cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/cargateway/domain/req/VehicleConnectionReq.java rename to cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/car/gateway/domain/req/VehicleConnectionReq.java index 3101fac..31b2bf7 100644 --- a/cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/cargateway/domain/req/VehicleConnectionReq.java +++ b/cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/car/gateway/domain/req/VehicleConnectionReq.java @@ -1,4 +1,4 @@ -package com.muyu.cargateway.domain.req; +package com.muyu.car.gateway.domain.req; import lombok.AllArgsConstructor; import lombok.Builder; @@ -6,12 +6,12 @@ import lombok.Data; import lombok.NoArgsConstructor; /** + * @author Lenovo * @ Tool:IntelliJ IDEA * @ Author:CHX * @ Date:2024-10-03-10:04 * @ Version:1.0 * @ Description:车辆获取连接地址 - * @author Lenovo */ @Data @Builder diff --git a/cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/cargateway/domain/resp/AliServerConfig.java b/cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/car/gateway/domain/resp/AliServerConfig.java similarity index 87% rename from cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/cargateway/domain/resp/AliServerConfig.java rename to cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/car/gateway/domain/resp/AliServerConfig.java index ded0cc8..8c856af 100644 --- a/cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/cargateway/domain/resp/AliServerConfig.java +++ b/cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/car/gateway/domain/resp/AliServerConfig.java @@ -1,4 +1,4 @@ -package com.muyu.cargateway.domain.resp; +package com.muyu.car.gateway.domain.resp; import lombok.AllArgsConstructor; import lombok.Data; @@ -6,12 +6,12 @@ import lombok.NoArgsConstructor; import lombok.experimental.SuperBuilder; /** + * @author Lenovo * @ Tool:IntelliJ IDEA * @ Author:CHX * @ Date:2024-09-26-15:53 * @ Version:1.0 * @ Description:调用Ali服务器配置实体类 - * @author Lenovo */ @Data @SuperBuilder @@ -21,7 +21,7 @@ public class AliServerConfig { /** * 主键 */ - private Long id; + private Long id; /** * 地域id (实例所属的地域ID) */ @@ -29,7 +29,7 @@ public class AliServerConfig { /** * 镜像id */ - private String imageId; + private String imageId; /** * 实例规格 (实例的资源规格) */ diff --git a/cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/cargateway/mapper/CarOneClickOperationMapper.java b/cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/car/gateway/mapper/CarOneClickOperationMapper.java similarity index 71% rename from cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/cargateway/mapper/CarOneClickOperationMapper.java rename to cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/car/gateway/mapper/CarOneClickOperationMapper.java index 3fc31fa..b005964 100644 --- a/cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/cargateway/mapper/CarOneClickOperationMapper.java +++ b/cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/car/gateway/mapper/CarOneClickOperationMapper.java @@ -1,21 +1,26 @@ -package com.muyu.cargateway.mapper; +package com.muyu.car.gateway.mapper; -import com.muyu.cargateway.domain.VehicleConnection; +import com.muyu.car.gateway.domain.VehicleConnection; import org.apache.ibatis.annotations.Mapper; import java.util.List; /** + * @author Lenovo * @ Tool:IntelliJ IDEA * @ Author:CHX * @ Date:2024-09-26-20:15 * @ Version:1.0 * @ Description:车辆一键操作持久层 - * @author Lenovo */ @Mapper public interface CarOneClickOperationMapper { void addConnect(VehicleConnection vehicleConnection); List selectByVehicleVin(String vehicleVin); + + + List getMqttServerModel(String vehicleVin); + + } diff --git a/cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/cargateway/service/CarOneClickOperationService.java b/cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/car/gateway/service/CarOneClickOperationService.java similarity index 53% rename from cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/cargateway/service/CarOneClickOperationService.java rename to cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/car/gateway/service/CarOneClickOperationService.java index 71c639f..140b8fa 100644 --- a/cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/cargateway/service/CarOneClickOperationService.java +++ b/cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/car/gateway/service/CarOneClickOperationService.java @@ -1,22 +1,24 @@ -package com.muyu.cargateway.service; +package com.muyu.car.gateway.service; -import com.muyu.cargateway.domain.model.MqttServerModel; -import com.muyu.cargateway.domain.req.VehicleConnectionReq; +import com.muyu.car.gateway.domain.model.MqttServerModel; +import com.muyu.car.gateway.domain.req.VehicleConnectionReq; +import com.muyu.common.core.domain.Result; /** + * @author Lenovo * @ Tool:IntelliJ IDEA * @ Author:CHX * @ Date:2024-09-26-20:15 * @ Version:1.0 * @ Description:车辆一键操作业务层 - * @author Lenovo */ public interface CarOneClickOperationService { /** * 获取连接 + * * @param vehicleConnectionReq 车辆连接请求参数 * @return */ - MqttServerModel getConnect(VehicleConnectionReq vehicleConnectionReq); + Result getConnect(VehicleConnectionReq vehicleConnectionReq); } diff --git a/cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/car/gateway/service/Impl/CarOneClickOperationServiceImpl.java b/cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/car/gateway/service/Impl/CarOneClickOperationServiceImpl.java new file mode 100644 index 0000000..8c279e5 --- /dev/null +++ b/cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/car/gateway/service/Impl/CarOneClickOperationServiceImpl.java @@ -0,0 +1,144 @@ +package com.muyu.car.gateway.service.Impl; + +import com.muyu.car.gateway.domain.VehicleConnection; +import com.muyu.car.gateway.domain.VinIp; +import com.muyu.car.gateway.domain.model.MqttServerModel; +import com.muyu.car.gateway.domain.properties.MqttProperties; +import com.muyu.car.gateway.domain.req.VehicleConnectionReq; +import com.muyu.car.gateway.mapper.CarOneClickOperationMapper; +import com.muyu.car.gateway.service.CarOneClickOperationService; +import com.muyu.common.core.domain.Result; +import com.muyu.common.redis.service.RedisService; +import lombok.extern.log4j.Log4j2; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.core.StringRedisTemplate; +import org.springframework.stereotype.Service; + +import java.util.List; + +import static com.muyu.car.gateway.config.RabbitmqConfig.EXCHANGE_TOPICS_INFORM; +import static com.muyu.car.gateway.config.RabbitmqConfig.ROUTINGKEY_SMS; + +/** + * @author Lenovo + * @ Tool:IntelliJ IDEA + * @ Author:CHX + * @ Date:2024-09-26-20:16 + * @ Version:1.0 + * @ Description:车辆一键操作业务实现层 + */ +@Log4j2 +@Service +public class CarOneClickOperationServiceImpl implements CarOneClickOperationService { + + @Autowired + private CarOneClickOperationMapper carOneClickOperationMapper; + @Autowired + private RabbitTemplate rabbitTemplate; + @Autowired + private RedisService redisService; + @Autowired + private StringRedisTemplate redisTemplate; + + /** + * 获取连接信息 + * + * @param vehicleConnectionReq 车辆连接请求参数 + * @return + */ + @Override + public Result getConnect(VehicleConnectionReq vehicleConnectionReq) { + log.info("车辆连接请求:{}", vehicleConnectionReq.toString()); + + VehicleConnection vehicleConnection = new VehicleConnection(); + //车辆vin + vehicleConnection.setVehicleVin(vehicleConnectionReq.getVehicleVin()); + //用户名 + vehicleConnection.setUsername(vehicleConnectionReq.getUsername()); + //密码(vin+时间戳+随机数) + vehicleConnection.setPassword(vehicleConnectionReq.getVehicleVin() + vehicleConnectionReq.getTimestamp() + vehicleConnectionReq.getNonce()); + //查询有没有这辆车的vin码 + List selectVehicle = carOneClickOperationMapper.selectByVehicleVin(vehicleConnectionReq.getVehicleVin()); + + if (selectVehicle.isEmpty()) { + //添加连接信息 + carOneClickOperationMapper.addConnect(vehicleConnection); + log.info("车辆上线成功"); + } else { + throw new RuntimeException("车辆无法重复上线"); + + } + //先判断vin码 + if (redisService.hasKey(vehicleConnectionReq.getVehicleVin())) { + log.error("=============车辆:{}已经绑定过了", vehicleConnectionReq.getVehicleVin()); + throw new RuntimeException("=============车辆已经绑定过了"); + } + MqttProperties mqttProperties = new MqttProperties(); + List vehicleVin = selectByVehicleVin(vehicleConnectionReq.getVehicleVin()); + for (VehicleConnection connection : vehicleVin) { + mqttProperties.setClientId(connection.getVehicleVin()); + mqttProperties.setUserName(connection.getUsername()); + mqttProperties.setPassword(connection.getPassword()); + } + mqttProperties.setTopic("vehicle"); + mqttProperties.setQos(0); + //判断redis有没有count键 + if (redisTemplate.hasKey("oneCount")) { + //取出count + Integer count = Integer.valueOf(redisTemplate.opsForValue().get("oneCount")); + if (count == 1) { + redisTemplate.opsForValue().set("oneCount", String.valueOf(0)); + } else { + redisTemplate.opsForValue().set("oneCount", String.valueOf(count + 1)); + } + //根据游标count获取服务IP +// String ip = redisTemplate.opsForList().index("ipList", count); + Object ipList = redisService.redisTemplate.opsForList().index("oneIpList", count); + + log.info("=========================oneIpList:" + ipList); + //关联车辆和服务 + this.addIpAddress(new VinIp(vehicleConnectionReq.getVehicleVin(), ipList.toString())); + //响应信息 + log.info("车辆:{}", vehicleConnectionReq.getVehicleVin() + "绑定成功:{}", ipList); + mqttProperties.setBroker("tcp://" + ipList + ":1883"); + // 使用交换机发送消息 + rabbitTemplate.convertAndSend(EXCHANGE_TOPICS_INFORM, ROUTINGKEY_SMS, mqttProperties); + log.info("============================发送消息成功:{}", mqttProperties); + return Result.success(new MqttServerModel("tcp://" + ipList + ":1883", "vehicle")); + } else { + redisTemplate.opsForValue().set("oneCount", String.valueOf(0)); + //根据游标count获取服务器Ip + Object ipList = redisService.redisTemplate.opsForList().index("oneIpList", 0); + //关联车辆和服务 + this.addIpAddress(new VinIp(vehicleConnectionReq.getVehicleVin(), ipList.toString())); + //响应信息 + log.info("车辆:{}", vehicleConnectionReq.getVehicleVin(), "与:{}绑定成功", ipList); + mqttProperties.setBroker("tcp://" + ipList + ":1883"); + // 使用交换机发送消息 + rabbitTemplate.convertAndSend(EXCHANGE_TOPICS_INFORM, ROUTINGKEY_SMS, mqttProperties); + log.info("============================发送消息成功:{}", mqttProperties); + return Result.success(new MqttServerModel("tcp://" + ipList + ":1883", "vehicle")); + } + } + + /** + * 添加车辆绑定IP地址存入redis中 + */ + public void addIpAddress(VinIp vinIp) { + if (vinIp == null || vinIp.getVin() == null || vinIp.getVin().isEmpty() || vinIp.getIp() == null || vinIp.getIp().isEmpty()) { + throw new IllegalArgumentException("vin 或 ip 不能为空或无效"); + } + redisService.setCacheObject(vinIp.getVin(), vinIp.getIp()); + } + + /** + * 查询车辆绑定的服务器信息 + * + * @param vehicleVin 车辆vin码集合 + * @return + */ + public List selectByVehicleVin(String vehicleVin) { + return carOneClickOperationMapper.getMqttServerModel(vehicleVin); + } +} diff --git a/cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/cargateway/domain/HttpStatus.java b/cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/cargateway/domain/HttpStatus.java deleted file mode 100644 index 8665adb..0000000 --- a/cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/cargateway/domain/HttpStatus.java +++ /dev/null @@ -1,93 +0,0 @@ -package com.muyu.cargateway.domain; - -/** - * 返回状态码 - * - * @author ruoyi - */ -public class HttpStatus { - /** - * 操作成功 - */ - public static final int SUCCESS = 200; - - /** - * 对象创建成功 - */ - public static final int CREATED = 201; - - /** - * 请求已经被接受 - */ - public static final int ACCEPTED = 202; - - /** - * 操作已经执行成功,但是没有返回数据 - */ - public static final int NO_CONTENT = 204; - - /** - * 资源已被移除 - */ - public static final int MOVED_PERM = 301; - - /** - * 重定向 - */ - public static final int SEE_OTHER = 303; - - /** - * 资源没有被修改 - */ - public static final int NOT_MODIFIED = 304; - - /** - * 参数列表错误(缺少,格式不匹配) - */ - public static final int BAD_REQUEST = 400; - - /** - * 未授权 - */ - public static final int UNAUTHORIZED = 401; - - /** - * 访问受限,授权过期 - */ - public static final int FORBIDDEN = 403; - - /** - * 资源,服务未找到 - */ - public static final int NOT_FOUND = 404; - - /** - * 不允许的http方法 - */ - public static final int BAD_METHOD = 405; - - /** - * 资源冲突,或者资源被锁 - */ - public static final int CONFLICT = 409; - - /** - * 不支持的数据,媒体类型 - */ - public static final int UNSUPPORTED_TYPE = 415; - - /** - * 系统内部错误 - */ - public static final int ERROR = 500; - - /** - * 接口未实现 - */ - public static final int NOT_IMPLEMENTED = 501; - - /** - * 系统警告消息 - */ - public static final int WARN = 601; -} diff --git a/cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/cargateway/domain/model/TaskModel.java b/cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/cargateway/domain/model/TaskModel.java deleted file mode 100644 index f4ea58c..0000000 --- a/cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/cargateway/domain/model/TaskModel.java +++ /dev/null @@ -1,87 +0,0 @@ -package com.muyu.cargateway.domain.model; - -import lombok.AllArgsConstructor; -import lombok.Builder; -import lombok.Data; -import lombok.NoArgsConstructor; -import lombok.extern.log4j.Log4j2; - -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.LinkedBlockingDeque; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; - -/** - * @ Tool:IntelliJ IDEA - * @ Author:CHX - * @ Date:2024-09-26-20:23 - * @ Version:1.0 - * @ Description:任务执行模型 - * @author Lenovo - */ -@Data -@Log4j2 -@Builder -@AllArgsConstructor -@NoArgsConstructor -public class TaskModel { - /** - * 任务状态 默认为false状态 - * true为执行中,false为未执行 - */ - private final AtomicBoolean status =new AtomicBoolean(Boolean.FALSE); - /** - * 堵塞计数器 - */ - private CountDownLatch countDownLatch; - /** - * 任务执行堵塞队列 - */ - private LinkedBlockingDeque carQueue =new LinkedBlockingDeque<>(); - /** - * 任务是否执行 - * true 执行中 - * false 未执行 - * @return 是否有任务执行 - */ - private boolean isExecution(){ - return !status.get(); - } - /** - * 任务名称 - */ - private String taskName; - /** - * 任务执行次数 - */ - private Integer taskExecutionCount=0; - /** - * 任务开始时间 - */ - private Long taskStartTime; - /** - * 任务成功执行次数 - */ - private AtomicInteger taskSuccessSum=new AtomicInteger(); - /** - * 任务执行失败次数 - */ - private AtomicInteger taskErrorSum=new AtomicInteger(); - - /** - * 判断是否有任务 - * @return true 有任务 - */ - public boolean hashNext(){ - return !carQueue.isEmpty(); - } - - /** - * 获取下一个任务节点 - * @return 任务VIN - */ - public String next(){ - return carQueue.poll(); - } - -} diff --git a/cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/cargateway/service/Impl/CarOneClickOperationServiceImpl.java b/cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/cargateway/service/Impl/CarOneClickOperationServiceImpl.java deleted file mode 100644 index 9c22503..0000000 --- a/cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/cargateway/service/Impl/CarOneClickOperationServiceImpl.java +++ /dev/null @@ -1,79 +0,0 @@ -package com.muyu.cargateway.service.Impl; - -import com.muyu.common.rabbit.config.RabbitmqConfig; -import com.muyu.cargateway.domain.VehicleConnection; -import com.muyu.cargateway.domain.VinIp; -import com.muyu.cargateway.domain.model.MqttServerModel; -import com.muyu.cargateway.domain.req.VehicleConnectionReq; -import com.muyu.cargateway.mapper.CarOneClickOperationMapper; -import com.muyu.cargateway.service.CarOneClickOperationService; -import com.muyu.common.redis.service.RedisService; -import lombok.extern.log4j.Log4j2; -import org.springframework.amqp.rabbit.core.RabbitTemplate; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Service; - -import java.util.List; - -/** - * @ Tool:IntelliJ IDEA - * @ Author:CHX - * @ Date:2024-09-26-20:16 - * @ Version:1.0 - * @ Description:车辆一键操作业务实现层 - * @author Lenovo - */ -@Log4j2 -@Service -public class CarOneClickOperationServiceImpl implements CarOneClickOperationService { - - @Autowired - private CarOneClickOperationMapper carOneClickOperationMapper; - - @Autowired - private RabbitTemplate rabbitTemplate; - @Autowired - private RedisService redisService; - - /** - * 获取连接信息 - * @param vehicleConnectionReq 车辆连接请求参数 - * @return - */ - @Override - public MqttServerModel getConnect(VehicleConnectionReq vehicleConnectionReq) { - log.info("车辆连接请求:{}",vehicleConnectionReq.toString()); - - // 使用交换机发送消息 - rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_TOPICS_INFORM,RabbitmqConfig.ROUTINGKEY_EMAIL,vehicleConnectionReq.getVehicleVin()); - log.info("发送消息成功:{}",vehicleConnectionReq.getVehicleVin()); - - - VehicleConnection vehicleConnection = new VehicleConnection(); - //车辆vin - vehicleConnection.setVehicleVin(vehicleConnectionReq.getVehicleVin()); - //用户名 - vehicleConnection.setUsername(vehicleConnectionReq.getUsername()); - //密码(vin+时间戳+随机数) - vehicleConnection.setPassword(vehicleConnectionReq.getVehicleVin()+vehicleConnectionReq.getTimestamp()+vehicleConnectionReq.getNonce()); - //查询车辆vin集合 - List vehicleConnections =carOneClickOperationMapper.selectByVehicleVin(vehicleConnectionReq.getVehicleVin()); - if(vehicleConnections.isEmpty()){ - //添加 - carOneClickOperationMapper.addConnect(vehicleConnection); - } - log.info("该车辆已存在,不能重复预上线"); - //TODO 返回连接信息 做轮询操作 - - - return new MqttServerModel("tcp://"+"106.15.136.7"+":1883","vehicle"); - - } - /** - * 添加车辆绑定IP地址存入redis中 - */ - public void addIpAddress(VinIp vinIp){ - redisService.setCacheObject("vehicle_ip_address:"+vinIp.getVehicleVin(),vinIp.getIpAddress()); - } - -} diff --git a/cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/cargateway/utils/ECSTool.java b/cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/cargateway/utils/ECSTool.java deleted file mode 100644 index f83fa50..0000000 --- a/cloud-modules/cloud-modules-car-gateway/src/main/java/com/muyu/cargateway/utils/ECSTool.java +++ /dev/null @@ -1,135 +0,0 @@ -package com.muyu.cargateway.utils; - -import com.aliyun.ecs20140526.Client; -import com.aliyun.ecs20140526.models.DeleteInstanceRequest; -import com.aliyun.ecs20140526.models.DescribeInstancesRequest; -import com.aliyun.ecs20140526.models.DescribeInstancesResponse; -import com.aliyun.ecs20140526.models.RunInstancesRequest; -import com.aliyun.tea.TeaException; -import com.aliyun.teaopenapi.models.Config; -import com.aliyun.teautil.Common; -import com.aliyun.teautil.models.RuntimeOptions; -import lombok.extern.log4j.Log4j2; - -import java.util.ArrayList; -import java.util.List; -import java.util.stream.Collectors; - -/** - * @ Tool:IntelliJ IDEA - * @ Author:CHX - * @ Date:2024-10-02-16:04 - * @ Version:1.0 - * @ Description:ecs实例工具类 - * @author Lenovo - */ -@Log4j2 -public class ECSTool { - - public static final String ACCESS_KEY_ID = "LTAI5tDH3FyRx4PRr6anx2TL"; - public static final String ACCESS_KEY_SECRET = "xdQnX2tDattY50raNkUWmHzE2tondP"; - - public static Client createClient() throws Exception { - // 工程代码泄露可能会导致 AccessKey 泄露,并威胁账号下所有资源的安全性。以下代码示例仅供参考。 - Config config = new Config() - // 必填,请确保代码运行环境设置了环境变量 ALIBABA_CLOUD_ACCESS_KEY_ID。 - .setAccessKeyId(ACCESS_KEY_ID) - // 必填,请确保代码运行环境设置了环境变量 ALIBABA_CLOUD_ACCESS_KEY_SECRET。 - .setAccessKeySecret(ACCESS_KEY_SECRET); - // Endpoint 请参考 https://api.aliyun.com/product/Ecs - config.endpoint = "ecs-cn-hangzhou.aliyuncs.com"; - return new Client(config); - } - public static void runEcsInstance(String regionId, String launchTemplateId) throws Exception { - Client client = ECSTool.createClient(); - RunInstancesRequest request = new RunInstancesRequest(); - request.setRegionId(regionId) - .setLaunchTemplateId(launchTemplateId); - RuntimeOptions runtimeOptions = new RuntimeOptions(); - try{ - client.runInstancesWithOptions(request, runtimeOptions); - }catch (Exception error){ - // 处理API调用过程中出现的异常 - System.out.println(error.getMessage()); - if (error instanceof TeaException) { - // 处理特定类型的异常,如TeaException - TeaException teaError = (TeaException) error; - // 打印诊断推荐链接 - System.out.println(teaError.getData().get("Recommend")); - // 断言错误信息 - Common.assertAsString(teaError.getMessage()); - } else { - // 处理其他类型的异常 - System.out.println(error.getMessage()); - } - } - } - - /** - *销毁实例 - */ - public static void runEcsRemove(String instanceId) throws Exception { - Client client = ECSTool.createClient(); - DeleteInstanceRequest deleteInstancesRequest = new DeleteInstanceRequest(); - deleteInstancesRequest.setInstanceId(instanceId); - RuntimeOptions runtimeOptions = new RuntimeOptions(); - - try { - // 复制代码运行请自行打印 API 的返回值 - client.deleteInstanceWithOptions(deleteInstancesRequest, runtimeOptions); - } catch (TeaException error) { - // 此处仅做打印展示,请谨慎对待异常处理,在工程项目中切勿直接忽略异常。 - // 错误 message - System.out.println(error.getMessage()); - // 诊断地址 - System.out.println(error.getData().get("Recommend")); - Common.assertAsString(error.message); - } catch (Exception _error) { - TeaException error = new TeaException(_error.getMessage(), _error); - // 此处仅做打印展示,请谨慎对待异常处理,在工程项目中切勿直接忽略异常。 - // 错误 message - System.out.println(error.getMessage()); - // 诊断地址 - System.out.println(error.getData().get("Recommend")); - Common.assertAsString(error.message); - } - } - - /** - * 查询实例列表 - * @param regionId 地域ID - */ - public static List findInstance(String regionId) throws Exception { - Client client = ECSTool.createClient(); - DescribeInstancesRequest describeInstancesRequest = new DescribeInstancesRequest(); - describeInstancesRequest.setRegionId(regionId); - RuntimeOptions runtimeOptions = new RuntimeOptions(); - List stringArrayList = new ArrayList<>(); - try { - DescribeInstancesResponse response = client.describeInstancesWithOptions(describeInstancesRequest, runtimeOptions); - List> ipListList = response.getBody().instances.getInstance().stream().map(instance -> instance.publicIpAddress.ipAddress).collect(Collectors.toList()); - for (List strings : ipListList) { - for (String ip : strings) { - stringArrayList.add(ip); - } - return stringArrayList; - } - } catch (TeaException error) { - // 此处仅做打印展示,请谨慎对待异常处理,在工程项目中切勿直接忽略异常。 - // 错误 message - System.out.println(error.getMessage()); - // 诊断地址 - System.out.println(error.getData().get("Recommend")); - Common.assertAsString(error.message); - } catch (Exception _error) { - TeaException error = new TeaException(_error.getMessage(), _error); - // 此处仅做打印展示,请谨慎对待异常处理,在工程项目中切勿直接忽略异常。 - // 错误 message - System.out.println(error.getMessage()); - // 诊断地址 - System.out.println(error.getData().get("Recommend")); - Common.assertAsString(error.message); - } - return null; - } -} diff --git a/cloud-modules/cloud-modules-car-gateway/src/main/resources/bootstrap.yml b/cloud-modules/cloud-modules-car-gateway/src/main/resources/bootstrap.yml index 2b120b0..5acfde6 100644 --- a/cloud-modules/cloud-modules-car-gateway/src/main/resources/bootstrap.yml +++ b/cloud-modules/cloud-modules-car-gateway/src/main/resources/bootstrap.yml @@ -1,13 +1,13 @@ # Tomcat server: - port: 12900 + port: 10099 # nacos线上地址 nacos: addr: 47.116.173.119:8848 user-name: nacos password: nacos - namespace: oneone + namespace: one # SPRING_AMQP_DESERIALIZATION_TRUST_ALL=true spring.amqp.deserialization.trust.all # Spring spring: @@ -27,10 +27,10 @@ spring: publisher-returns: true #确认消息已发送到队列(Queue) amqp: deserialization: - trust: - all: true + trust: + all: true main: - allow-bean-definition-overriding: true + allow-bean-definition-overriding: true application: # 应用名称 @@ -82,4 +82,4 @@ aliyun: instance-type: ecs.t6-c1m1.large security-group-id: sg-uf642d5u4ja5gsiitx8y switch-id: vsw-uf66lifrkhxqc94xi06v3 - amount: 1 + amount: 2 diff --git a/cloud-modules/cloud-modules-car-gateway/src/main/resources/mapper/CarOneClickOperationMapper.xml b/cloud-modules/cloud-modules-car-gateway/src/main/resources/mapper/CarOneClickOperationMapper.xml index 814cfa2..782862d 100644 --- a/cloud-modules/cloud-modules-car-gateway/src/main/resources/mapper/CarOneClickOperationMapper.xml +++ b/cloud-modules/cloud-modules-car-gateway/src/main/resources/mapper/CarOneClickOperationMapper.xml @@ -1,18 +1,26 @@ - + PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" + "http://mybatis.org/dtd/mybatis-3-mapper.dtd"> + insert into car_one_click_operation - (vehicle_vin,user_name,password) - values - (#{vehicleVin},#{username},#{password}) + (vin, user_name, password) + values (#{vehicleVin}, #{username}, #{password}) + diff --git a/cloud-modules/cloud-modules-parsing/src/main/java/com/muyu/parsing/consumer/FormMessageConsumer.java b/cloud-modules/cloud-modules-parsing/src/main/java/com/muyu/parsing/consumer/FormMessageConsumer.java deleted file mode 100644 index a188c25..0000000 --- a/cloud-modules/cloud-modules-parsing/src/main/java/com/muyu/parsing/consumer/FormMessageConsumer.java +++ /dev/null @@ -1,131 +0,0 @@ -package com.muyu.parsing.consumer; - -import com.muyu.cargateway.domain.properties.MqttProperties; -import com.muyu.common.kafka.constants.KafkaConstants; -import com.muyu.parsing.domain.KafKaData; -import com.muyu.parsing.domain.SysCarMessage; -import com.muyu.parsing.manager.TaskManager; -import com.muyu.parsing.service.impl.SysCarMessageServiceImpl; -import com.rabbitmq.client.Channel; -import jakarta.annotation.Resource; -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.eclipse.paho.client.mqttv3.*; -import org.springframework.amqp.core.Message; -import org.springframework.amqp.rabbit.annotation.Queue; -import org.springframework.amqp.rabbit.annotation.RabbitListener; -import org.springframework.data.redis.core.RedisTemplate; -import org.springframework.stereotype.Component; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - -/** - * @ClassName FormMessageConsumer - * @Description 描述 - * @Author Chen - * @Date 2024/10/9 15:55 - */ -@Component -@Slf4j -public class FormMessageConsumer { - @Resource - private RedisTemplate redisTemplate; - - @Resource - private KafkaProducer kafkaProducer; - @Resource - private SysCarMessageServiceImpl sysCarMessageService; - private final static String FORM_QUEUE = "queue_inform_sms"; - - @RabbitListener(queuesToDeclare = {@Queue(FORM_QUEUE)}) - public void downline(MqttProperties mqttProperties, Message message, Channel channel) { - log.info("基础信息 {} 。。。", mqttProperties); - try { - // 第三个参数为空,默认持久化策略 - MqttClient sampleClient = new MqttClient(mqttProperties.getBroker(), mqttProperties.getClientId()); - MqttConnectOptions connOpts = new MqttConnectOptions(); - //用户名 - connOpts.setUserName(mqttProperties.getUserName()); - //密码 - connOpts.setPassword(mqttProperties.getPassword().toCharArray()); - connOpts.setCleanSession(true); - System.out.println("Connecting to broker: " + mqttProperties.getBroker()); - sampleClient.connect(connOpts); - sampleClient.subscribe(mqttProperties.getTopic(), 0); - sampleClient.setCallback(new MqttCallback() { - // 连接丢失 - @Override - public void connectionLost(Throwable throwable) { - log.error("连接丢失:{}", throwable.getMessage()); - } - - // 连接成功 - @Override - public void messageArrived(String s, MqttMessage mqttMessage) { - TaskManager manager = new TaskManager(10); - manager.execute(() -> processMessage(mqttMessage)); - } - - // 接收信息 - @Override - public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { - log.info("接收消息:{}", iMqttDeliveryToken); - } - }); - channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); - } catch (MqttException | IOException me) { -// System.out.println("reason " + me.getReasonCode()); - System.out.println("msg " + me.getMessage()); - System.out.println("loc " + me.getLocalizedMessage()); - System.out.println("cause " + me.getCause()); - System.out.println("excep " + me); - me.printStackTrace(); - throw new RuntimeException(me); - } - } - - - private void processMessage(MqttMessage mqttMessage) { - String string = new String(mqttMessage.getPayload()); - log.info(string); - List list = sysCarMessageService.selectSysCarMessageLists(1); - List kafKaDataList = new ArrayList<>(); - String[] test = string.split(" "); - for (SysCarMessage carMessage : list) { - int start = Integer.parseInt(carMessage.getMessageStartIndex()) - 1; - int end = Integer.parseInt(carMessage.getMessageEndIndex()); - StringBuilder hexBuilder = new StringBuilder(); - for (int i = start; i < end; i++) { - hexBuilder.append(test[i]); - } - String hex = hexBuilder.toString(); - char[] result = new char[hex.length() / 2]; - for (int x = 0; x < hex.length(); x += 2) { - int high = Character.digit(hex.charAt(x), 16); - int low = Character.digit(hex.charAt(x + 1), 16); - result[x / 2] = (char) ((high << 4) + low); - } - String value = new String(result); - kafKaDataList.add(KafKaData.builder() - .key(carMessage.getMessageTypeCode()) - .label(carMessage.getMessageTypeCode()) - .value(value) - .type(carMessage.getMessageType()) - .build()); - } - kafKaDataList.add(KafKaData.builder() - .key("firmCode") - .label("企业编码") - .value("firm01") - .type("String") - .build()); - - String jsonString = com.alibaba.fastjson.JSONObject.toJSONString(kafKaDataList); - ProducerRecord producerRecord = new ProducerRecord<>(KafkaConstants.KafkaTopic, jsonString); - kafkaProducer.send(producerRecord); - log.info("kafka投产:{}", jsonString); - } -} diff --git a/cloud-modules/cloud-modules-parsing/src/main/java/com/muyu/parsing/consumer/RabbitListenerComponent.java b/cloud-modules/cloud-modules-parsing/src/main/java/com/muyu/parsing/consumer/RabbitListenerComponent.java new file mode 100644 index 0000000..1d8ed7d --- /dev/null +++ b/cloud-modules/cloud-modules-parsing/src/main/java/com/muyu/parsing/consumer/RabbitListenerComponent.java @@ -0,0 +1,30 @@ +package com.muyu.parsing.consumer; + +import com.muyu.cargateway.domain.properties.MqttProperties; +import com.muyu.common.rabbit.constants.RabbitConstants; +import com.muyu.parsing.mqtt.service.MqttClientService; +import com.rabbitmq.client.Channel; +import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.core.Message; +import org.springframework.amqp.rabbit.annotation.Queue; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; +@Component +@Slf4j +public class RabbitListenerComponent { + + @Autowired + private MqttClientService mqttClientService; + + @RabbitListener(queuesToDeclare = @Queue(value = RabbitConstants.FORM_QUEUE, durable = "true")) + public void downline(MqttProperties mqttProperties, Message message, Channel channel) { + try { + mqttClientService.connectAndSubscribeAsync(mqttProperties); + channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); + } catch (Exception e) { + e.printStackTrace(); + log.error("处理RabbitMQ消息时发生错误", e); + } + } +} diff --git a/cloud-modules/cloud-modules-parsing/src/main/java/com/muyu/parsing/manager/MessageProcessor.java b/cloud-modules/cloud-modules-parsing/src/main/java/com/muyu/parsing/manager/MessageProcessor.java new file mode 100644 index 0000000..99707c0 --- /dev/null +++ b/cloud-modules/cloud-modules-parsing/src/main/java/com/muyu/parsing/manager/MessageProcessor.java @@ -0,0 +1,76 @@ +package com.muyu.parsing.manager; + +import com.alibaba.fastjson2.JSONObject; +import com.muyu.common.kafka.constants.KafkaConstants; +import com.muyu.parsing.domain.KafKaData; +import com.muyu.parsing.domain.SysCarMessage; +import com.muyu.parsing.service.impl.SysCarMessageServiceImpl; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.springframework.stereotype.Service; + +import javax.annotation.Resource; +import java.util.ArrayList; +import java.util.List; + +@Slf4j +@Service +public class MessageProcessor { + + + private static final int ID = 1; + @Resource + private SysCarMessageServiceImpl sysCarMessageService; + @Resource + private KafkaProducer kafkaProducer; + + + public void processMessage(MqttMessage mqttMessage) { + String payload = new String(mqttMessage.getPayload()); + log.info("====:{}", payload); + + List carMessages = sysCarMessageService.selectSysCarMessageLists(ID); + List kafKaDataList = new ArrayList<>(); + + String[] test = payload.split(" "); + for (SysCarMessage carMessage : carMessages) { + int start = Integer.parseInt(carMessage.getMessageStartIndex()) - 1; + int end = Integer.parseInt(carMessage.getMessageEndIndex()); + + StringBuilder hexBuilder = new StringBuilder(); + for (int i = start; i < end; i++) { + hexBuilder.append(test[i]); + } + + String hex = hexBuilder.toString(); + char[] result = new char[hex.length() / 2]; + for (int x = 0; x < hex.length(); x += 2) { + int high = Character.digit(hex.charAt(x), 16); + int low = Character.digit(hex.charAt(x + 1), 16); + result[x / 2] = (char) ((high << 4) + low); + } + + String value = new String(result); + kafKaDataList.add(KafKaData.builder() + .key(carMessage.getMessageTypeCode()) + .label(carMessage.getMessageTypeCode()) + .value(value) + .type(carMessage.getMessageType()) + .build()); + } + + kafKaDataList.add(KafKaData.builder() + .key("firmCode") + .label("企业编码") + .value("firm01") + .type("String") + .build()); + String jsonString = JSONObject.toJSONString(kafKaDataList); + ProducerRecord producerRecord = new ProducerRecord<>(KafkaConstants.KafkaTopic, jsonString); + kafkaProducer.send(producerRecord); + log.info("kafka投产:{}", jsonString); + } +} + diff --git a/cloud-modules/cloud-modules-parsing/src/main/java/com/muyu/parsing/manager/TaskManager.java b/cloud-modules/cloud-modules-parsing/src/main/java/com/muyu/parsing/manager/TaskManager.java deleted file mode 100644 index b895ce6..0000000 --- a/cloud-modules/cloud-modules-parsing/src/main/java/com/muyu/parsing/manager/TaskManager.java +++ /dev/null @@ -1,24 +0,0 @@ -package com.muyu.parsing.manager; - -import java.util.concurrent.Executors; -import java.util.concurrent.ExecutorService; - -/** - * 线程配置 - */ -public final class TaskManager { - private final ExecutorService executorService; - - public TaskManager(int threadPoolSize) { - this.executorService = Executors.newFixedThreadPool(threadPoolSize); - } - - public void execute(Runnable task) { - executorService.submit(task); - } - - // 关闭线程池 - public void shutdown() { - executorService.shutdown(); - } -} diff --git a/cloud-modules/cloud-modules-parsing/src/main/java/com/muyu/parsing/manager/TaskManagers.java b/cloud-modules/cloud-modules-parsing/src/main/java/com/muyu/parsing/manager/TaskManagers.java deleted file mode 100644 index 08fb605..0000000 --- a/cloud-modules/cloud-modules-parsing/src/main/java/com/muyu/parsing/manager/TaskManagers.java +++ /dev/null @@ -1,189 +0,0 @@ -//package com.muyu.parsing.manager; -// -///** -// * @Author: 胡杨 -// * @Name: TaskManager -// * @Description: 任务管理器 -// * @CreatedDate: 2024/9/4 下午7:44 -// * @FilePath: com.muyu.quest.manager -// */ -// -// -//import lombok.extern.slf4j.Slf4j; -// -//import java.util.Collections; -//import java.util.LinkedList; -//import java.util.List; -// -//import static java.lang.Thread.sleep; -// -///** -// * 基础线程配置 -// */ -//@Slf4j -//public final class TaskManagers { -// // 线程池中默认线程的个数为5 -// private static int workerNum = 8; -// // 工作线程 -//// private final WorkThread[] workThrads; -// // 未处理的任务 -// private static volatile int finishedTask = 0; -//// -//// // 任务队列 -//// private final List taskQueue = new LinkedList(); -//// private static TaskManager taskManager; -//// -//// private long startTime; -//// private long endTime; -//// -//// // 创建具有默认线程个数的线程池 -//// private TaskManager() { -//// this(8); -//// } -//// -//// // 创建线程池,workerNum为线程池中工作线程的个数 -//// private TaskManager(int workerNum) { -//// taskManager.workerNum = workerNum; -//// workThrads = new WorkThread[workerNum]; -//// for (int i = 0; i < workerNum; i++) { -//// workThrads[i] = new WorkThread(); -//// workThrads[i].start();// 开启线程池中的线程 -//// } -//// startTime = System.currentTimeMillis(); -//// } -//// -//// // 单态模式,获得一个默认线程个数的线程池 -//// public static TaskManager getTaskManager() { -//// return getTaskManager(TaskManager.workerNum); -//// } -//// -//// // 单态模式,获得一个指定线程个数的线程池,workerNum(>0)为线程池中工作线程的个数 -//// // workerNum<=0创建默认的工作线程个数 -//// public static TaskManager getTaskManager(int workerNum1) { -//// if (workerNum1 <= 0) { -//// workerNum1 = TaskManager.workerNum; -//// } -//// if (taskManager == null) { -//// taskManager = new TaskManager(workerNum1); -//// } -//// return taskManager; -//// } -//// -//// // 把任务加入任务队列 -//// public void execute(List task) { -//// execute(task.toArray(new Runnable[0])); -//// } -//// -//// // 把任务加入任务队列 -//// public void execute(Runnable... task) { -//// synchronized (taskQueue) { -//// Collections.addAll(taskQueue, task); -//// taskQueue.notify(); -//// } -//// } -//// -//// -//// // 销毁线程池,该方法保证在所有任务都完成的情况下才销毁所有线程,否则等待任务完成才销毁 -//// public void destroy() { -//// while (!getIsRunning()) {// 如果还有任务没执行完成,就先睡会吧 -//// try { -//// sleep(500); -//// } catch (InterruptedException e) { -//// e.printStackTrace(); -//// } -//// } -//// // 工作线程停止工作,且置为null -//// for (int i = 0; i < workerNum; i++) { -//// workThrads[i].stopWorker(); -//// workThrads[i] = null; -//// } -//// taskManager = null; -//// taskQueue.clear();// 清空任务队列 -//// } -//// -//// // 任务完成后,统计线程池的运行情况 -//// public void closed() { -//// while (!getIsRunning()) {// 如果还有任务没执行完成,就先睡会吧 -//// try { -//// sleep(500); -//// } catch (InterruptedException e) { -//// e.printStackTrace(); -//// } -//// } -//// log.info("任务完成,线程状态: {}", this.toString()); -//// taskManager = null; -//// } -//// -//// // 返回工作线程的个数 -//// public int getWorkThreadNumber() { -//// return workerNum; -//// } -//// -//// // 返回已完成任务的个数,这里的已完成是只出了任务队列的任务个数,可能该任务并没有实际执行完成 -//// public int getFinishedTasknumber() { -//// return finishedTask; -//// } -//// -//// // 返回任务队列的长度,即还没处理的任务个数 -//// public int getWaitTasknumber() { -//// return taskQueue.size(); -//// } -//// -//// /** -//// * 返回线程池当前状态 -//// * -//// * @return 如果还在工作返回false,否则返回true -//// */ -//// public Boolean getIsRunning() { -//// return taskQueue.isEmpty() || (taskManager.getWaitTasknumber() == 0 && taskManager.getWorkThreadNumber() == 0); -//// } -//// -//// // 覆盖toString方法,返回线程池信息:工作线程个数和已完成任务个数 -//// @Override -//// public String toString() { -//// endTime = System.currentTimeMillis(); -//// return "工作任务数:" + workerNum + ",已完成任务数:" -//// + finishedTask + ",等待任务数:" + getWaitTasknumber() -//// + "线程池作时长:" + (endTime - startTime) + "ms"; -//// } -//// -//// /** -//// * 内部类,工作线程 -//// */ -//// private class WorkThread extends Thread { -//// // 该工作线程是否有效,用于结束该工作线程 -//// private boolean isRunning = true; -//// -//// /* -//// * 关键所在,如果任务队列不空,则取出任务执行,若任务队列空,则等待 -//// */ -//// @Override -//// public void run() { -//// Runnable r = null; -//// while (isRunning) {// 若线程无效则自然结束run方法,该线程就没用了 -//// synchronized (taskQueue) { -//// while (isRunning && taskQueue.isEmpty()) {// 队列为空 -//// try { -//// taskQueue.wait(50); -//// } catch (InterruptedException e) { -//// e.printStackTrace(); -//// } -//// } -//// if (!taskQueue.isEmpty()) { -//// r = taskQueue.remove(0);// 取出任务 -//// } -//// } -//// if (r != null) { -//// r.run();// 执行任务 -//// } -//// finishedTask++; -//// r = null; -//// } -//// } -//// -//// // 停止工作,让该线程自然执行完run方法,自然结束 -//// public void stopWorker() { -//// isRunning = false; -//// } -//// } -//} diff --git a/cloud-modules/cloud-modules-parsing/src/main/java/com/muyu/parsing/mqtt/MqttTest.java b/cloud-modules/cloud-modules-parsing/src/main/java/com/muyu/parsing/mqtt/MqttTest.java deleted file mode 100644 index 43dd7d8..0000000 --- a/cloud-modules/cloud-modules-parsing/src/main/java/com/muyu/parsing/mqtt/MqttTest.java +++ /dev/null @@ -1,127 +0,0 @@ -package com.muyu.parsing.mqtt; - -import com.alibaba.fastjson2.JSONObject; -import com.muyu.common.kafka.constants.KafkaConstants; -import com.muyu.parsing.domain.KafKaData; -import com.muyu.parsing.domain.SysCarMessage; -import com.muyu.parsing.service.impl.SysCarMessageServiceImpl; -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.eclipse.paho.client.mqttv3.*; -import org.springframework.stereotype.Component; - -import javax.annotation.PostConstruct; -import javax.annotation.Resource; -import java.util.ArrayList; -import java.util.List; - - -/** - * mqtt - * - * @ClassName MqttTest - * @Description 描述 - * @Author Chen - * @Date 2024/9/28 23:49 - */ -@Slf4j -@Component -public class MqttTest { - private static final Integer ID = 1; - private static final Integer CODE = 1; - - @Resource - private KafkaProducer kafkaProducer; - @Resource - private SysCarMessageServiceImpl sysCarMessageService; - - @PostConstruct - public void Test() { - String topic = "vehicle"; - String content = "Message from MqttPublishSample"; - int qos = 2; - String broker = "tcp://106.15.136.7:1883"; - String clientId = "JavaSample"; - - try { - // 第三个参数为空,默认持久化策略 - MqttClient sampleClient = new MqttClient(broker, clientId); - MqttConnectOptions connOpts = new MqttConnectOptions(); - connOpts.setCleanSession(true); - System.out.println("Connecting to broker: " + broker); - sampleClient.connect(connOpts); - sampleClient.subscribe(topic, 0); - sampleClient.setCallback(new MqttCallback() { - // 连接丢失 - @Override - public void connectionLost(Throwable throwable) { - - } - - // 连接成功 - @Override - public void messageArrived(String s, MqttMessage mqttMessage) throws Exception { - List list = sysCarMessageService.selectSysCarMessageLists(ID); - String string = new String(mqttMessage.getPayload()); - log.info(new String(mqttMessage.getPayload())); - List kafKaDataList = new ArrayList<>(); - String[] test = string.split(" "); -// String[] results = new String[list.size()]; - for (SysCarMessage carMessage : list) { - int start = Integer.parseInt(carMessage.getMessageStartIndex()) - 1; - int end = Integer.parseInt(carMessage.getMessageEndIndex()); - StringBuilder hexBuilder = new StringBuilder(); - for (int i = start; i < end; i++) { - hexBuilder.append(test[i]); - } - String hex = hexBuilder.toString(); - char[] result = new char[hex.length() / 2]; - for (int x = 0; x < hex.length(); x += 2) { - int high = Character.digit(hex.charAt(x), 16); - int low = Character.digit(hex.charAt(x + 1), 16); - result[x / 2] = (char) ((high << 4) + low); - } - String value = new String(result); - kafKaDataList.add(KafKaData.builder() - .key(carMessage.getMessageTypeCode()) - .label(carMessage.getMessageTypeCode()) - .value(value) - .type(carMessage.getMessageType()) - .build()); - } - kafKaDataList.add(KafKaData.builder() - .key("firmCode") - .label("企业编码") - .value("firm01") - .type("String") - .build()); - String jsonString = JSONObject.toJSONString(kafKaDataList); - - ProducerRecord producerRecord = new ProducerRecord<>(KafkaConstants.KafkaTopic, jsonString); - kafkaProducer.send(producerRecord); - log.info("kafka投产:{}", jsonString); -// HashMap stringStringHashMap = new HashMap<>(); -// kafKaDataList.forEach(data -> stringStringHashMap.put(data.getKey(), data.getValue())); -// jsonString = JSONObject.toJSONString(stringStringHashMap); -// System.out.println(jsonString); - } - - - // 接收信息 - @Override - public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { - - } - - }); - } catch (MqttException me) { - System.out.println("reason " + me.getReasonCode()); - System.out.println("msg " + me.getMessage()); - System.out.println("loc " + me.getLocalizedMessage()); - System.out.println("cause " + me.getCause()); - System.out.println("excep " + me); - me.printStackTrace(); - } - } -} diff --git a/cloud-modules/cloud-modules-parsing/src/main/java/com/muyu/parsing/mqtt/service/MqttClientService.java b/cloud-modules/cloud-modules-parsing/src/main/java/com/muyu/parsing/mqtt/service/MqttClientService.java new file mode 100644 index 0000000..af6a951 --- /dev/null +++ b/cloud-modules/cloud-modules-parsing/src/main/java/com/muyu/parsing/mqtt/service/MqttClientService.java @@ -0,0 +1,80 @@ +package com.muyu.parsing.mqtt.service; + +import com.muyu.cargateway.domain.properties.MqttProperties; +import com.muyu.parsing.manager.MessageProcessor; +import lombok.extern.slf4j.Slf4j; +import org.eclipse.paho.client.mqttv3.*; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import javax.annotation.PreDestroy; +import java.io.IOException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +@Service +@Slf4j +public class MqttClientService { + + @Autowired + private MessageProcessor messageProcessor; + + private final ExecutorService executorService = Executors.newCachedThreadPool(); + private MqttClient sampleClient; + + + public void connectAndSubscribeAsync(MqttProperties mqttProperties) { + executorService.submit(() -> { + try { + connectAndSubscribe(mqttProperties); + } catch (MqttException | IOException e) { + log.error("MQTT连接或订阅失败", e); + } + }); + } + + private void connectAndSubscribe(MqttProperties mqttProperties) throws MqttException, IOException { +// if (sampleClient != null && sampleClient.isConnected()) { +// log.info("MQTT客户端已经连接,跳过重新连接。"); +// return; +// } + + sampleClient = new MqttClient(mqttProperties.getBroker(), mqttProperties.getClientId()); + MqttConnectOptions connOpts = new MqttConnectOptions(); + connOpts.setUserName(mqttProperties.getUserName()); + connOpts.setPassword(mqttProperties.getPassword().toCharArray()); + connOpts.setCleanSession(true); + + sampleClient.connect(connOpts); + sampleClient.subscribe(mqttProperties.getTopic(), 0); + + sampleClient.setCallback(new MqttCallback() { + @Override + public void connectionLost(Throwable throwable) { + log.error("连接丢失:{}", throwable.getMessage()); + } + + @Override + public void messageArrived(String s, MqttMessage mqttMessage) { + executorService.submit(() -> messageProcessor.processMessage(mqttMessage)); + } + + @Override + public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { + log.info("消息发送完成:{}", iMqttDeliveryToken); + } + }); + } + + @PreDestroy + public void shutdown() { + executorService.shutdown(); + if (sampleClient != null && sampleClient.isConnected()) { + try { + sampleClient.disconnect(); + } catch (MqttException e) { + log.error("MQTT客户端断开连接失败", e); + } + } + } +}