feat(): 优化创建实例将查询到的实例ID存储到redis,优化启动车辆,将车辆信息发送到rabbitmq中
parent
ed3830c1ae
commit
08d79266ed
|
@ -16,7 +16,7 @@ public class DeleteSample implements DisposableBean {
|
||||||
@Autowired
|
@Autowired
|
||||||
private AliYunEcsService aliYunEcsService;
|
private AliYunEcsService aliYunEcsService;
|
||||||
@Override
|
@Override
|
||||||
public void destroy() throws Exception {
|
public void destroy() {
|
||||||
try{
|
try{
|
||||||
log.info("==========开始执行删除实例方法");
|
log.info("==========开始执行删除实例方法");
|
||||||
Thread.sleep(10000);
|
Thread.sleep(10000);
|
||||||
|
|
|
@ -28,8 +28,6 @@ public class Sample implements ApplicationRunner{
|
||||||
@Autowired
|
@Autowired
|
||||||
private AliProperties aliProperties;
|
private AliProperties aliProperties;
|
||||||
|
|
||||||
@Autowired
|
|
||||||
private RedisService redisService;
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void run(ApplicationArguments args) throws Exception {
|
public void run(ApplicationArguments args) throws Exception {
|
||||||
|
@ -42,16 +40,16 @@ public class Sample implements ApplicationRunner{
|
||||||
throw new RuntimeException(e);
|
throw new RuntimeException(e);
|
||||||
}
|
}
|
||||||
log.info("创建实例成功");
|
log.info("创建实例成功");
|
||||||
redisService.setCacheList("instanceIds",list);
|
// redisService.setCacheList("instanceIds",list);
|
||||||
try{
|
try{
|
||||||
Thread.sleep(6000);
|
Thread.sleep(9000);
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
throw new RuntimeException(e);
|
throw new RuntimeException(e);
|
||||||
}
|
}
|
||||||
List<AliInstance> aliInstances = aliYunEcsService.selectInstance(list);
|
List<AliInstance> aliInstances = aliYunEcsService.selectInstance(list);
|
||||||
log.info("查询实例信息成功:{}",aliInstances);
|
log.info("查询实例信息成功:{}",aliInstances);
|
||||||
//将查询到的实例信息列表存储到redis中
|
//将查询到的实例信息列表存储到redis中
|
||||||
redisService.setCacheList("instanceList",aliInstances);
|
// redisService.setCacheList("instanceList",aliInstances);
|
||||||
log.info("redis存储成功:{}",aliInstances);
|
// log.info("redis存储成功:{}",aliInstances);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -44,9 +44,9 @@ public class AliYunEcsService {
|
||||||
* @return 实例id集合
|
* @return 实例id集合
|
||||||
*/
|
*/
|
||||||
public List<String> generateInstance(Integer amount) {
|
public List<String> generateInstance(Integer amount) {
|
||||||
redisService.deleteObject("instanceIds");
|
redisService.deleteObject("oneIpList");
|
||||||
redisService.deleteObject("instanceList");
|
redisService.deleteObject("oneCount");
|
||||||
redisService.deleteObject("count");
|
redisService.deleteObject("oneVinIp");
|
||||||
// 检查生成实例的数量是否有效
|
// 检查生成实例的数量是否有效
|
||||||
if (amount == null || amount <= 0) {
|
if (amount == null || amount <= 0) {
|
||||||
throw new ServiceException("生成数量不能小于1");
|
throw new ServiceException("生成数量不能小于1");
|
||||||
|
@ -143,9 +143,11 @@ public class AliYunEcsService {
|
||||||
log.info("实例状态为:{}", status);
|
log.info("实例状态为:{}", status);
|
||||||
AliInstance aliInstance = new AliInstance(instanceId, ipAddress, status);
|
AliInstance aliInstance = new AliInstance(instanceId, ipAddress, status);
|
||||||
aliInstances.add(aliInstance);
|
aliInstances.add(aliInstance);
|
||||||
|
redisService.setCacheList(instanceId,aliInstances);
|
||||||
|
aliInstances.remove(aliInstance);
|
||||||
}
|
}
|
||||||
log.info("======================ipList:{}",stringArrayList);
|
log.info("======================ipList:{}",stringArrayList);
|
||||||
redisService.setCacheList("ipList",stringArrayList);
|
redisService.setCacheList("oneIpList",stringArrayList);
|
||||||
log.info("查询成功");
|
log.info("查询成功");
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("查询服务器实例错误:[{}]", e.getMessage(), e);
|
log.error("查询服务器实例错误:[{}]", e.getMessage(), e);
|
||||||
|
|
|
@ -1,5 +1,6 @@
|
||||||
package com.muyu.cloud.vehicle.gateway.config;
|
package com.muyu.cloud.vehicle.gateway.config;
|
||||||
|
|
||||||
|
import lombok.extern.log4j.Log4j2;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import org.springframework.amqp.core.*;
|
import org.springframework.amqp.core.*;
|
||||||
|
@ -12,93 +13,174 @@ import java.util.concurrent.Exchanger;
|
||||||
/**
|
/**
|
||||||
* rabbitmq配置类
|
* rabbitmq配置类
|
||||||
*/
|
*/
|
||||||
|
@Log4j2
|
||||||
@Configuration
|
@Configuration
|
||||||
public class RabbitmqConfig {
|
public class RabbitmqConfig {
|
||||||
/**
|
/**
|
||||||
* 日志
|
* 日志
|
||||||
*/
|
*/
|
||||||
private static final Logger logger = LoggerFactory.getLogger(RabbitmqConfig.class);
|
private static final Logger logger = LoggerFactory.getLogger(RabbitmqConfig.class);
|
||||||
/**
|
|
||||||
* 队列
|
|
||||||
*/
|
|
||||||
private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
|
|
||||||
/**
|
|
||||||
* 队列
|
|
||||||
*/
|
|
||||||
private static final String QUEUE_INFORM_SMS = "queue_inform_sms";
|
|
||||||
/**
|
/**
|
||||||
* 交换机
|
* 交换机
|
||||||
*/
|
*/
|
||||||
private static final String EXCHANGE_TOPICS_INFORM = "exchange_topics_inform";
|
public static final String EXCHANGE_TOPICS_INFORM = "exchange_topics_inform";
|
||||||
/**
|
|
||||||
* 路由key
|
|
||||||
*/
|
|
||||||
private static final String ROUTINGKEY_EMAIL = "inform.#.email.#";
|
|
||||||
/**
|
|
||||||
* 路由key
|
|
||||||
*/
|
|
||||||
private static final String ROUTINGKEY_SMS = "inform.#.sms.#";
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 声明交换机,做持久化
|
* 队列 车辆上线给事件系统发送vin
|
||||||
|
*/
|
||||||
|
public static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
|
||||||
|
/**
|
||||||
|
* 队列 协议解析
|
||||||
|
*/
|
||||||
|
public static final String QUEUE_INFORM_SMS = "queue_inform_sms";
|
||||||
|
/**
|
||||||
|
* 队列 车辆下线给事件系统发送vin
|
||||||
|
*/
|
||||||
|
public static final String QUEUE_INFORM_SEND = "queue_inform_send";
|
||||||
|
/**
|
||||||
|
* 队列 saas系统
|
||||||
|
*/
|
||||||
|
public static final String QUEUE_INFORM_SAAS = "queue_inform_saas";
|
||||||
|
/**
|
||||||
|
* 路由key 车辆上线给事件系统
|
||||||
|
*/
|
||||||
|
public static final String ROUTINGKEY_EMAIL = "inform.#.email.#";
|
||||||
|
/**
|
||||||
|
* 路由key 协议解析
|
||||||
|
*/
|
||||||
|
public static final String ROUTINGKEY_SMS = "inform.#.sms.#";
|
||||||
|
/**
|
||||||
|
* 路由key 车辆下线给事件系统
|
||||||
|
*/
|
||||||
|
public static final String ROUTINGKEY_SEND = "inform.#.send.#";
|
||||||
|
/**
|
||||||
|
* 路由key saas系统
|
||||||
|
*/
|
||||||
|
public static final String ROUTINGKEY_SAAS = "inform.#.saas.#";
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 声明交换机,做持久化
|
||||||
*/
|
*/
|
||||||
@Bean(EXCHANGE_TOPICS_INFORM)
|
@Bean(EXCHANGE_TOPICS_INFORM)
|
||||||
public Exchange exchangeTopicsInform() {
|
public Exchange exchangeTopicsInform() {
|
||||||
try{
|
try {
|
||||||
Exchange exchange = ExchangeBuilder.topicExchange(EXCHANGE_TOPICS_INFORM).durable(true).build();
|
Exchange exchange = ExchangeBuilder.topicExchange(EXCHANGE_TOPICS_INFORM).durable(true).build();
|
||||||
logger.info("创建的交换机为:{}",EXCHANGE_TOPICS_INFORM);
|
log.info("创建的交换机为: {}", EXCHANGE_TOPICS_INFORM);
|
||||||
return exchange;
|
return exchange;
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
logger.error("创建该:{} 交换机失败",EXCHANGE_TOPICS_INFORM,e);
|
log.error("创建该: {} 交换机失败", EXCHANGE_TOPICS_INFORM, e);
|
||||||
throw e;
|
throw e;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 声明QUEUE_INFORM_EMAIL队列
|
* 声明QUEUE_INFORM_EMAIL 队列
|
||||||
*/
|
*/
|
||||||
@Bean(QUEUE_INFORM_EMAIL)
|
@Bean(QUEUE_INFORM_EMAIL)
|
||||||
public Queue queueInformEmail() {
|
public Queue queueInformEmail() {
|
||||||
try{
|
try {
|
||||||
Queue queue = new Queue(QUEUE_INFORM_EMAIL);
|
Queue queue = new Queue(QUEUE_INFORM_EMAIL);
|
||||||
logger.info("创建的队列为:{}",QUEUE_INFORM_EMAIL);
|
log.info("创建的队列为: {}", QUEUE_INFORM_EMAIL);
|
||||||
return queue;
|
return queue;
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
logger.error("创建该:{} 队列失败",QUEUE_INFORM_EMAIL,e);
|
log.error("创建该: {} 队列失败", QUEUE_INFORM_EMAIL, e);
|
||||||
throw e;
|
throw e;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 声明QUEUE_INFORM_SMS队列
|
* 声明QUEUE_INFORM_SMS 队列
|
||||||
*/
|
*/
|
||||||
@Bean(QUEUE_INFORM_SMS)
|
@Bean(QUEUE_INFORM_SMS)
|
||||||
public Queue queueInformSms() {
|
public Queue queueInformSms() {
|
||||||
try{
|
try {
|
||||||
Queue queue = new Queue(QUEUE_INFORM_SMS);
|
Queue queue = new Queue(QUEUE_INFORM_SMS);
|
||||||
logger.info("创建的队列为:{}",QUEUE_INFORM_SMS);
|
log.info("创建的队列为: {}", QUEUE_INFORM_SMS);
|
||||||
return queue;
|
return queue;
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
logger.error("创建该:{} 队列失败",QUEUE_INFORM_SMS,e);
|
log.error("创建该: {} 队列失败", QUEUE_INFORM_SMS, e);
|
||||||
throw e;
|
throw e;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* ROUTINGKEY_EMAIL队列绑定交换机,指定routingKey
|
* QUEUE_INFORM_SEND 队列
|
||||||
|
*/
|
||||||
|
@Bean(QUEUE_INFORM_SEND)
|
||||||
|
public Queue queueInformSend() {
|
||||||
|
try {
|
||||||
|
Queue queue = new Queue(QUEUE_INFORM_SEND);
|
||||||
|
log.info("创建的队列为: {}", QUEUE_INFORM_SEND);
|
||||||
|
return queue;
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("创建该: {} 队列失败", QUEUE_INFORM_SEND, e);
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* QUEUE_INFORM_SAAS 队列
|
||||||
|
*/
|
||||||
|
@Bean(QUEUE_INFORM_SAAS)
|
||||||
|
public Queue queueInformSaas() {
|
||||||
|
try {
|
||||||
|
Queue queue = new Queue(QUEUE_INFORM_SAAS);
|
||||||
|
log.info("创建的队列为: {}", QUEUE_INFORM_SAAS);
|
||||||
|
return queue;
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("创建该: {} 队列失败", QUEUE_INFORM_SAAS, e);
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* QUEUE_INFORM_EMAIL队列绑定交换机,指定routingKey ROUTINGKEY_EMAIL
|
||||||
|
*
|
||||||
|
* @param queue QUEUE_INFORM_EMAIL
|
||||||
|
* @param exchange EXCHANGE_TOPICS_INFORM
|
||||||
*/
|
*/
|
||||||
@Bean
|
@Bean
|
||||||
public Binding bindingExchangeInformEmail(@Qualifier(QUEUE_INFORM_EMAIL)Queue queue,
|
public Binding bindingQueueInformEmail(@Qualifier(QUEUE_INFORM_EMAIL) Queue queue,
|
||||||
@Qualifier(EXCHANGE_TOPICS_INFORM)Exchange exchange) {
|
@Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange) {
|
||||||
return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_EMAIL).noargs();
|
return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_EMAIL).noargs();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* ROUTINGKEY_SMS队列绑定交换机,指定routingKey
|
* QUEUE_INFORM_SMS 队列绑定交换机,指定routingKey ROUTINGKEY_SMS
|
||||||
|
*
|
||||||
|
* @param queue QUEUE_INFORM_SMS
|
||||||
|
* @param exchange EXCHANGE_TOPICS_INFORM
|
||||||
*/
|
*/
|
||||||
@Bean
|
@Bean
|
||||||
public Binding bindingExchangeInformSms(@Qualifier(QUEUE_INFORM_SMS)Queue queue,
|
public Binding bindingRoutingKeySms(@Qualifier(QUEUE_INFORM_SMS) Queue queue,
|
||||||
@Qualifier(EXCHANGE_TOPICS_INFORM)Exchange exchange) {
|
@Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange) {
|
||||||
return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_SMS).noargs();
|
return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_SMS).noargs();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* QUEUE_INFORM_SEND队列绑定交换机,指定routingKey ROUTINGKEY_SEND
|
||||||
|
*
|
||||||
|
* @param queue QUEUE_INFORM_SEND
|
||||||
|
* @param exchange EXCHANGE_TOPICS_INFORM
|
||||||
|
*/
|
||||||
|
@Bean
|
||||||
|
public Binding bindingRoutingKeySend(@Qualifier(QUEUE_INFORM_SEND) Queue queue,
|
||||||
|
@Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange) {
|
||||||
|
return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_SEND).noargs();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* QUEUE_INFORM_SAAS队列绑定交换机,指定routingKey ROUTINGKEY_SAAS
|
||||||
|
*
|
||||||
|
* @param queue QUEUE_INFORM_SAAS
|
||||||
|
* @param exchange EXCHANGE_TOPICS_INFORM
|
||||||
|
*/
|
||||||
|
@Bean
|
||||||
|
public Binding bindingRoutingKeySaas(@Qualifier(QUEUE_INFORM_SAAS) Queue queue,
|
||||||
|
@Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange) {
|
||||||
|
return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_SAAS).noargs();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -34,8 +34,7 @@ public class VehicleConnectionController {
|
||||||
*/
|
*/
|
||||||
@PostMapping("/receiveMsg/connect")
|
@PostMapping("/receiveMsg/connect")
|
||||||
public Result<MqttServerModel> receiveMsg(@RequestBody VehicleConnectionReq vehicleConnectionReq){
|
public Result<MqttServerModel> receiveMsg(@RequestBody VehicleConnectionReq vehicleConnectionReq){
|
||||||
log.info(">"+vehicleConnectionReq);
|
log.info("===============>"+vehicleConnectionReq);
|
||||||
vehicleConnectionService.getConnect(vehicleConnectionReq);
|
return vehicleConnectionService.getConnect(vehicleConnectionReq);
|
||||||
return Result.success();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,93 +0,0 @@
|
||||||
package com.muyu.cloud.vehicle.gateway.domain;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 返回状态码
|
|
||||||
*/
|
|
||||||
public class HttpStatus {
|
|
||||||
/**
|
|
||||||
* 操作成功
|
|
||||||
*/
|
|
||||||
private static final int SUCCESS = 200;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 对象创建成功
|
|
||||||
*/
|
|
||||||
public static final int CREATED = 201;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 请求已经被接受
|
|
||||||
*/
|
|
||||||
public static final int ACCEPTED = 202;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 操作已经执行成功
|
|
||||||
*/
|
|
||||||
public static final int NO_CONTENT = 204;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 资源已被修改
|
|
||||||
*/
|
|
||||||
public static final int MOVED_PERM = 301;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 重定向
|
|
||||||
*/
|
|
||||||
public static final int SEE_OTHER = 303;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 资源没有被修改
|
|
||||||
*/
|
|
||||||
public static final int NOT_MODIFIED = 304;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 参数列表错误(缺少,格式不匹配)
|
|
||||||
*/
|
|
||||||
public static final int BAD_REQUEST = 400;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 未授权
|
|
||||||
*/
|
|
||||||
public static final int UNAUTHORIZED = 401;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 访问受限,授权过期
|
|
||||||
*/
|
|
||||||
public static final int FORBIDDEN = 403;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 资源,服务未找到
|
|
||||||
*/
|
|
||||||
public static final int NOT_FOUND = 404;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 不允许的http方法
|
|
||||||
*/
|
|
||||||
public static final int BAD_METHOD = 405;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 资源冲突,或者资源被锁
|
|
||||||
*/
|
|
||||||
public static final int CONFLICT = 409;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 不支持的数据,媒体类型
|
|
||||||
*/
|
|
||||||
public static final int UNSUPPORTED_TYPE = 415;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 系统内部错误
|
|
||||||
*/
|
|
||||||
public static final int ERROR = 500;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 接口未实现
|
|
||||||
*/
|
|
||||||
public static final int NOT_IMPLEMENTED = 501;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 系统警告消息
|
|
||||||
*/
|
|
||||||
public static final int WARN = 601;
|
|
||||||
|
|
||||||
|
|
||||||
}
|
|
|
@ -1,17 +0,0 @@
|
||||||
package com.muyu.cloud.vehicle.gateway.domain;
|
|
||||||
|
|
||||||
import lombok.AllArgsConstructor;
|
|
||||||
import lombok.Data;
|
|
||||||
import lombok.NoArgsConstructor;
|
|
||||||
|
|
||||||
@Data
|
|
||||||
@AllArgsConstructor
|
|
||||||
@NoArgsConstructor
|
|
||||||
public class Instance {
|
|
||||||
|
|
||||||
private String instanceId;
|
|
||||||
|
|
||||||
private String ipAddress;
|
|
||||||
|
|
||||||
private String status;
|
|
||||||
}
|
|
|
@ -1,82 +0,0 @@
|
||||||
package com.muyu.cloud.vehicle.gateway.domain.model;
|
|
||||||
|
|
||||||
import lombok.AllArgsConstructor;
|
|
||||||
import lombok.Builder;
|
|
||||||
import lombok.Data;
|
|
||||||
import lombok.NoArgsConstructor;
|
|
||||||
import lombok.extern.log4j.Log4j2;
|
|
||||||
|
|
||||||
import java.util.concurrent.CountDownLatch;
|
|
||||||
import java.util.concurrent.LinkedBlockingDeque;
|
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @ Description:任务执行模型
|
|
||||||
*/
|
|
||||||
@Data
|
|
||||||
@Log4j2
|
|
||||||
@Builder
|
|
||||||
@AllArgsConstructor
|
|
||||||
@NoArgsConstructor
|
|
||||||
public class TaskModel {
|
|
||||||
/**
|
|
||||||
* 任务状态 默认为false状态
|
|
||||||
* true为执行中,false为未执行
|
|
||||||
*/
|
|
||||||
private final AtomicBoolean status =new AtomicBoolean(Boolean.FALSE);
|
|
||||||
/**
|
|
||||||
* 堵塞计数器
|
|
||||||
*/
|
|
||||||
private CountDownLatch countDownLatch;
|
|
||||||
/**
|
|
||||||
* 任务执行堵塞队列
|
|
||||||
*/
|
|
||||||
private LinkedBlockingDeque<String> carQueue =new LinkedBlockingDeque<>();
|
|
||||||
/**
|
|
||||||
* 任务是否执行
|
|
||||||
* true 执行中
|
|
||||||
* false 未执行
|
|
||||||
* @return 是否有任务执行
|
|
||||||
*/
|
|
||||||
private boolean isExecution(){
|
|
||||||
return !status.get();
|
|
||||||
}
|
|
||||||
/**
|
|
||||||
* 任务名称
|
|
||||||
*/
|
|
||||||
private String taskName;
|
|
||||||
/**
|
|
||||||
* 任务执行次数
|
|
||||||
*/
|
|
||||||
private Integer taskExecutionCount=0;
|
|
||||||
/**
|
|
||||||
* 任务开始时间
|
|
||||||
*/
|
|
||||||
private Long taskStartTime;
|
|
||||||
/**
|
|
||||||
* 任务成功执行次数
|
|
||||||
*/
|
|
||||||
private AtomicInteger taskSuccessSum=new AtomicInteger();
|
|
||||||
/**
|
|
||||||
* 任务执行失败次数
|
|
||||||
*/
|
|
||||||
private AtomicInteger taskErrorSum=new AtomicInteger();
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 判断是否有任务
|
|
||||||
* @return true 有任务
|
|
||||||
*/
|
|
||||||
public boolean hashNext(){
|
|
||||||
return !carQueue.isEmpty();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 获取下一个任务节点
|
|
||||||
* @return 任务VIN
|
|
||||||
*/
|
|
||||||
public String next(){
|
|
||||||
return carQueue.poll();
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
|
@ -5,6 +5,8 @@ import lombok.Builder;
|
||||||
import lombok.Data;
|
import lombok.Data;
|
||||||
import lombok.NoArgsConstructor;
|
import lombok.NoArgsConstructor;
|
||||||
|
|
||||||
|
import java.io.Serializable;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @ Description:Mqtt的配置
|
* @ Description:Mqtt的配置
|
||||||
*/
|
*/
|
||||||
|
@ -12,7 +14,7 @@ import lombok.NoArgsConstructor;
|
||||||
@Builder
|
@Builder
|
||||||
@NoArgsConstructor
|
@NoArgsConstructor
|
||||||
@AllArgsConstructor
|
@AllArgsConstructor
|
||||||
public class MqttProperties {
|
public class MqttProperties implements Serializable {
|
||||||
/**
|
/**
|
||||||
* 节点
|
* 节点
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -3,7 +3,13 @@ package com.muyu.cloud.vehicle.gateway.mapper;
|
||||||
import com.muyu.cloud.vehicle.gateway.domain.VehicleConnection;
|
import com.muyu.cloud.vehicle.gateway.domain.VehicleConnection;
|
||||||
import org.apache.ibatis.annotations.Mapper;
|
import org.apache.ibatis.annotations.Mapper;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
@Mapper
|
@Mapper
|
||||||
public interface VehicleConnectionMapper {
|
public interface VehicleConnectionMapper {
|
||||||
void addConnect(VehicleConnection vehicleConnection);
|
void addConnect(VehicleConnection vehicleConnection);
|
||||||
|
|
||||||
|
List<String> selectByVehicleVin(String vehicleVin);
|
||||||
|
|
||||||
|
List<VehicleConnection> getMqttServerModel(String vehicleVin);
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,6 +3,7 @@ package com.muyu.cloud.vehicle.gateway.service.impl;
|
||||||
import com.muyu.cloud.vehicle.gateway.domain.VehicleConnection;
|
import com.muyu.cloud.vehicle.gateway.domain.VehicleConnection;
|
||||||
import com.muyu.cloud.vehicle.gateway.domain.VinIp;
|
import com.muyu.cloud.vehicle.gateway.domain.VinIp;
|
||||||
import com.muyu.cloud.vehicle.gateway.domain.model.MqttServerModel;
|
import com.muyu.cloud.vehicle.gateway.domain.model.MqttServerModel;
|
||||||
|
import com.muyu.cloud.vehicle.gateway.domain.properties.MqttProperties;
|
||||||
import com.muyu.cloud.vehicle.gateway.domain.req.VehicleConnectionReq;
|
import com.muyu.cloud.vehicle.gateway.domain.req.VehicleConnectionReq;
|
||||||
import com.muyu.cloud.vehicle.gateway.mapper.VehicleConnectionMapper;
|
import com.muyu.cloud.vehicle.gateway.mapper.VehicleConnectionMapper;
|
||||||
import com.muyu.cloud.vehicle.gateway.service.VehicleConnectionService;
|
import com.muyu.cloud.vehicle.gateway.service.VehicleConnectionService;
|
||||||
|
@ -15,6 +16,10 @@ import org.springframework.data.redis.core.HashOperations;
|
||||||
import org.springframework.data.redis.core.StringRedisTemplate;
|
import org.springframework.data.redis.core.StringRedisTemplate;
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import static com.muyu.cloud.vehicle.gateway.config.RabbitmqConfig.*;
|
||||||
|
|
||||||
@Log4j2
|
@Log4j2
|
||||||
@Service
|
@Service
|
||||||
public class VehicleConnectionServiceImpl implements VehicleConnectionService {
|
public class VehicleConnectionServiceImpl implements VehicleConnectionService {
|
||||||
|
@ -40,8 +45,8 @@ public class VehicleConnectionServiceImpl implements VehicleConnectionService {
|
||||||
log.info("车辆连接请求:{}",vehicleConnectionReq.toString());
|
log.info("车辆连接请求:{}",vehicleConnectionReq.toString());
|
||||||
|
|
||||||
// 使用交换机发送消息
|
// 使用交换机发送消息
|
||||||
rabbitTemplate.convertAndSend("exchange_topics_inform","inform.#.email.#",vehicleConnectionReq.getVehicleVin());
|
// rabbitTemplate.convertAndSend("exchange_topics_inform","inform.#.email.#",vehicleConnectionReq.getVehicleVin());
|
||||||
log.info("发送消息成功:{}",vehicleConnectionReq.getVehicleVin());
|
// log.info("发送消息成功:{}",vehicleConnectionReq.getVehicleVin());
|
||||||
|
|
||||||
|
|
||||||
VehicleConnection vehicleConnection = new VehicleConnection();
|
VehicleConnection vehicleConnection = new VehicleConnection();
|
||||||
|
@ -51,45 +56,88 @@ public class VehicleConnectionServiceImpl implements VehicleConnectionService {
|
||||||
vehicleConnection.setUsername(vehicleConnectionReq.getUsername());
|
vehicleConnection.setUsername(vehicleConnectionReq.getUsername());
|
||||||
//密码(vin+时间戳+随机数)
|
//密码(vin+时间戳+随机数)
|
||||||
vehicleConnection.setPassword(vehicleConnectionReq.getVehicleVin()+vehicleConnectionReq.getTimestamp()+vehicleConnectionReq.getNonce());
|
vehicleConnection.setPassword(vehicleConnectionReq.getVehicleVin()+vehicleConnectionReq.getTimestamp()+vehicleConnectionReq.getNonce());
|
||||||
|
//查询有没有这辆车的vin码
|
||||||
|
List<String> selectVehicle = vehicleConnectionMapper.selectByVehicleVin(vehicleConnectionReq.getVehicleVin());
|
||||||
|
|
||||||
|
if(selectVehicle.isEmpty()){
|
||||||
//添加连接信息
|
//添加连接信息
|
||||||
vehicleConnectionMapper.addConnect(vehicleConnection);
|
vehicleConnectionMapper.addConnect(vehicleConnection);
|
||||||
//先判断vin码
|
log.info("车辆上线成功");
|
||||||
HashOperations<String, String, String> hashOps = redisTemplate.opsForHash();
|
}else {
|
||||||
String vinIp = hashOps.get("vinIp", vehicleConnectionReq.getVehicleVin());
|
throw new RuntimeException("车辆无法重复预上线");
|
||||||
if(vinIp!=null){
|
|
||||||
log.info("车辆绑定ip失败,已经存在");
|
|
||||||
throw new RuntimeException("车辆绑定ip失败,已经存在");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//先判断vin码
|
||||||
|
if(redisService.hasKey(vehicleConnection.getVehicleVin())){
|
||||||
|
log.error("============车辆:{}已经绑定过了",vehicleConnectionReq.getVehicleVin());
|
||||||
|
throw new RuntimeException("=============车辆已经绑定过了");
|
||||||
|
}
|
||||||
|
|
||||||
|
MqttProperties mqttProperties = new MqttProperties();
|
||||||
|
List<VehicleConnection> vehicleVin = selectByVehicleVin(vehicleConnectionReq.getVehicleVin());
|
||||||
|
for (VehicleConnection connection : vehicleVin) {
|
||||||
|
mqttProperties.setClientId(connection.getVehicleVin());
|
||||||
|
mqttProperties.setUserName(connection.getUsername());
|
||||||
|
mqttProperties.setPassword(connection.getPassword());
|
||||||
|
}
|
||||||
|
mqttProperties.setTopic("vehicle");
|
||||||
|
mqttProperties.setQos(0);
|
||||||
|
|
||||||
|
// //使用交换机发送消息
|
||||||
|
// rabbitTemplate.convertAndSend(EXCHANGE_TOPICS_INFORM,QUEUE_INFORM_EMAIL,mqttProperties);
|
||||||
|
// log.info("==============发送消息成功:{}",mqttProperties);
|
||||||
|
|
||||||
//判断redis有没有count键
|
//判断redis有没有count键
|
||||||
if(redisTemplate.hasKey("count")){
|
if(redisTemplate.hasKey("oneCount")){
|
||||||
//取出count
|
//取出count
|
||||||
Integer count = Integer.valueOf(redisTemplate.opsForValue().get("count"));
|
Integer count = Integer.valueOf(redisTemplate.opsForValue().get("oneCount"));
|
||||||
if(count == 1){
|
if(count == 1){
|
||||||
redisTemplate.opsForValue().set("count",String.valueOf(0));
|
redisTemplate.opsForValue().set("oneCount",String.valueOf(0));
|
||||||
}else {
|
}else {
|
||||||
redisTemplate.opsForValue().set("count",String.valueOf(count+1));
|
redisTemplate.opsForValue().set("oneCount",String.valueOf(count+1));
|
||||||
}
|
}
|
||||||
//根据游标count获取服务IP
|
//根据游标count获取服务IP
|
||||||
Object ipList = redisService.redisTemplate.opsForList().index("ipList", count);
|
Object ipList = redisService.redisTemplate.opsForList().index("oneIpList", count);
|
||||||
|
log.info("=========================oneIpList:"+ipList);
|
||||||
//关联车辆和服务
|
//关联车辆和服务
|
||||||
this.addIpAddress(new VinIp(vehicleConnectionReq.getVehicleVin(),ipList.toString()));
|
this.addIpAddress(new VinIp(vehicleConnectionReq.getVehicleVin(),ipList.toString()));
|
||||||
//响应信息
|
//响应信息
|
||||||
log.info("车辆:{}",vehicleConnectionReq.getVehicleVin()+"绑定成功:{}",ipList);
|
log.info("车辆:{}",vehicleConnectionReq.getVehicleVin()+"绑定成功:{}",ipList);
|
||||||
|
mqttProperties.setBroker("tcp://"+ipList+":1883");
|
||||||
|
|
||||||
|
//使用交换机发送消息
|
||||||
|
rabbitTemplate.convertAndSend(EXCHANGE_TOPICS_INFORM,ROUTINGKEY_SMS,mqttProperties);
|
||||||
|
log.info("==================发送消息成功:{}",mqttProperties);
|
||||||
return Result.success(new MqttServerModel("tcp://"+ipList+":1883","vehicle"));
|
return Result.success(new MqttServerModel("tcp://"+ipList+":1883","vehicle"));
|
||||||
|
|
||||||
}else {
|
}else {
|
||||||
redisTemplate.opsForValue().set("count",String.valueOf(0));
|
redisTemplate.opsForValue().set("oneCount",String.valueOf(0));
|
||||||
//根据游标count获取服务器Ip
|
//根据游标count获取服务器Ip
|
||||||
// String ip = redisTemplate.opsForList().index("ipList", 0);
|
// String ip = redisTemplate.opsForList().index("ipList", 0);
|
||||||
Object ipList = redisService.redisTemplate.opsForList().index("ipList", 0);
|
Object ipList = redisService.redisTemplate.opsForList().index("oneIpList", 0);
|
||||||
//关联车辆和服务
|
//关联车辆和服务
|
||||||
this.addIpAddress(new VinIp(vehicleConnectionReq.getVehicleVin(),ipList.toString()));
|
this.addIpAddress(new VinIp(vehicleConnectionReq.getVehicleVin(),ipList.toString()));
|
||||||
//响应信息
|
//响应信息
|
||||||
log.info("车辆:{}",vehicleConnectionReq.getVehicleVin(),"与:{}绑定成功",ipList);
|
log.info("车辆:{}",vehicleConnectionReq.getVehicleVin(),"与:{}绑定成功",ipList);
|
||||||
|
mqttProperties.setBroker("tcp://"+ipList+":1883");
|
||||||
|
|
||||||
|
//使用交换机发送消息
|
||||||
|
rabbitTemplate.convertAndSend(EXCHANGE_TOPICS_INFORM,ROUTINGKEY_SMS,mqttProperties);
|
||||||
|
log.info("================发送消息成功:{}",mqttProperties);
|
||||||
return Result.success(new MqttServerModel("tcp://"+ipList+":1883","vehicle"));
|
return Result.success(new MqttServerModel("tcp://"+ipList+":1883","vehicle"));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 查询车辆绑定的服务器信息
|
||||||
|
* @param vehicleVin 车辆vin码集合
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
private List<VehicleConnection> selectByVehicleVin(String vehicleVin) {
|
||||||
|
return vehicleConnectionMapper.getMqttServerModel(vehicleVin);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 添加车辆绑定IP地址存入redis中
|
* 添加车辆绑定IP地址存入redis中
|
||||||
*/
|
*/
|
||||||
|
@ -97,6 +145,6 @@ public class VehicleConnectionServiceImpl implements VehicleConnectionService {
|
||||||
if (vinIp == null || vinIp.getVin() == null || vinIp.getVin().isEmpty() || vinIp.getIp() == null || vinIp.getIp().isEmpty()) {
|
if (vinIp == null || vinIp.getVin() == null || vinIp.getVin().isEmpty() || vinIp.getIp() == null || vinIp.getIp().isEmpty()) {
|
||||||
throw new IllegalArgumentException("vin 或 ip 不能为空或无效");
|
throw new IllegalArgumentException("vin 或 ip 不能为空或无效");
|
||||||
}
|
}
|
||||||
redisTemplate.opsForHash().put("vinIp", vinIp.getVin(), vinIp.getIp());
|
redisService.setCacheObject(vinIp.getVin(), vinIp.getIp());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -67,4 +67,4 @@ aliyun:
|
||||||
instance-type: ecs.t6-c1m1.large
|
instance-type: ecs.t6-c1m1.large
|
||||||
security-group-id: sg-uf6hyictocodexptlgiv
|
security-group-id: sg-uf6hyictocodexptlgiv
|
||||||
switch-id: vsw-uf6ags5luz17qd6ckn2tb
|
switch-id: vsw-uf6ags5luz17qd6ckn2tb
|
||||||
amount: 1
|
amount: 2
|
||||||
|
|
|
@ -7,8 +7,19 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
|
||||||
|
|
||||||
<insert id="addConnect">
|
<insert id="addConnect">
|
||||||
insert into car_one_click_operation
|
insert into car_one_click_operation
|
||||||
(vehicle_vin,user_name,password)
|
(vin, user_name, password)
|
||||||
values
|
values (#{vehicleVin}, #{username}, #{password})
|
||||||
(#{vehicleVin},#{username},#{password})
|
|
||||||
</insert>
|
</insert>
|
||||||
|
<select id="selectByVehicleVin" resultType="java.lang.String">
|
||||||
|
select vin
|
||||||
|
from car_one_click_operation
|
||||||
|
where vin = #{vehicleVin}
|
||||||
|
</select>
|
||||||
|
<select id="getMqttServerModel" resultType="com.muyu.cloud.vehicle.gateway.domain.VehicleConnection">
|
||||||
|
select vin vehicleVin,
|
||||||
|
user_name,
|
||||||
|
password
|
||||||
|
from car_one_click_operation
|
||||||
|
where vin = #{vehicleVin}
|
||||||
|
</select>
|
||||||
</mapper>
|
</mapper>
|
||||||
|
|
Loading…
Reference in New Issue