feat():添加发送mq到协议解析功能

dev.vehicleGateway
liuyibo 2024-10-11 14:35:39 +08:00
parent 80b68c2df7
commit 1f9b832c38
4 changed files with 122 additions and 20 deletions

View File

@ -29,6 +29,12 @@ public class RabbitmqConfig {
*/
public static final String GO_OFFLINE_QUEUE = "GO_OFFLINE";
/**
*
*/
public static final String SEND_OFFLINE_QUEUE = "GO_LINE";
/**
* 线
*/
@ -39,6 +45,11 @@ public class RabbitmqConfig {
*/
public static final String OFFLINE_EXCHANGE = "OFFLINE_EXCHANGE";
/**
*
*/
public static final String LINE_EXCHANGE = "LINE_EXCHANGE";
/**
* 线key
*/
@ -47,6 +58,10 @@ public class RabbitmqConfig {
* 线key
*/
public static final String GO_ONLINE_ROUTING_KEY = "GO_ONLINE";
/**
* key
*/
public static final String SEND_ONLINE_ROUTING_KEY = "GO_LINE";
/**
* ,
@ -78,6 +93,21 @@ public class RabbitmqConfig {
}
}
/**
* ,
*/
@Bean(LINE_EXCHANGE)
public Exchange exchangeTopicsInform() {
try {
Exchange exchange = ExchangeBuilder.topicExchange(LINE_EXCHANGE).durable(true).build();
log.info("创建的交换机为: {}", LINE_EXCHANGE);
return exchange;
} catch (Exception e) {
log.error("创建该: {} 交换机失败", LINE_EXCHANGE, e);
throw e;
}
}
// 声明QUEUE_INFORM_EMAIL队列
@Bean(GO_OFFLINE_QUEUE)
public Queue queueInformEmail() {
@ -104,6 +134,19 @@ public class RabbitmqConfig {
}
}
// 声明QUEUE_INFORM_SMS队列
@Bean(SEND_OFFLINE_QUEUE)
public Queue queueInformSend() {
try {
Queue queue = new Queue(SEND_OFFLINE_QUEUE);
log.info("创建的队列为: {}", SEND_OFFLINE_QUEUE);
return queue;
} catch (Exception e) {
log.error("创建该: {} 队列失败", SEND_OFFLINE_QUEUE, e);
throw e;
}
}
//ROUTINGKEY_EMAIL队列绑定交换机指定routingKey
@Bean
public Binding bindingQueueInformEmail(@Qualifier(GO_OFFLINE_QUEUE) Queue queue,
@ -116,4 +159,11 @@ public class RabbitmqConfig {
@Qualifier(ONLINE_EXCHANGE) Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with(GO_ONLINE_ROUTING_KEY).noargs();
}
//ROUTINGKEY_SMS队列绑定交换机指定routingKey
@Bean
public Binding bindingRoutingKeySend(@Qualifier(SEND_OFFLINE_QUEUE) Queue queue,
@Qualifier(LINE_EXCHANGE) Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with(SEND_ONLINE_ROUTING_KEY).noargs();
}
}

View File

@ -0,0 +1,49 @@
package com.muyu.vehiclegateway.domain;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* @ClassName MqttProperties
* @Description
* @Author YiBo.Liu
* @Date 2024/10/10 20:04
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class MqttProperties {
/**
* MQTT
*/
private String broker;
/**
* MQTT
*/
private String topic;
/**
*
*/
private String username;
/**
*
*/
private String password;
/**
* id
*/
private String clientId;
/**
*
*/
private int qos = 0;
}

View File

@ -124,7 +124,6 @@ public class GenerateInstance implements ApplicationRunner {
redisService.setCacheList("ipList",instanceIps);
} catch (TeaException error) {
// 此处仅做打印展示,请谨慎对待异常处理,在工程项目中切勿直接忽略异常。
// 错误 message

View File

@ -3,6 +3,7 @@ package com.muyu.vehiclegateway.service.impl;
import com.muyu.common.core.domain.Result;
import com.muyu.common.redis.service.RedisService;
import com.muyu.vehiclegateway.config.ClientConfig;
import com.muyu.vehiclegateway.domain.MqttProperties;
import com.muyu.vehiclegateway.domain.MqttServerModel;
import com.muyu.vehiclegateway.domain.VinIp;
import com.muyu.vehiclegateway.domain.req.VehicleConnectionReq;
@ -10,13 +11,16 @@ import com.muyu.vehiclegateway.instance.GenerateInstance;
import com.muyu.vehiclegateway.mapper.ConnectMapper;
import com.muyu.vehiclegateway.service.ConnectService;
import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.HashOperations;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
import java.util.List;
import static com.muyu.vehiclegateway.config.RabbitmqConfig.LINE_EXCHANGE;
import static com.muyu.vehiclegateway.config.RabbitmqConfig.SEND_ONLINE_ROUTING_KEY;
/**
* @ClassName ConnectServiceImpl
@ -28,6 +32,8 @@ import java.util.List;
@Log4j2
public class ConnectServiceImpl implements ConnectService {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private ConnectMapper connectMapper;
@ -76,26 +82,12 @@ public class ConnectServiceImpl implements ConnectService {
}
//先判断vin码
String vin = redisTemplate.opsForValue().get(vehicleConnectionReq.getVehicleVin());
if(!redisTemplate.hasKey(vin)){
// String vin = redisTemplate.opsForValue().get(vehicleConnectionReq.getVehicleVin());
if(redisTemplate.hasKey(vehicleConnectionReq.getVehicleVin())){
log.info("车辆绑定ip失败已经存在");
throw new RuntimeException("车辆已经绑定过了");
}
// //先检查是否有可用的服务器
// String ip = findAvailableServer();
// if(ip==null){
// //如果没有就新增一个服务器
// ip = ClientConfig.addServer();
// }
//
// // 关联车辆和服务
// this.insertVinIp(new VinIp(vehicleConnectionReq.getVehicleVin(), ip));
//
// // 更新服务器绑定数量
// incrementServerBindingCount(ip);
//判断redis有没有count键
if(redisTemplate.hasKey("count")){
//取出count
@ -110,7 +102,19 @@ public class ConnectServiceImpl implements ConnectService {
//关联车辆和服务
this.insertVinIp(new VinIp(vehicleConnectionReq.getVehicleVin(),ip.toString()));
//响应信息
log.info("车辆:{}",vehicleConnectionReq.getVehicleVin()+"成功绑定到:{}",ip);
log.info("车辆:"+vehicleConnectionReq.getVehicleVin()+"成功绑定到:"+ip);
//发送mq到协议解析
MqttProperties mqttProperties = new MqttProperties();
mqttProperties.setBroker("tcp://" + ip + ":1883");
mqttProperties.setTopic("vehicle");
mqttProperties.setUsername(vehicleConnectionReq.getUsername());
mqttProperties.setPassword(vehicleConnectionReq.getPassword());
mqttProperties.setClientId(ip.toString());
mqttProperties.setQos(0);
rabbitTemplate.convertAndSend(LINE_EXCHANGE,SEND_ONLINE_ROUTING_KEY,mqttProperties);
return Result.success(new MqttServerModel("tcp://"+ip+":1883","vehicle"));
}else {
redisTemplate.opsForValue().set("count",String.valueOf(0));
@ -132,7 +136,7 @@ public class ConnectServiceImpl implements ConnectService {
if (vinIp == null || vinIp.getVin() == null || vinIp.getVin().isEmpty() || vinIp.getIp() == null || vinIp.getIp().isEmpty()) {
throw new IllegalArgumentException("vin 或 ip 不能为空或无效");
}
// redisTemplate.opsForHash().put("vinIp", vinIp.getVin(), vinIp.getIp());
redisTemplate.opsForValue().set(vinIp.getVin(),vinIp.getIp());
}