diff --git a/cloud-modules/cloud-modules-vehicle-gateway/pom.xml b/cloud-modules/cloud-modules-vehicle-gateway/pom.xml index 2e10df2..1147547 100644 --- a/cloud-modules/cloud-modules-vehicle-gateway/pom.xml +++ b/cloud-modules/cloud-modules-vehicle-gateway/pom.xml @@ -67,10 +67,10 @@ - - com.muyu - cloud-common-log - + + + + diff --git a/cloud-modules/cloud-modules-vehicle-gateway/src/main/java/com/muyu/cloud/vehicle/gateway/aliyun/ecs/Sample.java b/cloud-modules/cloud-modules-vehicle-gateway/src/main/java/com/muyu/cloud/vehicle/gateway/aliyun/ecs/Sample.java index f15b318..f9be95b 100644 --- a/cloud-modules/cloud-modules-vehicle-gateway/src/main/java/com/muyu/cloud/vehicle/gateway/aliyun/ecs/Sample.java +++ b/cloud-modules/cloud-modules-vehicle-gateway/src/main/java/com/muyu/cloud/vehicle/gateway/aliyun/ecs/Sample.java @@ -28,8 +28,8 @@ public class Sample implements ApplicationRunner{ @Autowired private AliProperties aliProperties; -// @Autowired -// private RedisService redisService; + @Autowired + private RedisService redisService; @Override public void run(ApplicationArguments args) throws Exception { @@ -42,7 +42,7 @@ public class Sample implements ApplicationRunner{ throw new RuntimeException(e); } log.info("创建实例成功"); -// redisService.setCacheList("instanceIds",list); + redisService.setCacheList("instanceIds",list); try{ Thread.sleep(6000); } catch (InterruptedException e) { @@ -51,7 +51,7 @@ public class Sample implements ApplicationRunner{ List aliInstances = aliYunEcsService.selectInstance(list); log.info("查询实例信息成功:{}",aliInstances); //将查询到的实例信息列表存储到redis中 -// redisService.setCacheList("instanceList",aliInstances); + redisService.setCacheList("instanceList",aliInstances); log.info("redis存储成功:{}",aliInstances); } } diff --git a/cloud-modules/cloud-modules-vehicle-gateway/src/main/java/com/muyu/cloud/vehicle/gateway/aliyun/service/AliYunEcsService.java b/cloud-modules/cloud-modules-vehicle-gateway/src/main/java/com/muyu/cloud/vehicle/gateway/aliyun/service/AliYunEcsService.java index 8fbd658..d1d33be 100644 --- a/cloud-modules/cloud-modules-vehicle-gateway/src/main/java/com/muyu/cloud/vehicle/gateway/aliyun/service/AliYunEcsService.java +++ b/cloud-modules/cloud-modules-vehicle-gateway/src/main/java/com/muyu/cloud/vehicle/gateway/aliyun/service/AliYunEcsService.java @@ -34,8 +34,8 @@ public class AliYunEcsService { /** * redis缓存 */ -// @Autowired -// private RedisService redisService; + @Autowired + private RedisService redisService; /** * 生成实例 @@ -44,8 +44,8 @@ public class AliYunEcsService { * @return 实例id集合 */ public List generateInstance(Integer amount) { -// redisService.deleteObject("instanceIds"); -// redisService.deleteObject("instanceList"); + redisService.deleteObject("instanceIds"); + redisService.deleteObject("instanceList"); // 检查生成实例的数量是否有效 if (amount == null || amount <= 0) { throw new ServiceException("生成数量不能小于1"); diff --git a/cloud-modules/cloud-modules-vehicle-gateway/src/main/java/com/muyu/cloud/vehicle/gateway/controller/VehicleConnectionController.java b/cloud-modules/cloud-modules-vehicle-gateway/src/main/java/com/muyu/cloud/vehicle/gateway/controller/VehicleConnectionController.java index d3602bf..c014662 100644 --- a/cloud-modules/cloud-modules-vehicle-gateway/src/main/java/com/muyu/cloud/vehicle/gateway/controller/VehicleConnectionController.java +++ b/cloud-modules/cloud-modules-vehicle-gateway/src/main/java/com/muyu/cloud/vehicle/gateway/controller/VehicleConnectionController.java @@ -1,9 +1,10 @@ package com.muyu.cloud.vehicle.gateway.controller; -import com.alibaba.nacos.api.model.v2.Result; +import com.muyu.cloud.vehicle.gateway.domain.model.MqttServerModel; import com.muyu.cloud.vehicle.gateway.domain.req.VehicleConnectionReq; import com.muyu.cloud.vehicle.gateway.service.VehicleConnectionService; +import com.muyu.common.core.domain.Result; import io.swagger.v3.oas.annotations.tags.Tag; import lombok.extern.log4j.Log4j2; import org.springframework.beans.factory.annotation.Autowired; @@ -32,9 +33,9 @@ public class VehicleConnectionController { * @return */ @PostMapping("/receiveMsg/connect") - public Result receiveMsg(@RequestBody VehicleConnectionReq vehicleConnectionReq){ - log.info("=======>"+vehicleConnectionReq); - vehicleConnectionService.getConnect(vehicleConnectionReq); - return Result.success(); + public Result receiveMsg(@RequestBody VehicleConnectionReq vehicleConnectionReq){ + log.info(">"+vehicleConnectionReq); + return vehicleConnectionService.getConnect(vehicleConnectionReq); + } } diff --git a/cloud-modules/cloud-modules-vehicle-gateway/src/main/java/com/muyu/cloud/vehicle/gateway/domain/model/MqttServerModel.java b/cloud-modules/cloud-modules-vehicle-gateway/src/main/java/com/muyu/cloud/vehicle/gateway/domain/model/MqttServerModel.java new file mode 100644 index 0000000..c179341 --- /dev/null +++ b/cloud-modules/cloud-modules-vehicle-gateway/src/main/java/com/muyu/cloud/vehicle/gateway/domain/model/MqttServerModel.java @@ -0,0 +1,24 @@ +package com.muyu.cloud.vehicle.gateway.domain.model; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * @ Description:Mqtt服务模型 + */ +@Data +@Builder +@AllArgsConstructor +@NoArgsConstructor +public class MqttServerModel { + /** + * Mqtt服务节点 + */ + private String broker; + /** + * MQTT订阅主题 + */ + private String topic; +} diff --git a/cloud-modules/cloud-modules-vehicle-gateway/src/main/java/com/muyu/cloud/vehicle/gateway/domain/model/TaskModel.java b/cloud-modules/cloud-modules-vehicle-gateway/src/main/java/com/muyu/cloud/vehicle/gateway/domain/model/TaskModel.java new file mode 100644 index 0000000..eb354d3 --- /dev/null +++ b/cloud-modules/cloud-modules-vehicle-gateway/src/main/java/com/muyu/cloud/vehicle/gateway/domain/model/TaskModel.java @@ -0,0 +1,82 @@ +package com.muyu.cloud.vehicle.gateway.domain.model; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.extern.log4j.Log4j2; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * @ Description:任务执行模型 + */ +@Data +@Log4j2 +@Builder +@AllArgsConstructor +@NoArgsConstructor +public class TaskModel { + /** + * 任务状态 默认为false状态 + * true为执行中,false为未执行 + */ + private final AtomicBoolean status =new AtomicBoolean(Boolean.FALSE); + /** + * 堵塞计数器 + */ + private CountDownLatch countDownLatch; + /** + * 任务执行堵塞队列 + */ + private LinkedBlockingDeque carQueue =new LinkedBlockingDeque<>(); + /** + * 任务是否执行 + * true 执行中 + * false 未执行 + * @return 是否有任务执行 + */ + private boolean isExecution(){ + return !status.get(); + } + /** + * 任务名称 + */ + private String taskName; + /** + * 任务执行次数 + */ + private Integer taskExecutionCount=0; + /** + * 任务开始时间 + */ + private Long taskStartTime; + /** + * 任务成功执行次数 + */ + private AtomicInteger taskSuccessSum=new AtomicInteger(); + /** + * 任务执行失败次数 + */ + private AtomicInteger taskErrorSum=new AtomicInteger(); + + /** + * 判断是否有任务 + * @return true 有任务 + */ + public boolean hashNext(){ + return !carQueue.isEmpty(); + } + + /** + * 获取下一个任务节点 + * @return 任务VIN + */ + public String next(){ + return carQueue.poll(); + } + +} diff --git a/cloud-modules/cloud-modules-vehicle-gateway/src/main/java/com/muyu/cloud/vehicle/gateway/domain/properties/MqttProperties.java b/cloud-modules/cloud-modules-vehicle-gateway/src/main/java/com/muyu/cloud/vehicle/gateway/domain/properties/MqttProperties.java new file mode 100644 index 0000000..7f1aaba --- /dev/null +++ b/cloud-modules/cloud-modules-vehicle-gateway/src/main/java/com/muyu/cloud/vehicle/gateway/domain/properties/MqttProperties.java @@ -0,0 +1,40 @@ +package com.muyu.cloud.vehicle.gateway.domain.properties; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * @ Description:Mqtt的配置 + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class MqttProperties { + /** + * 节点 + */ + private String broker; + /** + * 主题 + */ + private String topic; + /** + * 用户名 + */ + private String userName; + /** + * 密码 + */ + private String password; + /** + * 客户端id + */ + private String clientId; + /** + * 上报级别 + */ + private int qos = 0; +} diff --git a/cloud-modules/cloud-modules-vehicle-gateway/src/main/java/com/muyu/cloud/vehicle/gateway/domain/resp/AliServerConfig.java b/cloud-modules/cloud-modules-vehicle-gateway/src/main/java/com/muyu/cloud/vehicle/gateway/domain/resp/AliServerConfig.java new file mode 100644 index 0000000..463be44 --- /dev/null +++ b/cloud-modules/cloud-modules-vehicle-gateway/src/main/java/com/muyu/cloud/vehicle/gateway/domain/resp/AliServerConfig.java @@ -0,0 +1,32 @@ +package com.muyu.cloud.vehicle.gateway.domain.resp; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.experimental.SuperBuilder; + +/** + * @ Description:调用Ali服务器配置实体类 + */ +@Data +@SuperBuilder +@AllArgsConstructor +@NoArgsConstructor +public class AliServerConfig { + /** + * 主键 + */ + private Long id; + /** + * 地域id (实例所属的地域ID) + */ + private String regionId; + /** + * 镜像id + */ + private String imageId; + /** + * 实例规格 (实例的资源规格) + */ + private String instanceType; +} diff --git a/cloud-modules/cloud-modules-vehicle-gateway/src/main/java/com/muyu/cloud/vehicle/gateway/service/VehicleConnectionService.java b/cloud-modules/cloud-modules-vehicle-gateway/src/main/java/com/muyu/cloud/vehicle/gateway/service/VehicleConnectionService.java index b75af93..ea07769 100644 --- a/cloud-modules/cloud-modules-vehicle-gateway/src/main/java/com/muyu/cloud/vehicle/gateway/service/VehicleConnectionService.java +++ b/cloud-modules/cloud-modules-vehicle-gateway/src/main/java/com/muyu/cloud/vehicle/gateway/service/VehicleConnectionService.java @@ -1,12 +1,15 @@ package com.muyu.cloud.vehicle.gateway.service; +import com.muyu.cloud.vehicle.gateway.domain.model.MqttServerModel; import com.muyu.cloud.vehicle.gateway.domain.req.VehicleConnectionReq; +import com.muyu.common.core.domain.Result; public interface VehicleConnectionService { /** * 获取连接 - * @param vehicleConnectionReq + * @param vehicleConnectionReq 车辆连接请求参数 + * @return */ - void getConnect(VehicleConnectionReq vehicleConnectionReq); + Result getConnect(VehicleConnectionReq vehicleConnectionReq); } diff --git a/cloud-modules/cloud-modules-vehicle-gateway/src/main/java/com/muyu/cloud/vehicle/gateway/service/impl/VehicleConnectionServiceImpl.java b/cloud-modules/cloud-modules-vehicle-gateway/src/main/java/com/muyu/cloud/vehicle/gateway/service/impl/VehicleConnectionServiceImpl.java index 6259560..18679a6 100644 --- a/cloud-modules/cloud-modules-vehicle-gateway/src/main/java/com/muyu/cloud/vehicle/gateway/service/impl/VehicleConnectionServiceImpl.java +++ b/cloud-modules/cloud-modules-vehicle-gateway/src/main/java/com/muyu/cloud/vehicle/gateway/service/impl/VehicleConnectionServiceImpl.java @@ -1,12 +1,17 @@ package com.muyu.cloud.vehicle.gateway.service.impl; import com.muyu.cloud.vehicle.gateway.domain.VehicleConnection; +import com.muyu.cloud.vehicle.gateway.domain.VinIp; +import com.muyu.cloud.vehicle.gateway.domain.model.MqttServerModel; import com.muyu.cloud.vehicle.gateway.domain.req.VehicleConnectionReq; import com.muyu.cloud.vehicle.gateway.mapper.VehicleConnectionMapper; import com.muyu.cloud.vehicle.gateway.service.VehicleConnectionService; +import com.muyu.common.core.domain.Result; import lombok.extern.log4j.Log4j2; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.core.HashOperations; +import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Service; @Log4j2 @@ -19,17 +24,75 @@ public class VehicleConnectionServiceImpl implements VehicleConnectionService { @Autowired private VehicleConnectionMapper vehicleConnectionMapper; - @Override - public void getConnect(VehicleConnectionReq vehicleConnectionReq) { - log.info("车辆连接请求:{}",vehicleConnectionReq.toString()); + @Autowired + private StringRedisTemplate redisTemplate; + + /** + * 获取连接信息 + * @param vehicleConnectionReq 车辆连接请求参数 + * @return + */ + @Override + public Result getConnect(VehicleConnectionReq vehicleConnectionReq) { + log.info("车辆连接请求:{}",vehicleConnectionReq.toString()); + + // 使用交换机发送消息 + rabbitTemplate.convertAndSend("exchange_topics_inform","inform.#.email.#",vehicleConnectionReq.getVehicleVin()); + log.info("发送消息成功:{}",vehicleConnectionReq.getVehicleVin()); - //发送消息 - rabbitTemplate.convertAndSend("exchange_topics_inform","",vehicleConnectionReq.getVehicleVin()); VehicleConnection vehicleConnection = new VehicleConnection(); + //车辆vin vehicleConnection.setVehicleVin(vehicleConnectionReq.getVehicleVin()); + //用户名 vehicleConnection.setUsername(vehicleConnectionReq.getUsername()); + //密码(vin+时间戳+随机数) vehicleConnection.setPassword(vehicleConnectionReq.getVehicleVin()+vehicleConnectionReq.getTimestamp()+vehicleConnectionReq.getNonce()); + //添加连接信息 vehicleConnectionMapper.addConnect(vehicleConnection); + //先判断vin码 + HashOperations hashOps = redisTemplate.opsForHash(); + String vinIp = hashOps.get("vinIp", vehicleConnectionReq.getVehicleVin()); + if(vinIp!=null){ + log.info("车辆绑定ip失败,已经存在"); + throw new RuntimeException("车辆绑定ip失败,已经存在"); + } + + //判断redis有没有count键 + if(redisTemplate.hasKey("count")){ + //取出count + Integer count = Integer.valueOf(redisTemplate.opsForValue().get("count")); + if(count == 1){ + redisTemplate.opsForValue().set("count",String.valueOf(0)); + }else { + redisTemplate.opsForValue().set("count",String.valueOf(count+1)); + } + //根据游标count获取服务IP + String ip = redisTemplate.opsForList().index("ipList", count); + //关联车辆和服务 + this.addIpAddress(new VinIp(vehicleConnectionReq.getVehicleVin(),ip)); + //响应信息 + log.info("车辆:{}",vehicleConnectionReq.getVehicleVin()+"绑定成功:{}",ip); + return Result.success(new MqttServerModel("tcp://"+ip+":1883","vehicle")); + + }else { + redisTemplate.opsForValue().set("count",String.valueOf(0)); + //根据游标count获取服务器Ip + String ip = redisTemplate.opsForList().index("ipList", 0); + //关联车辆和服务 + this.addIpAddress(new VinIp(vehicleConnectionReq.getVehicleVin(),ip)); + //响应信息 + log.info("车辆:{}",vehicleConnectionReq.getVehicleVin(),"与:{}绑定成功",ip); + return Result.success(new MqttServerModel("tcp://"+ip+":1883","vehicle")); + } + } + /** + * 添加车辆绑定IP地址存入redis中 + */ + public void addIpAddress(VinIp vinIp) { + if (vinIp == null || vinIp.getVin() == null || vinIp.getVin().isEmpty() || vinIp.getIp() == null || vinIp.getIp().isEmpty()) { + throw new IllegalArgumentException("vin 或 ip 不能为空或无效"); + } + redisTemplate.opsForHash().put("vinIp", vinIp.getVin(), vinIp.getIp()); } }