feat:修复代码 + 合并网关
parent
872660f84b
commit
dbe9f57691
|
@ -2,6 +2,7 @@ package com.muyu.common.rabbit.constants;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* rabbit常量
|
* rabbit常量
|
||||||
|
*
|
||||||
* @Author: 胡杨
|
* @Author: 胡杨
|
||||||
* @date: 2024/7/10
|
* @date: 2024/7/10
|
||||||
* @Description: rabbit常量
|
* @Description: rabbit常量
|
||||||
|
@ -9,7 +10,8 @@ package com.muyu.common.rabbit.constants;
|
||||||
*/
|
*/
|
||||||
public class RabbitConstants {
|
public class RabbitConstants {
|
||||||
|
|
||||||
public final static String GO_ONLINE_QUEUE= "GoOnline";
|
public final static String GO_ONLINE_QUEUE = "GoOnline";
|
||||||
|
|
||||||
public final static String DOWNLINE_QUEUE= "Downline";
|
public final static String DOWNLINE_QUEUE = "Downline";
|
||||||
|
public final static String FORM_QUEUE = "queue_inform_sms";
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,8 +1,8 @@
|
||||||
package com.muyu.cargateway.Aliyun;
|
package com.muyu.car.gateway.Aliyun;
|
||||||
|
|
||||||
import com.aliyun.ecs20140526.Client;
|
import com.aliyun.ecs20140526.Client;
|
||||||
import com.aliyun.teaopenapi.models.Config;
|
import com.aliyun.teaopenapi.models.Config;
|
||||||
import com.muyu.cargateway.config.AliProperties;
|
import com.muyu.car.gateway.config.AliProperties;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.context.annotation.Bean;
|
import org.springframework.context.annotation.Bean;
|
||||||
import org.springframework.context.annotation.Configuration;
|
import org.springframework.context.annotation.Configuration;
|
|
@ -1,24 +1,25 @@
|
||||||
package com.muyu.cargateway.instance;
|
package com.muyu.car.gateway.Aliyun.instance;
|
||||||
|
|
||||||
import com.muyu.cargateway.Aliyun.service.AliYunEcsService;
|
import com.muyu.car.gateway.Aliyun.service.AliYunEcsService;
|
||||||
import lombok.extern.log4j.Log4j2;
|
import lombok.extern.log4j.Log4j2;
|
||||||
import org.springframework.beans.factory.DisposableBean;
|
import org.springframework.beans.factory.DisposableBean;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
* @author Lenovo
|
||||||
* @ Tool:IntelliJ IDEA
|
* @ Tool:IntelliJ IDEA
|
||||||
* @ Author:CHX
|
* @ Author:CHX
|
||||||
* @ Date:2024-10-07-21:51
|
* @ Date:2024-10-07-21:51
|
||||||
* @ Version:1.0
|
* @ Version:1.0
|
||||||
* @ Description:删除实例方法
|
* @ Description:删除实例方法
|
||||||
* @author Lenovo
|
|
||||||
*/
|
*/
|
||||||
@Log4j2
|
@Log4j2
|
||||||
@Component
|
@Component
|
||||||
public class DeleteSample implements DisposableBean {
|
public class DeleteSample implements DisposableBean {
|
||||||
@Autowired
|
@Autowired
|
||||||
private AliYunEcsService aliYunEcsService;
|
private AliYunEcsService aliYunEcsService;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void destroy() {
|
public void destroy() {
|
||||||
try {
|
try {
|
|
@ -1,9 +1,8 @@
|
||||||
package com.muyu.cargateway.instance;
|
package com.muyu.car.gateway.Aliyun.instance;
|
||||||
|
|
||||||
import com.muyu.cargateway.Aliyun.service.AliYunEcsService;
|
import com.muyu.car.gateway.Aliyun.service.AliYunEcsService;
|
||||||
import com.muyu.cargateway.config.AliProperties;
|
import com.muyu.car.gateway.config.AliProperties;
|
||||||
import com.muyu.cargateway.domain.AliInstance;
|
import com.muyu.car.gateway.domain.AliInstance;
|
||||||
import com.muyu.common.redis.service.RedisService;
|
|
||||||
import lombok.extern.log4j.Log4j2;
|
import lombok.extern.log4j.Log4j2;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.boot.ApplicationArguments;
|
import org.springframework.boot.ApplicationArguments;
|
||||||
|
@ -22,14 +21,12 @@ import java.util.List;
|
||||||
*/
|
*/
|
||||||
@Log4j2
|
@Log4j2
|
||||||
@Component
|
@Component
|
||||||
public class Sample implements ApplicationRunner{
|
public class Sample implements ApplicationRunner {
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
private AliYunEcsService aliYunEcsService;
|
private AliYunEcsService aliYunEcsService;
|
||||||
@Autowired
|
@Autowired
|
||||||
private AliProperties aliProperties;
|
private AliProperties aliProperties;
|
||||||
@Autowired
|
|
||||||
private RedisService redisService;
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void run(ApplicationArguments args) throws Exception {
|
public void run(ApplicationArguments args) throws Exception {
|
||||||
|
@ -42,17 +39,14 @@ public class Sample implements ApplicationRunner{
|
||||||
throw new RuntimeException(e);
|
throw new RuntimeException(e);
|
||||||
}
|
}
|
||||||
log.info("创建实例成功");
|
log.info("创建实例成功");
|
||||||
redisService.setCacheList("instanceIds", list);
|
// redisService.setCacheList("instanceIds", list);
|
||||||
try {
|
try {
|
||||||
Thread.sleep(6000);
|
Thread.sleep(9000);
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
throw new RuntimeException(e);
|
throw new RuntimeException(e);
|
||||||
}
|
}
|
||||||
List<AliInstance> aliInstances = aliYunEcsService.selectInstance(list);
|
List<AliInstance> aliInstances = aliYunEcsService.selectInstance(list);
|
||||||
log.info("查询实例信息成功:{}",aliInstances);
|
log.info("================查询实例信息成功:{}", aliInstances);
|
||||||
// 将查询到的实例信息列表存储到Redis中
|
|
||||||
redisService.setCacheList("instanceList", aliInstances);
|
|
||||||
log.info("redis存储成功:{}", aliInstances);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// @Override
|
// @Override
|
|
@ -1,11 +1,11 @@
|
||||||
package com.muyu.cargateway.Aliyun.service;
|
package com.muyu.car.gateway.Aliyun.service;
|
||||||
|
|
||||||
import com.aliyun.ecs20140526.Client;
|
import com.aliyun.ecs20140526.Client;
|
||||||
import com.aliyun.ecs20140526.models.*;
|
import com.aliyun.ecs20140526.models.*;
|
||||||
import com.aliyun.tea.TeaException;
|
import com.aliyun.tea.TeaException;
|
||||||
import com.aliyun.teautil.models.RuntimeOptions;
|
import com.aliyun.teautil.models.RuntimeOptions;
|
||||||
import com.muyu.cargateway.config.AliProperties;
|
import com.muyu.car.gateway.domain.AliInstance;
|
||||||
import com.muyu.cargateway.domain.AliInstance;
|
import com.muyu.car.gateway.config.AliProperties;
|
||||||
import com.muyu.common.core.exception.ServiceException;
|
import com.muyu.common.core.exception.ServiceException;
|
||||||
import com.muyu.common.redis.service.RedisService;
|
import com.muyu.common.redis.service.RedisService;
|
||||||
import lombok.extern.log4j.Log4j2;
|
import lombok.extern.log4j.Log4j2;
|
||||||
|
@ -49,8 +49,9 @@ public class AliYunEcsService {
|
||||||
* @return 实例id集合
|
* @return 实例id集合
|
||||||
*/
|
*/
|
||||||
public List<String> generateInstance(Integer amount) {
|
public List<String> generateInstance(Integer amount) {
|
||||||
redisService.deleteObject("instanceIds");
|
redisService.deleteObject("oneIpList");
|
||||||
redisService.deleteObject("instanceList");
|
redisService.deleteObject("oneCount");
|
||||||
|
redisService.deleteObject("oneVinIp");
|
||||||
// 检查生成实例的数量是否有效
|
// 检查生成实例的数量是否有效
|
||||||
if (amount == null || amount <= 0) {
|
if (amount == null || amount <= 0) {
|
||||||
throw new ServiceException("生成数量不能小于1");
|
throw new ServiceException("生成数量不能小于1");
|
||||||
|
@ -125,6 +126,7 @@ public class AliYunEcsService {
|
||||||
// 创建运行时选项对象,用于配置请求的额外参数
|
// 创建运行时选项对象,用于配置请求的额外参数
|
||||||
RuntimeOptions runtimeOptions = new RuntimeOptions();
|
RuntimeOptions runtimeOptions = new RuntimeOptions();
|
||||||
List<AliInstance> aliInstances = new ArrayList<>();
|
List<AliInstance> aliInstances = new ArrayList<>();
|
||||||
|
List<String> stringArrayList = new ArrayList<>();
|
||||||
try {
|
try {
|
||||||
// 发送请求并获取响应对象
|
// 发送请求并获取响应对象
|
||||||
DescribeInstancesResponse describeInstancesResponse = client.describeInstancesWithOptions(request, runtimeOptions);
|
DescribeInstancesResponse describeInstancesResponse = client.describeInstancesWithOptions(request, runtimeOptions);
|
||||||
|
@ -136,16 +138,25 @@ public class AliYunEcsService {
|
||||||
for (DescribeInstancesResponseBody.DescribeInstancesResponseBodyInstancesInstance bodyInstance : instance) {
|
for (DescribeInstancesResponseBody.DescribeInstancesResponseBodyInstancesInstance bodyInstance : instance) {
|
||||||
// 实例id
|
// 实例id
|
||||||
String instanceId = bodyInstance.getInstanceId();
|
String instanceId = bodyInstance.getInstanceId();
|
||||||
log.info("实例id为:{}", instanceId);
|
|
||||||
// ip地址
|
// ip地址
|
||||||
String ipAddress = bodyInstance.getPublicIpAddress().getIpAddress().get(0);
|
String ipAddress = bodyInstance.getPublicIpAddress().getIpAddress().get(0);
|
||||||
log.info("实例ip为:{}", ipAddress);
|
|
||||||
// 实例状态
|
// 实例状态
|
||||||
String status = bodyInstance.getStatus();
|
String status = bodyInstance.getStatus();
|
||||||
log.info("实例状态为:{}", status);
|
|
||||||
|
log.info("=======================实例id为:{}", instanceId);
|
||||||
|
log.info("=======================实例ip为:{}", ipAddress);
|
||||||
|
log.info("=======================实例状态为:{}", status);
|
||||||
|
|
||||||
|
|
||||||
|
stringArrayList.add(ipAddress);
|
||||||
AliInstance aliInstance = new AliInstance(instanceId, ipAddress, status);
|
AliInstance aliInstance = new AliInstance(instanceId, ipAddress, status);
|
||||||
aliInstances.add(aliInstance);
|
aliInstances.add(aliInstance);
|
||||||
|
redisService.setCacheList(instanceId, aliInstances);
|
||||||
|
aliInstances.remove(aliInstance);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
log.info("======================ipList:{}", stringArrayList);
|
||||||
|
redisService.setCacheList("oneIpList", stringArrayList);
|
||||||
log.info("查询成功");
|
log.info("查询成功");
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("查询服务器实例错误:[{}]", e.getMessage(), e);
|
log.error("查询服务器实例错误:[{}]", e.getMessage(), e);
|
|
@ -1,4 +1,4 @@
|
||||||
package com.muyu.cargateway;
|
package com.muyu.car.gateway;
|
||||||
|
|
||||||
import com.muyu.common.security.annotation.EnableCustomConfig;
|
import com.muyu.common.security.annotation.EnableCustomConfig;
|
||||||
import lombok.extern.log4j.Log4j2;
|
import lombok.extern.log4j.Log4j2;
|
||||||
|
@ -7,19 +7,19 @@ import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||||
import org.springframework.cloud.openfeign.EnableFeignClients;
|
import org.springframework.cloud.openfeign.EnableFeignClients;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
* @author Lenovo
|
||||||
* @ Tool:IntelliJ IDEA
|
* @ Tool:IntelliJ IDEA
|
||||||
* @ Author:CHX
|
* @ Author:CHX
|
||||||
* @ Date:2024-09-17-15:00
|
* @ Date:2024-09-17-15:00
|
||||||
* @ Version:1.0
|
* @ Version:1.0
|
||||||
* @ Description:故障启动类
|
* @ Description:故障启动类
|
||||||
* @author Lenovo
|
|
||||||
*/
|
*/
|
||||||
@Log4j2
|
@Log4j2
|
||||||
@EnableCustomConfig
|
@EnableCustomConfig
|
||||||
@EnableFeignClients
|
@EnableFeignClients
|
||||||
@SpringBootApplication
|
@SpringBootApplication
|
||||||
public class CloudVehicleGatewayApplication {
|
public class CarGatewayApplication {
|
||||||
public static void main(String[] args) {
|
public static void main(String[] args) {
|
||||||
SpringApplication.run(CloudVehicleGatewayApplication.class, args);
|
SpringApplication.run(CarGatewayApplication.class, args);
|
||||||
}
|
}
|
||||||
}
|
}
|
|
@ -1,4 +1,4 @@
|
||||||
package com.muyu.cargateway.config;
|
package com.muyu.car.gateway.config;
|
||||||
|
|
||||||
import lombok.Data;
|
import lombok.Data;
|
||||||
import org.springframework.boot.context.properties.ConfigurationProperties;
|
import org.springframework.boot.context.properties.ConfigurationProperties;
|
|
@ -0,0 +1,189 @@
|
||||||
|
package com.muyu.car.gateway.config;
|
||||||
|
|
||||||
|
import lombok.extern.log4j.Log4j2;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
import org.springframework.amqp.core.*;
|
||||||
|
import org.springframework.beans.factory.annotation.Qualifier;
|
||||||
|
import org.springframework.context.annotation.Bean;
|
||||||
|
import org.springframework.context.annotation.Configuration;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author Lenovo
|
||||||
|
* @ Tool:IntelliJ IDEA
|
||||||
|
* @ Author:CHX
|
||||||
|
* @ Date:2024-10-04-15:13
|
||||||
|
* @ Version:1.0
|
||||||
|
* @ Description:rabbitmq配置类
|
||||||
|
*/
|
||||||
|
@Log4j2
|
||||||
|
@Configuration
|
||||||
|
public class RabbitmqConfig {
|
||||||
|
/**
|
||||||
|
* 日志
|
||||||
|
*/
|
||||||
|
private static final Logger logger = LoggerFactory.getLogger(RabbitmqConfig.class);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 交换机
|
||||||
|
*/
|
||||||
|
public static final String EXCHANGE_TOPICS_INFORM = "exchange_topics_inform";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 队列 车辆上线给事件系统发送vin
|
||||||
|
*/
|
||||||
|
public static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
|
||||||
|
/**
|
||||||
|
* 队列 协议解析
|
||||||
|
*/
|
||||||
|
public static final String QUEUE_INFORM_SMS = "queue_inform_sms";
|
||||||
|
/**
|
||||||
|
* 队列 车辆下线给事件系统发送vin
|
||||||
|
*/
|
||||||
|
public static final String QUEUE_INFORM_SEND = "queue_inform_send";
|
||||||
|
/**
|
||||||
|
* 队列 saas系统
|
||||||
|
*/
|
||||||
|
public static final String QUEUE_INFORM_SAAS = "queue_inform_saas";
|
||||||
|
/**
|
||||||
|
* 路由key 车辆上线给事件系统
|
||||||
|
*/
|
||||||
|
public static final String ROUTINGKEY_EMAIL = "inform.#.email.#";
|
||||||
|
/**
|
||||||
|
* 路由key 协议解析
|
||||||
|
*/
|
||||||
|
public static final String ROUTINGKEY_SMS = "inform.#.sms.#";
|
||||||
|
/**
|
||||||
|
* 路由key 车辆下线给事件系统
|
||||||
|
*/
|
||||||
|
public static final String ROUTINGKEY_SEND = "inform.#.send.#";
|
||||||
|
/**
|
||||||
|
* 路由key saas系统
|
||||||
|
*/
|
||||||
|
public static final String ROUTINGKEY_SAAS = "inform.#.saas.#";
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 声明交换机,做持久化
|
||||||
|
*/
|
||||||
|
@Bean(EXCHANGE_TOPICS_INFORM)
|
||||||
|
public Exchange exchangeTopicsInform() {
|
||||||
|
try {
|
||||||
|
Exchange exchange = ExchangeBuilder.topicExchange(EXCHANGE_TOPICS_INFORM).durable(true).build();
|
||||||
|
log.info("创建的交换机为: {}", EXCHANGE_TOPICS_INFORM);
|
||||||
|
return exchange;
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("创建该: {} 交换机失败", EXCHANGE_TOPICS_INFORM, e);
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 声明QUEUE_INFORM_EMAIL 队列
|
||||||
|
*/
|
||||||
|
@Bean(QUEUE_INFORM_EMAIL)
|
||||||
|
public Queue queueInformEmail() {
|
||||||
|
try {
|
||||||
|
Queue queue = new Queue(QUEUE_INFORM_EMAIL);
|
||||||
|
log.info("创建的队列为: {}", QUEUE_INFORM_EMAIL);
|
||||||
|
return queue;
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("创建该: {} 队列失败", QUEUE_INFORM_EMAIL, e);
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 声明QUEUE_INFORM_SMS 队列
|
||||||
|
*/
|
||||||
|
@Bean(QUEUE_INFORM_SMS)
|
||||||
|
public Queue queueInformSms() {
|
||||||
|
try {
|
||||||
|
Queue queue = new Queue(QUEUE_INFORM_SMS);
|
||||||
|
log.info("创建的队列为: {}", QUEUE_INFORM_SMS);
|
||||||
|
return queue;
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("创建该: {} 队列失败", QUEUE_INFORM_SMS, e);
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* QUEUE_INFORM_SEND 队列
|
||||||
|
*/
|
||||||
|
@Bean(QUEUE_INFORM_SEND)
|
||||||
|
public Queue queueInformSend() {
|
||||||
|
try {
|
||||||
|
Queue queue = new Queue(QUEUE_INFORM_SEND);
|
||||||
|
log.info("创建的队列为: {}", QUEUE_INFORM_SEND);
|
||||||
|
return queue;
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("创建该: {} 队列失败", QUEUE_INFORM_SEND, e);
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* QUEUE_INFORM_SAAS 队列
|
||||||
|
*/
|
||||||
|
@Bean(QUEUE_INFORM_SAAS)
|
||||||
|
public Queue queueInformSaas() {
|
||||||
|
try {
|
||||||
|
Queue queue = new Queue(QUEUE_INFORM_SAAS);
|
||||||
|
log.info("创建的队列为: {}", QUEUE_INFORM_SAAS);
|
||||||
|
return queue;
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("创建该: {} 队列失败", QUEUE_INFORM_SAAS, e);
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* QUEUE_INFORM_EMAIL队列绑定交换机,指定routingKey ROUTINGKEY_EMAIL
|
||||||
|
*
|
||||||
|
* @param queue QUEUE_INFORM_EMAIL
|
||||||
|
* @param exchange EXCHANGE_TOPICS_INFORM
|
||||||
|
*/
|
||||||
|
@Bean
|
||||||
|
public Binding bindingQueueInformEmail(@Qualifier(QUEUE_INFORM_EMAIL) Queue queue,
|
||||||
|
@Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange) {
|
||||||
|
return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_EMAIL).noargs();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* QUEUE_INFORM_SMS 队列绑定交换机,指定routingKey ROUTINGKEY_SMS
|
||||||
|
*
|
||||||
|
* @param queue QUEUE_INFORM_SMS
|
||||||
|
* @param exchange EXCHANGE_TOPICS_INFORM
|
||||||
|
*/
|
||||||
|
@Bean
|
||||||
|
public Binding bindingRoutingKeySms(@Qualifier(QUEUE_INFORM_SMS) Queue queue,
|
||||||
|
@Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange) {
|
||||||
|
return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_SMS).noargs();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* QUEUE_INFORM_SEND队列绑定交换机,指定routingKey ROUTINGKEY_SEND
|
||||||
|
*
|
||||||
|
* @param queue QUEUE_INFORM_SEND
|
||||||
|
* @param exchange EXCHANGE_TOPICS_INFORM
|
||||||
|
*/
|
||||||
|
@Bean
|
||||||
|
public Binding bindingRoutingKeySend(@Qualifier(QUEUE_INFORM_SEND) Queue queue,
|
||||||
|
@Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange) {
|
||||||
|
return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_SEND).noargs();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* QUEUE_INFORM_SAAS队列绑定交换机,指定routingKey ROUTINGKEY_SAAS
|
||||||
|
*
|
||||||
|
* @param queue QUEUE_INFORM_SAAS
|
||||||
|
* @param exchange EXCHANGE_TOPICS_INFORM
|
||||||
|
*/
|
||||||
|
@Bean
|
||||||
|
public Binding bindingRoutingKeySaas(@Qualifier(QUEUE_INFORM_SAAS) Queue queue,
|
||||||
|
@Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange) {
|
||||||
|
return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_SAAS).noargs();
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,8 +1,8 @@
|
||||||
package com.muyu.cargateway.controller;
|
package com.muyu.car.gateway.controller;
|
||||||
|
|
||||||
import com.muyu.cargateway.domain.model.MqttServerModel;
|
import com.muyu.car.gateway.domain.req.VehicleConnectionReq;
|
||||||
import com.muyu.cargateway.domain.req.VehicleConnectionReq;
|
import com.muyu.car.gateway.service.CarOneClickOperationService;
|
||||||
import com.muyu.cargateway.service.CarOneClickOperationService;
|
import com.muyu.car.gateway.domain.model.MqttServerModel;
|
||||||
import com.muyu.common.core.domain.Result;
|
import com.muyu.common.core.domain.Result;
|
||||||
import io.swagger.v3.oas.annotations.tags.Tag;
|
import io.swagger.v3.oas.annotations.tags.Tag;
|
||||||
import lombok.extern.log4j.Log4j2;
|
import lombok.extern.log4j.Log4j2;
|
||||||
|
@ -13,12 +13,12 @@ import org.springframework.web.bind.annotation.RequestMapping;
|
||||||
import org.springframework.web.bind.annotation.RestController;
|
import org.springframework.web.bind.annotation.RestController;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
* @author Lenovo
|
||||||
* @ Tool:IntelliJ IDEA
|
* @ Tool:IntelliJ IDEA
|
||||||
* @ Author:CHX
|
* @ Author:CHX
|
||||||
* @ Date:2024-09-26-20:14
|
* @ Date:2024-09-26-20:14
|
||||||
* @ Version:1.0
|
* @ Version:1.0
|
||||||
* @ Description:车辆
|
* @ Description:车辆
|
||||||
* @author Lenovo
|
|
||||||
*/
|
*/
|
||||||
@Log4j2
|
@Log4j2
|
||||||
@RestController
|
@RestController
|
||||||
|
@ -30,13 +30,14 @@ public class CarOneClickOperationController {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 获取http连接的参数
|
* 获取http连接的参数
|
||||||
|
*
|
||||||
* @param vehicleConnectionReq
|
* @param vehicleConnectionReq
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
@PostMapping("/receiveMsg/connect")
|
@PostMapping("/receiveMsg/connect")
|
||||||
public Result<MqttServerModel> receiveMsg(@RequestBody VehicleConnectionReq vehicleConnectionReq){
|
public Result<MqttServerModel> receiveMsg(@RequestBody VehicleConnectionReq vehicleConnectionReq) {
|
||||||
log.info(">"+vehicleConnectionReq);
|
log.info("======================" + vehicleConnectionReq);
|
||||||
MqttServerModel mqttServerModel =carOneClickOperationService.getConnect(vehicleConnectionReq);
|
return carOneClickOperationService.getConnect(vehicleConnectionReq);
|
||||||
return Result.success(mqttServerModel);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
|
@ -1,4 +1,4 @@
|
||||||
package com.muyu.cargateway.domain;
|
package com.muyu.car.gateway.domain;
|
||||||
|
|
||||||
import lombok.AllArgsConstructor;
|
import lombok.AllArgsConstructor;
|
||||||
import lombok.Builder;
|
import lombok.Builder;
|
||||||
|
@ -6,12 +6,12 @@ import lombok.Data;
|
||||||
import lombok.NoArgsConstructor;
|
import lombok.NoArgsConstructor;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
* @author Lenovo
|
||||||
* @ Tool:IntelliJ IDEA
|
* @ Tool:IntelliJ IDEA
|
||||||
* @ Author:CHX
|
* @ Author:CHX
|
||||||
* @ Date:2024-09-30-9:33
|
* @ Date:2024-09-30-9:33
|
||||||
* @ Version:1.0
|
* @ Version:1.0
|
||||||
* @ Description:
|
* @ Description:
|
||||||
* @author Lenovo
|
|
||||||
*/
|
*/
|
||||||
@Data
|
@Data
|
||||||
@Builder
|
@Builder
|
||||||
|
@ -19,7 +19,7 @@ import lombok.NoArgsConstructor;
|
||||||
@NoArgsConstructor
|
@NoArgsConstructor
|
||||||
public class AliInstance {
|
public class AliInstance {
|
||||||
/**
|
/**
|
||||||
*实例ID
|
* 实例ID
|
||||||
*/
|
*/
|
||||||
private String instanceId;
|
private String instanceId;
|
||||||
/**
|
/**
|
|
@ -1,16 +1,16 @@
|
||||||
package com.muyu.cargateway.domain;
|
package com.muyu.car.gateway.domain;
|
||||||
|
|
||||||
import lombok.AllArgsConstructor;
|
import lombok.AllArgsConstructor;
|
||||||
import lombok.Data;
|
import lombok.Data;
|
||||||
import lombok.NoArgsConstructor;
|
import lombok.NoArgsConstructor;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
* @author Lenovo
|
||||||
* @ Tool:IntelliJ IDEA
|
* @ Tool:IntelliJ IDEA
|
||||||
* @ Author:CHX
|
* @ Author:CHX
|
||||||
* @ Date:2024-09-28-16:37
|
* @ Date:2024-09-28-16:37
|
||||||
* @ Version:1.0
|
* @ Version:1.0
|
||||||
* @ Description:创建实例的配置
|
* @ Description:创建实例的配置
|
||||||
* @author Lenovo
|
|
||||||
*/
|
*/
|
||||||
@Data
|
@Data
|
||||||
@AllArgsConstructor
|
@AllArgsConstructor
|
|
@ -1,16 +1,16 @@
|
||||||
package com.muyu.cargateway.domain;
|
package com.muyu.car.gateway.domain;
|
||||||
|
|
||||||
import lombok.AllArgsConstructor;
|
import lombok.AllArgsConstructor;
|
||||||
import lombok.Data;
|
import lombok.Data;
|
||||||
import lombok.NoArgsConstructor;
|
import lombok.NoArgsConstructor;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
* @author Lenovo
|
||||||
* @ Tool:IntelliJ IDEA
|
* @ Tool:IntelliJ IDEA
|
||||||
* @ Author:CHX
|
* @ Author:CHX
|
||||||
* @ Date:2024-10-04-15:16
|
* @ Date:2024-10-04-15:16
|
||||||
* @ Version:1.0
|
* @ Version:1.0
|
||||||
* @ Description:车辆服务器
|
* @ Description:车辆服务器
|
||||||
* @author Lenovo
|
|
||||||
*/
|
*/
|
||||||
@Data
|
@Data
|
||||||
@AllArgsConstructor
|
@AllArgsConstructor
|
|
@ -1,4 +1,4 @@
|
||||||
package com.muyu.cargateway.domain;
|
package com.muyu.car.gateway.domain;
|
||||||
|
|
||||||
import com.baomidou.mybatisplus.annotation.IdType;
|
import com.baomidou.mybatisplus.annotation.IdType;
|
||||||
import com.baomidou.mybatisplus.annotation.TableId;
|
import com.baomidou.mybatisplus.annotation.TableId;
|
||||||
|
@ -9,23 +9,23 @@ import lombok.Data;
|
||||||
import lombok.NoArgsConstructor;
|
import lombok.NoArgsConstructor;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
* @author Lenovo
|
||||||
* @ Tool:IntelliJ IDEA
|
* @ Tool:IntelliJ IDEA
|
||||||
* @ Author:CHX
|
* @ Author:CHX
|
||||||
* @ Date:2024-09-27-20:56
|
* @ Date:2024-09-27-20:56
|
||||||
* @ Version:1.0
|
* @ Version:1.0
|
||||||
* @ Description:服务器配置
|
* @ Description:服务器配置
|
||||||
* @author Lenovo
|
|
||||||
*/
|
*/
|
||||||
@Data
|
@Data
|
||||||
@Builder
|
@Builder
|
||||||
@NoArgsConstructor
|
@NoArgsConstructor
|
||||||
@AllArgsConstructor
|
@AllArgsConstructor
|
||||||
@TableName(value="server_config")
|
@TableName(value = "server_config")
|
||||||
public class ServerConfig {
|
public class ServerConfig {
|
||||||
/**
|
/**
|
||||||
* 主键
|
* 主键
|
||||||
*/
|
*/
|
||||||
@TableId(value = "id",type = IdType.AUTO)
|
@TableId(value = "id", type = IdType.AUTO)
|
||||||
private Long id;
|
private Long id;
|
||||||
/**
|
/**
|
||||||
* 租户id
|
* 租户id
|
|
@ -1,16 +1,16 @@
|
||||||
package com.muyu.cargateway.domain;
|
package com.muyu.car.gateway.domain;
|
||||||
|
|
||||||
import lombok.AllArgsConstructor;
|
import lombok.AllArgsConstructor;
|
||||||
import lombok.Data;
|
import lombok.Data;
|
||||||
import lombok.NoArgsConstructor;
|
import lombok.NoArgsConstructor;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
* @author Lenovo
|
||||||
* @ Tool:IntelliJ IDEA
|
* @ Tool:IntelliJ IDEA
|
||||||
* @ Author:CHX
|
* @ Author:CHX
|
||||||
* @ Date:2024-10-06-11:05
|
* @ Date:2024-10-06-11:05
|
||||||
* @ Version:1.0
|
* @ Version:1.0
|
||||||
* @ Description:车辆鉴权的参数
|
* @ Description:车辆鉴权的参数
|
||||||
* @author Lenovo
|
|
||||||
*/
|
*/
|
||||||
@Data
|
@Data
|
||||||
@AllArgsConstructor
|
@AllArgsConstructor
|
|
@ -1,16 +1,16 @@
|
||||||
package com.muyu.cargateway.domain;
|
package com.muyu.car.gateway.domain;
|
||||||
|
|
||||||
import lombok.AllArgsConstructor;
|
import lombok.AllArgsConstructor;
|
||||||
import lombok.Data;
|
import lombok.Data;
|
||||||
import lombok.NoArgsConstructor;
|
import lombok.NoArgsConstructor;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
* @author Lenovo
|
||||||
* @ Tool:IntelliJ IDEA
|
* @ Tool:IntelliJ IDEA
|
||||||
* @ Author:CHX
|
* @ Author:CHX
|
||||||
* @ Date:2024-10-03-10:10
|
* @ Date:2024-10-03-10:10
|
||||||
* @ Version:1.0
|
* @ Version:1.0
|
||||||
* @ Description:车辆vin
|
* @ Description:车辆vin
|
||||||
* @author Lenovo
|
|
||||||
*/
|
*/
|
||||||
@Data
|
@Data
|
||||||
@AllArgsConstructor
|
@AllArgsConstructor
|
||||||
|
@ -19,9 +19,9 @@ public class VinIp {
|
||||||
/**
|
/**
|
||||||
* 车辆的vin
|
* 车辆的vin
|
||||||
*/
|
*/
|
||||||
String vehicleVin;
|
String vin;
|
||||||
/**
|
/**
|
||||||
* 车辆的ip
|
* 车辆的ip
|
||||||
*/
|
*/
|
||||||
String ipAddress;
|
String ip;
|
||||||
}
|
}
|
|
@ -1,4 +1,4 @@
|
||||||
package com.muyu.cargateway.domain.model;
|
package com.muyu.car.gateway.domain.model;
|
||||||
|
|
||||||
import lombok.AllArgsConstructor;
|
import lombok.AllArgsConstructor;
|
||||||
import lombok.Builder;
|
import lombok.Builder;
|
||||||
|
@ -6,12 +6,12 @@ import lombok.Data;
|
||||||
import lombok.NoArgsConstructor;
|
import lombok.NoArgsConstructor;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
* @author Lenovo
|
||||||
* @ Tool:IntelliJ IDEA
|
* @ Tool:IntelliJ IDEA
|
||||||
* @ Author:CHX
|
* @ Author:CHX
|
||||||
* @ Date:2024-10-03-10:12
|
* @ Date:2024-10-03-10:12
|
||||||
* @ Version:1.0
|
* @ Version:1.0
|
||||||
* @ Description:Mqtt服务模型
|
* @ Description:Mqtt服务模型
|
||||||
* @author Lenovo
|
|
||||||
*/
|
*/
|
||||||
@Data
|
@Data
|
||||||
@Builder
|
@Builder
|
||||||
|
@ -26,4 +26,6 @@ public class MqttServerModel {
|
||||||
* MQTT订阅主题
|
* MQTT订阅主题
|
||||||
*/
|
*/
|
||||||
private String topic;
|
private String topic;
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
|
@ -1,4 +1,4 @@
|
||||||
package com.muyu.cargateway.domain.properties;
|
package com.muyu.car.gateway.domain.properties;
|
||||||
|
|
||||||
import lombok.AllArgsConstructor;
|
import lombok.AllArgsConstructor;
|
||||||
import lombok.Builder;
|
import lombok.Builder;
|
|
@ -1,4 +1,4 @@
|
||||||
package com.muyu.cargateway.domain.req;
|
package com.muyu.car.gateway.domain.req;
|
||||||
|
|
||||||
import lombok.AllArgsConstructor;
|
import lombok.AllArgsConstructor;
|
||||||
import lombok.Builder;
|
import lombok.Builder;
|
||||||
|
@ -6,12 +6,12 @@ import lombok.Data;
|
||||||
import lombok.NoArgsConstructor;
|
import lombok.NoArgsConstructor;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
* @author Lenovo
|
||||||
* @ Tool:IntelliJ IDEA
|
* @ Tool:IntelliJ IDEA
|
||||||
* @ Author:CHX
|
* @ Author:CHX
|
||||||
* @ Date:2024-10-03-10:04
|
* @ Date:2024-10-03-10:04
|
||||||
* @ Version:1.0
|
* @ Version:1.0
|
||||||
* @ Description:车辆获取连接地址
|
* @ Description:车辆获取连接地址
|
||||||
* @author Lenovo
|
|
||||||
*/
|
*/
|
||||||
@Data
|
@Data
|
||||||
@Builder
|
@Builder
|
|
@ -1,4 +1,4 @@
|
||||||
package com.muyu.cargateway.domain.resp;
|
package com.muyu.car.gateway.domain.resp;
|
||||||
|
|
||||||
import lombok.AllArgsConstructor;
|
import lombok.AllArgsConstructor;
|
||||||
import lombok.Data;
|
import lombok.Data;
|
||||||
|
@ -6,12 +6,12 @@ import lombok.NoArgsConstructor;
|
||||||
import lombok.experimental.SuperBuilder;
|
import lombok.experimental.SuperBuilder;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
* @author Lenovo
|
||||||
* @ Tool:IntelliJ IDEA
|
* @ Tool:IntelliJ IDEA
|
||||||
* @ Author:CHX
|
* @ Author:CHX
|
||||||
* @ Date:2024-09-26-15:53
|
* @ Date:2024-09-26-15:53
|
||||||
* @ Version:1.0
|
* @ Version:1.0
|
||||||
* @ Description:调用Ali服务器配置实体类
|
* @ Description:调用Ali服务器配置实体类
|
||||||
* @author Lenovo
|
|
||||||
*/
|
*/
|
||||||
@Data
|
@Data
|
||||||
@SuperBuilder
|
@SuperBuilder
|
|
@ -1,21 +1,26 @@
|
||||||
package com.muyu.cargateway.mapper;
|
package com.muyu.car.gateway.mapper;
|
||||||
|
|
||||||
import com.muyu.cargateway.domain.VehicleConnection;
|
import com.muyu.car.gateway.domain.VehicleConnection;
|
||||||
import org.apache.ibatis.annotations.Mapper;
|
import org.apache.ibatis.annotations.Mapper;
|
||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
* @author Lenovo
|
||||||
* @ Tool:IntelliJ IDEA
|
* @ Tool:IntelliJ IDEA
|
||||||
* @ Author:CHX
|
* @ Author:CHX
|
||||||
* @ Date:2024-09-26-20:15
|
* @ Date:2024-09-26-20:15
|
||||||
* @ Version:1.0
|
* @ Version:1.0
|
||||||
* @ Description:车辆一键操作持久层
|
* @ Description:车辆一键操作持久层
|
||||||
* @author Lenovo
|
|
||||||
*/
|
*/
|
||||||
@Mapper
|
@Mapper
|
||||||
public interface CarOneClickOperationMapper {
|
public interface CarOneClickOperationMapper {
|
||||||
void addConnect(VehicleConnection vehicleConnection);
|
void addConnect(VehicleConnection vehicleConnection);
|
||||||
|
|
||||||
List<String> selectByVehicleVin(String vehicleVin);
|
List<String> selectByVehicleVin(String vehicleVin);
|
||||||
|
|
||||||
|
|
||||||
|
List<VehicleConnection> getMqttServerModel(String vehicleVin);
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
|
@ -1,22 +1,24 @@
|
||||||
package com.muyu.cargateway.service;
|
package com.muyu.car.gateway.service;
|
||||||
|
|
||||||
import com.muyu.cargateway.domain.model.MqttServerModel;
|
import com.muyu.car.gateway.domain.model.MqttServerModel;
|
||||||
import com.muyu.cargateway.domain.req.VehicleConnectionReq;
|
import com.muyu.car.gateway.domain.req.VehicleConnectionReq;
|
||||||
|
import com.muyu.common.core.domain.Result;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
* @author Lenovo
|
||||||
* @ Tool:IntelliJ IDEA
|
* @ Tool:IntelliJ IDEA
|
||||||
* @ Author:CHX
|
* @ Author:CHX
|
||||||
* @ Date:2024-09-26-20:15
|
* @ Date:2024-09-26-20:15
|
||||||
* @ Version:1.0
|
* @ Version:1.0
|
||||||
* @ Description:车辆一键操作业务层
|
* @ Description:车辆一键操作业务层
|
||||||
* @author Lenovo
|
|
||||||
*/
|
*/
|
||||||
public interface CarOneClickOperationService {
|
public interface CarOneClickOperationService {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 获取连接
|
* 获取连接
|
||||||
|
*
|
||||||
* @param vehicleConnectionReq 车辆连接请求参数
|
* @param vehicleConnectionReq 车辆连接请求参数
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
MqttServerModel getConnect(VehicleConnectionReq vehicleConnectionReq);
|
Result<MqttServerModel> getConnect(VehicleConnectionReq vehicleConnectionReq);
|
||||||
}
|
}
|
|
@ -0,0 +1,144 @@
|
||||||
|
package com.muyu.car.gateway.service.Impl;
|
||||||
|
|
||||||
|
import com.muyu.car.gateway.domain.VehicleConnection;
|
||||||
|
import com.muyu.car.gateway.domain.VinIp;
|
||||||
|
import com.muyu.car.gateway.domain.model.MqttServerModel;
|
||||||
|
import com.muyu.car.gateway.domain.properties.MqttProperties;
|
||||||
|
import com.muyu.car.gateway.domain.req.VehicleConnectionReq;
|
||||||
|
import com.muyu.car.gateway.mapper.CarOneClickOperationMapper;
|
||||||
|
import com.muyu.car.gateway.service.CarOneClickOperationService;
|
||||||
|
import com.muyu.common.core.domain.Result;
|
||||||
|
import com.muyu.common.redis.service.RedisService;
|
||||||
|
import lombok.extern.log4j.Log4j2;
|
||||||
|
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.data.redis.core.StringRedisTemplate;
|
||||||
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import static com.muyu.car.gateway.config.RabbitmqConfig.EXCHANGE_TOPICS_INFORM;
|
||||||
|
import static com.muyu.car.gateway.config.RabbitmqConfig.ROUTINGKEY_SMS;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author Lenovo
|
||||||
|
* @ Tool:IntelliJ IDEA
|
||||||
|
* @ Author:CHX
|
||||||
|
* @ Date:2024-09-26-20:16
|
||||||
|
* @ Version:1.0
|
||||||
|
* @ Description:车辆一键操作业务实现层
|
||||||
|
*/
|
||||||
|
@Log4j2
|
||||||
|
@Service
|
||||||
|
public class CarOneClickOperationServiceImpl implements CarOneClickOperationService {
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private CarOneClickOperationMapper carOneClickOperationMapper;
|
||||||
|
@Autowired
|
||||||
|
private RabbitTemplate rabbitTemplate;
|
||||||
|
@Autowired
|
||||||
|
private RedisService redisService;
|
||||||
|
@Autowired
|
||||||
|
private StringRedisTemplate redisTemplate;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 获取连接信息
|
||||||
|
*
|
||||||
|
* @param vehicleConnectionReq 车辆连接请求参数
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public Result<MqttServerModel> getConnect(VehicleConnectionReq vehicleConnectionReq) {
|
||||||
|
log.info("车辆连接请求:{}", vehicleConnectionReq.toString());
|
||||||
|
|
||||||
|
VehicleConnection vehicleConnection = new VehicleConnection();
|
||||||
|
//车辆vin
|
||||||
|
vehicleConnection.setVehicleVin(vehicleConnectionReq.getVehicleVin());
|
||||||
|
//用户名
|
||||||
|
vehicleConnection.setUsername(vehicleConnectionReq.getUsername());
|
||||||
|
//密码(vin+时间戳+随机数)
|
||||||
|
vehicleConnection.setPassword(vehicleConnectionReq.getVehicleVin() + vehicleConnectionReq.getTimestamp() + vehicleConnectionReq.getNonce());
|
||||||
|
//查询有没有这辆车的vin码
|
||||||
|
List<String> selectVehicle = carOneClickOperationMapper.selectByVehicleVin(vehicleConnectionReq.getVehicleVin());
|
||||||
|
|
||||||
|
if (selectVehicle.isEmpty()) {
|
||||||
|
//添加连接信息
|
||||||
|
carOneClickOperationMapper.addConnect(vehicleConnection);
|
||||||
|
log.info("车辆上线成功");
|
||||||
|
} else {
|
||||||
|
throw new RuntimeException("车辆无法重复上线");
|
||||||
|
|
||||||
|
}
|
||||||
|
//先判断vin码
|
||||||
|
if (redisService.hasKey(vehicleConnectionReq.getVehicleVin())) {
|
||||||
|
log.error("=============车辆:{}已经绑定过了", vehicleConnectionReq.getVehicleVin());
|
||||||
|
throw new RuntimeException("=============车辆已经绑定过了");
|
||||||
|
}
|
||||||
|
MqttProperties mqttProperties = new MqttProperties();
|
||||||
|
List<VehicleConnection> vehicleVin = selectByVehicleVin(vehicleConnectionReq.getVehicleVin());
|
||||||
|
for (VehicleConnection connection : vehicleVin) {
|
||||||
|
mqttProperties.setClientId(connection.getVehicleVin());
|
||||||
|
mqttProperties.setUserName(connection.getUsername());
|
||||||
|
mqttProperties.setPassword(connection.getPassword());
|
||||||
|
}
|
||||||
|
mqttProperties.setTopic("vehicle");
|
||||||
|
mqttProperties.setQos(0);
|
||||||
|
//判断redis有没有count键
|
||||||
|
if (redisTemplate.hasKey("oneCount")) {
|
||||||
|
//取出count
|
||||||
|
Integer count = Integer.valueOf(redisTemplate.opsForValue().get("oneCount"));
|
||||||
|
if (count == 1) {
|
||||||
|
redisTemplate.opsForValue().set("oneCount", String.valueOf(0));
|
||||||
|
} else {
|
||||||
|
redisTemplate.opsForValue().set("oneCount", String.valueOf(count + 1));
|
||||||
|
}
|
||||||
|
//根据游标count获取服务IP
|
||||||
|
// String ip = redisTemplate.opsForList().index("ipList", count);
|
||||||
|
Object ipList = redisService.redisTemplate.opsForList().index("oneIpList", count);
|
||||||
|
|
||||||
|
log.info("=========================oneIpList:" + ipList);
|
||||||
|
//关联车辆和服务
|
||||||
|
this.addIpAddress(new VinIp(vehicleConnectionReq.getVehicleVin(), ipList.toString()));
|
||||||
|
//响应信息
|
||||||
|
log.info("车辆:{}", vehicleConnectionReq.getVehicleVin() + "绑定成功:{}", ipList);
|
||||||
|
mqttProperties.setBroker("tcp://" + ipList + ":1883");
|
||||||
|
// 使用交换机发送消息
|
||||||
|
rabbitTemplate.convertAndSend(EXCHANGE_TOPICS_INFORM, ROUTINGKEY_SMS, mqttProperties);
|
||||||
|
log.info("============================发送消息成功:{}", mqttProperties);
|
||||||
|
return Result.success(new MqttServerModel("tcp://" + ipList + ":1883", "vehicle"));
|
||||||
|
} else {
|
||||||
|
redisTemplate.opsForValue().set("oneCount", String.valueOf(0));
|
||||||
|
//根据游标count获取服务器Ip
|
||||||
|
Object ipList = redisService.redisTemplate.opsForList().index("oneIpList", 0);
|
||||||
|
//关联车辆和服务
|
||||||
|
this.addIpAddress(new VinIp(vehicleConnectionReq.getVehicleVin(), ipList.toString()));
|
||||||
|
//响应信息
|
||||||
|
log.info("车辆:{}", vehicleConnectionReq.getVehicleVin(), "与:{}绑定成功", ipList);
|
||||||
|
mqttProperties.setBroker("tcp://" + ipList + ":1883");
|
||||||
|
// 使用交换机发送消息
|
||||||
|
rabbitTemplate.convertAndSend(EXCHANGE_TOPICS_INFORM, ROUTINGKEY_SMS, mqttProperties);
|
||||||
|
log.info("============================发送消息成功:{}", mqttProperties);
|
||||||
|
return Result.success(new MqttServerModel("tcp://" + ipList + ":1883", "vehicle"));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 添加车辆绑定IP地址存入redis中
|
||||||
|
*/
|
||||||
|
public void addIpAddress(VinIp vinIp) {
|
||||||
|
if (vinIp == null || vinIp.getVin() == null || vinIp.getVin().isEmpty() || vinIp.getIp() == null || vinIp.getIp().isEmpty()) {
|
||||||
|
throw new IllegalArgumentException("vin 或 ip 不能为空或无效");
|
||||||
|
}
|
||||||
|
redisService.setCacheObject(vinIp.getVin(), vinIp.getIp());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 查询车辆绑定的服务器信息
|
||||||
|
*
|
||||||
|
* @param vehicleVin 车辆vin码集合
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
public List<VehicleConnection> selectByVehicleVin(String vehicleVin) {
|
||||||
|
return carOneClickOperationMapper.getMqttServerModel(vehicleVin);
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,93 +0,0 @@
|
||||||
package com.muyu.cargateway.domain;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 返回状态码
|
|
||||||
*
|
|
||||||
* @author ruoyi
|
|
||||||
*/
|
|
||||||
public class HttpStatus {
|
|
||||||
/**
|
|
||||||
* 操作成功
|
|
||||||
*/
|
|
||||||
public static final int SUCCESS = 200;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 对象创建成功
|
|
||||||
*/
|
|
||||||
public static final int CREATED = 201;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 请求已经被接受
|
|
||||||
*/
|
|
||||||
public static final int ACCEPTED = 202;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 操作已经执行成功,但是没有返回数据
|
|
||||||
*/
|
|
||||||
public static final int NO_CONTENT = 204;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 资源已被移除
|
|
||||||
*/
|
|
||||||
public static final int MOVED_PERM = 301;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 重定向
|
|
||||||
*/
|
|
||||||
public static final int SEE_OTHER = 303;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 资源没有被修改
|
|
||||||
*/
|
|
||||||
public static final int NOT_MODIFIED = 304;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 参数列表错误(缺少,格式不匹配)
|
|
||||||
*/
|
|
||||||
public static final int BAD_REQUEST = 400;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 未授权
|
|
||||||
*/
|
|
||||||
public static final int UNAUTHORIZED = 401;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 访问受限,授权过期
|
|
||||||
*/
|
|
||||||
public static final int FORBIDDEN = 403;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 资源,服务未找到
|
|
||||||
*/
|
|
||||||
public static final int NOT_FOUND = 404;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 不允许的http方法
|
|
||||||
*/
|
|
||||||
public static final int BAD_METHOD = 405;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 资源冲突,或者资源被锁
|
|
||||||
*/
|
|
||||||
public static final int CONFLICT = 409;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 不支持的数据,媒体类型
|
|
||||||
*/
|
|
||||||
public static final int UNSUPPORTED_TYPE = 415;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 系统内部错误
|
|
||||||
*/
|
|
||||||
public static final int ERROR = 500;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 接口未实现
|
|
||||||
*/
|
|
||||||
public static final int NOT_IMPLEMENTED = 501;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 系统警告消息
|
|
||||||
*/
|
|
||||||
public static final int WARN = 601;
|
|
||||||
}
|
|
|
@ -1,87 +0,0 @@
|
||||||
package com.muyu.cargateway.domain.model;
|
|
||||||
|
|
||||||
import lombok.AllArgsConstructor;
|
|
||||||
import lombok.Builder;
|
|
||||||
import lombok.Data;
|
|
||||||
import lombok.NoArgsConstructor;
|
|
||||||
import lombok.extern.log4j.Log4j2;
|
|
||||||
|
|
||||||
import java.util.concurrent.CountDownLatch;
|
|
||||||
import java.util.concurrent.LinkedBlockingDeque;
|
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @ Tool:IntelliJ IDEA
|
|
||||||
* @ Author:CHX
|
|
||||||
* @ Date:2024-09-26-20:23
|
|
||||||
* @ Version:1.0
|
|
||||||
* @ Description:任务执行模型
|
|
||||||
* @author Lenovo
|
|
||||||
*/
|
|
||||||
@Data
|
|
||||||
@Log4j2
|
|
||||||
@Builder
|
|
||||||
@AllArgsConstructor
|
|
||||||
@NoArgsConstructor
|
|
||||||
public class TaskModel {
|
|
||||||
/**
|
|
||||||
* 任务状态 默认为false状态
|
|
||||||
* true为执行中,false为未执行
|
|
||||||
*/
|
|
||||||
private final AtomicBoolean status =new AtomicBoolean(Boolean.FALSE);
|
|
||||||
/**
|
|
||||||
* 堵塞计数器
|
|
||||||
*/
|
|
||||||
private CountDownLatch countDownLatch;
|
|
||||||
/**
|
|
||||||
* 任务执行堵塞队列
|
|
||||||
*/
|
|
||||||
private LinkedBlockingDeque<String> carQueue =new LinkedBlockingDeque<>();
|
|
||||||
/**
|
|
||||||
* 任务是否执行
|
|
||||||
* true 执行中
|
|
||||||
* false 未执行
|
|
||||||
* @return 是否有任务执行
|
|
||||||
*/
|
|
||||||
private boolean isExecution(){
|
|
||||||
return !status.get();
|
|
||||||
}
|
|
||||||
/**
|
|
||||||
* 任务名称
|
|
||||||
*/
|
|
||||||
private String taskName;
|
|
||||||
/**
|
|
||||||
* 任务执行次数
|
|
||||||
*/
|
|
||||||
private Integer taskExecutionCount=0;
|
|
||||||
/**
|
|
||||||
* 任务开始时间
|
|
||||||
*/
|
|
||||||
private Long taskStartTime;
|
|
||||||
/**
|
|
||||||
* 任务成功执行次数
|
|
||||||
*/
|
|
||||||
private AtomicInteger taskSuccessSum=new AtomicInteger();
|
|
||||||
/**
|
|
||||||
* 任务执行失败次数
|
|
||||||
*/
|
|
||||||
private AtomicInteger taskErrorSum=new AtomicInteger();
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 判断是否有任务
|
|
||||||
* @return true 有任务
|
|
||||||
*/
|
|
||||||
public boolean hashNext(){
|
|
||||||
return !carQueue.isEmpty();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 获取下一个任务节点
|
|
||||||
* @return 任务VIN
|
|
||||||
*/
|
|
||||||
public String next(){
|
|
||||||
return carQueue.poll();
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
|
@ -1,79 +0,0 @@
|
||||||
package com.muyu.cargateway.service.Impl;
|
|
||||||
|
|
||||||
import com.muyu.common.rabbit.config.RabbitmqConfig;
|
|
||||||
import com.muyu.cargateway.domain.VehicleConnection;
|
|
||||||
import com.muyu.cargateway.domain.VinIp;
|
|
||||||
import com.muyu.cargateway.domain.model.MqttServerModel;
|
|
||||||
import com.muyu.cargateway.domain.req.VehicleConnectionReq;
|
|
||||||
import com.muyu.cargateway.mapper.CarOneClickOperationMapper;
|
|
||||||
import com.muyu.cargateway.service.CarOneClickOperationService;
|
|
||||||
import com.muyu.common.redis.service.RedisService;
|
|
||||||
import lombok.extern.log4j.Log4j2;
|
|
||||||
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
|
||||||
import org.springframework.stereotype.Service;
|
|
||||||
|
|
||||||
import java.util.List;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @ Tool:IntelliJ IDEA
|
|
||||||
* @ Author:CHX
|
|
||||||
* @ Date:2024-09-26-20:16
|
|
||||||
* @ Version:1.0
|
|
||||||
* @ Description:车辆一键操作业务实现层
|
|
||||||
* @author Lenovo
|
|
||||||
*/
|
|
||||||
@Log4j2
|
|
||||||
@Service
|
|
||||||
public class CarOneClickOperationServiceImpl implements CarOneClickOperationService {
|
|
||||||
|
|
||||||
@Autowired
|
|
||||||
private CarOneClickOperationMapper carOneClickOperationMapper;
|
|
||||||
|
|
||||||
@Autowired
|
|
||||||
private RabbitTemplate rabbitTemplate;
|
|
||||||
@Autowired
|
|
||||||
private RedisService redisService;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 获取连接信息
|
|
||||||
* @param vehicleConnectionReq 车辆连接请求参数
|
|
||||||
* @return
|
|
||||||
*/
|
|
||||||
@Override
|
|
||||||
public MqttServerModel getConnect(VehicleConnectionReq vehicleConnectionReq) {
|
|
||||||
log.info("车辆连接请求:{}",vehicleConnectionReq.toString());
|
|
||||||
|
|
||||||
// 使用交换机发送消息
|
|
||||||
rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_TOPICS_INFORM,RabbitmqConfig.ROUTINGKEY_EMAIL,vehicleConnectionReq.getVehicleVin());
|
|
||||||
log.info("发送消息成功:{}",vehicleConnectionReq.getVehicleVin());
|
|
||||||
|
|
||||||
|
|
||||||
VehicleConnection vehicleConnection = new VehicleConnection();
|
|
||||||
//车辆vin
|
|
||||||
vehicleConnection.setVehicleVin(vehicleConnectionReq.getVehicleVin());
|
|
||||||
//用户名
|
|
||||||
vehicleConnection.setUsername(vehicleConnectionReq.getUsername());
|
|
||||||
//密码(vin+时间戳+随机数)
|
|
||||||
vehicleConnection.setPassword(vehicleConnectionReq.getVehicleVin()+vehicleConnectionReq.getTimestamp()+vehicleConnectionReq.getNonce());
|
|
||||||
//查询车辆vin集合
|
|
||||||
List<String> vehicleConnections =carOneClickOperationMapper.selectByVehicleVin(vehicleConnectionReq.getVehicleVin());
|
|
||||||
if(vehicleConnections.isEmpty()){
|
|
||||||
//添加
|
|
||||||
carOneClickOperationMapper.addConnect(vehicleConnection);
|
|
||||||
}
|
|
||||||
log.info("该车辆已存在,不能重复预上线");
|
|
||||||
//TODO 返回连接信息 做轮询操作
|
|
||||||
|
|
||||||
|
|
||||||
return new MqttServerModel("tcp://"+"106.15.136.7"+":1883","vehicle");
|
|
||||||
|
|
||||||
}
|
|
||||||
/**
|
|
||||||
* 添加车辆绑定IP地址存入redis中
|
|
||||||
*/
|
|
||||||
public void addIpAddress(VinIp vinIp){
|
|
||||||
redisService.setCacheObject("vehicle_ip_address:"+vinIp.getVehicleVin(),vinIp.getIpAddress());
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
|
@ -1,135 +0,0 @@
|
||||||
package com.muyu.cargateway.utils;
|
|
||||||
|
|
||||||
import com.aliyun.ecs20140526.Client;
|
|
||||||
import com.aliyun.ecs20140526.models.DeleteInstanceRequest;
|
|
||||||
import com.aliyun.ecs20140526.models.DescribeInstancesRequest;
|
|
||||||
import com.aliyun.ecs20140526.models.DescribeInstancesResponse;
|
|
||||||
import com.aliyun.ecs20140526.models.RunInstancesRequest;
|
|
||||||
import com.aliyun.tea.TeaException;
|
|
||||||
import com.aliyun.teaopenapi.models.Config;
|
|
||||||
import com.aliyun.teautil.Common;
|
|
||||||
import com.aliyun.teautil.models.RuntimeOptions;
|
|
||||||
import lombok.extern.log4j.Log4j2;
|
|
||||||
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.stream.Collectors;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @ Tool:IntelliJ IDEA
|
|
||||||
* @ Author:CHX
|
|
||||||
* @ Date:2024-10-02-16:04
|
|
||||||
* @ Version:1.0
|
|
||||||
* @ Description:ecs实例工具类
|
|
||||||
* @author Lenovo
|
|
||||||
*/
|
|
||||||
@Log4j2
|
|
||||||
public class ECSTool {
|
|
||||||
|
|
||||||
public static final String ACCESS_KEY_ID = "LTAI5tDH3FyRx4PRr6anx2TL";
|
|
||||||
public static final String ACCESS_KEY_SECRET = "xdQnX2tDattY50raNkUWmHzE2tondP";
|
|
||||||
|
|
||||||
public static Client createClient() throws Exception {
|
|
||||||
// 工程代码泄露可能会导致 AccessKey 泄露,并威胁账号下所有资源的安全性。以下代码示例仅供参考。
|
|
||||||
Config config = new Config()
|
|
||||||
// 必填,请确保代码运行环境设置了环境变量 ALIBABA_CLOUD_ACCESS_KEY_ID。
|
|
||||||
.setAccessKeyId(ACCESS_KEY_ID)
|
|
||||||
// 必填,请确保代码运行环境设置了环境变量 ALIBABA_CLOUD_ACCESS_KEY_SECRET。
|
|
||||||
.setAccessKeySecret(ACCESS_KEY_SECRET);
|
|
||||||
// Endpoint 请参考 https://api.aliyun.com/product/Ecs
|
|
||||||
config.endpoint = "ecs-cn-hangzhou.aliyuncs.com";
|
|
||||||
return new Client(config);
|
|
||||||
}
|
|
||||||
public static void runEcsInstance(String regionId, String launchTemplateId) throws Exception {
|
|
||||||
Client client = ECSTool.createClient();
|
|
||||||
RunInstancesRequest request = new RunInstancesRequest();
|
|
||||||
request.setRegionId(regionId)
|
|
||||||
.setLaunchTemplateId(launchTemplateId);
|
|
||||||
RuntimeOptions runtimeOptions = new RuntimeOptions();
|
|
||||||
try{
|
|
||||||
client.runInstancesWithOptions(request, runtimeOptions);
|
|
||||||
}catch (Exception error){
|
|
||||||
// 处理API调用过程中出现的异常
|
|
||||||
System.out.println(error.getMessage());
|
|
||||||
if (error instanceof TeaException) {
|
|
||||||
// 处理特定类型的异常,如TeaException
|
|
||||||
TeaException teaError = (TeaException) error;
|
|
||||||
// 打印诊断推荐链接
|
|
||||||
System.out.println(teaError.getData().get("Recommend"));
|
|
||||||
// 断言错误信息
|
|
||||||
Common.assertAsString(teaError.getMessage());
|
|
||||||
} else {
|
|
||||||
// 处理其他类型的异常
|
|
||||||
System.out.println(error.getMessage());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
*销毁实例
|
|
||||||
*/
|
|
||||||
public static void runEcsRemove(String instanceId) throws Exception {
|
|
||||||
Client client = ECSTool.createClient();
|
|
||||||
DeleteInstanceRequest deleteInstancesRequest = new DeleteInstanceRequest();
|
|
||||||
deleteInstancesRequest.setInstanceId(instanceId);
|
|
||||||
RuntimeOptions runtimeOptions = new RuntimeOptions();
|
|
||||||
|
|
||||||
try {
|
|
||||||
// 复制代码运行请自行打印 API 的返回值
|
|
||||||
client.deleteInstanceWithOptions(deleteInstancesRequest, runtimeOptions);
|
|
||||||
} catch (TeaException error) {
|
|
||||||
// 此处仅做打印展示,请谨慎对待异常处理,在工程项目中切勿直接忽略异常。
|
|
||||||
// 错误 message
|
|
||||||
System.out.println(error.getMessage());
|
|
||||||
// 诊断地址
|
|
||||||
System.out.println(error.getData().get("Recommend"));
|
|
||||||
Common.assertAsString(error.message);
|
|
||||||
} catch (Exception _error) {
|
|
||||||
TeaException error = new TeaException(_error.getMessage(), _error);
|
|
||||||
// 此处仅做打印展示,请谨慎对待异常处理,在工程项目中切勿直接忽略异常。
|
|
||||||
// 错误 message
|
|
||||||
System.out.println(error.getMessage());
|
|
||||||
// 诊断地址
|
|
||||||
System.out.println(error.getData().get("Recommend"));
|
|
||||||
Common.assertAsString(error.message);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 查询实例列表
|
|
||||||
* @param regionId 地域ID
|
|
||||||
*/
|
|
||||||
public static List<String> findInstance(String regionId) throws Exception {
|
|
||||||
Client client = ECSTool.createClient();
|
|
||||||
DescribeInstancesRequest describeInstancesRequest = new DescribeInstancesRequest();
|
|
||||||
describeInstancesRequest.setRegionId(regionId);
|
|
||||||
RuntimeOptions runtimeOptions = new RuntimeOptions();
|
|
||||||
List<String> stringArrayList = new ArrayList<>();
|
|
||||||
try {
|
|
||||||
DescribeInstancesResponse response = client.describeInstancesWithOptions(describeInstancesRequest, runtimeOptions);
|
|
||||||
List<List<String>> ipListList = response.getBody().instances.getInstance().stream().map(instance -> instance.publicIpAddress.ipAddress).collect(Collectors.toList());
|
|
||||||
for (List<String> strings : ipListList) {
|
|
||||||
for (String ip : strings) {
|
|
||||||
stringArrayList.add(ip);
|
|
||||||
}
|
|
||||||
return stringArrayList;
|
|
||||||
}
|
|
||||||
} catch (TeaException error) {
|
|
||||||
// 此处仅做打印展示,请谨慎对待异常处理,在工程项目中切勿直接忽略异常。
|
|
||||||
// 错误 message
|
|
||||||
System.out.println(error.getMessage());
|
|
||||||
// 诊断地址
|
|
||||||
System.out.println(error.getData().get("Recommend"));
|
|
||||||
Common.assertAsString(error.message);
|
|
||||||
} catch (Exception _error) {
|
|
||||||
TeaException error = new TeaException(_error.getMessage(), _error);
|
|
||||||
// 此处仅做打印展示,请谨慎对待异常处理,在工程项目中切勿直接忽略异常。
|
|
||||||
// 错误 message
|
|
||||||
System.out.println(error.getMessage());
|
|
||||||
// 诊断地址
|
|
||||||
System.out.println(error.getData().get("Recommend"));
|
|
||||||
Common.assertAsString(error.message);
|
|
||||||
}
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,13 +1,13 @@
|
||||||
# Tomcat
|
# Tomcat
|
||||||
server:
|
server:
|
||||||
port: 12900
|
port: 10099
|
||||||
|
|
||||||
# nacos线上地址
|
# nacos线上地址
|
||||||
nacos:
|
nacos:
|
||||||
addr: 47.116.173.119:8848
|
addr: 47.116.173.119:8848
|
||||||
user-name: nacos
|
user-name: nacos
|
||||||
password: nacos
|
password: nacos
|
||||||
namespace: oneone
|
namespace: one
|
||||||
# SPRING_AMQP_DESERIALIZATION_TRUST_ALL=true spring.amqp.deserialization.trust.all
|
# SPRING_AMQP_DESERIALIZATION_TRUST_ALL=true spring.amqp.deserialization.trust.all
|
||||||
# Spring
|
# Spring
|
||||||
spring:
|
spring:
|
||||||
|
@ -82,4 +82,4 @@ aliyun:
|
||||||
instance-type: ecs.t6-c1m1.large
|
instance-type: ecs.t6-c1m1.large
|
||||||
security-group-id: sg-uf642d5u4ja5gsiitx8y
|
security-group-id: sg-uf642d5u4ja5gsiitx8y
|
||||||
switch-id: vsw-uf66lifrkhxqc94xi06v3
|
switch-id: vsw-uf66lifrkhxqc94xi06v3
|
||||||
amount: 1
|
amount: 2
|
||||||
|
|
|
@ -1,18 +1,26 @@
|
||||||
<?xml version="1.0" encoding="UTF-8" ?>
|
<?xml version="1.0" encoding="UTF-8" ?>
|
||||||
<!DOCTYPE mapper
|
<!DOCTYPE mapper
|
||||||
PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
|
PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
|
||||||
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
|
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
|
||||||
<mapper namespace="com.muyu.cargateway.mapper.CarOneClickOperationMapper">
|
<mapper namespace="com.muyu.car.gateway.mapper.CarOneClickOperationMapper">
|
||||||
|
|
||||||
|
|
||||||
<insert id="addConnect">
|
<insert id="addConnect">
|
||||||
insert into car_one_click_operation
|
insert into car_one_click_operation
|
||||||
(vehicle_vin,user_name,password)
|
(vin, user_name, password)
|
||||||
values
|
values (#{vehicleVin}, #{username}, #{password})
|
||||||
(#{vehicleVin},#{username},#{password})
|
|
||||||
</insert>
|
</insert>
|
||||||
<select id="selectByVehicleVin" resultType="java.lang.String">
|
<select id="selectByVehicleVin" resultType="java.lang.String">
|
||||||
select vehicle_vin from car_one_click_operation where vehicle_vin = #{vehicleVin}
|
select vin
|
||||||
|
from car_one_click_operation
|
||||||
|
where vin = #{vehicleVin}
|
||||||
|
</select>
|
||||||
|
<select id="getMqttServerModel" resultType="com.muyu.car.gateway.domain.VehicleConnection">
|
||||||
|
select vin vehicleVin,
|
||||||
|
user_name,
|
||||||
|
password
|
||||||
|
from car_one_click_operation
|
||||||
|
where vin = #{vehicleVin}
|
||||||
</select>
|
</select>
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -1,131 +0,0 @@
|
||||||
package com.muyu.parsing.consumer;
|
|
||||||
|
|
||||||
import com.muyu.cargateway.domain.properties.MqttProperties;
|
|
||||||
import com.muyu.common.kafka.constants.KafkaConstants;
|
|
||||||
import com.muyu.parsing.domain.KafKaData;
|
|
||||||
import com.muyu.parsing.domain.SysCarMessage;
|
|
||||||
import com.muyu.parsing.manager.TaskManager;
|
|
||||||
import com.muyu.parsing.service.impl.SysCarMessageServiceImpl;
|
|
||||||
import com.rabbitmq.client.Channel;
|
|
||||||
import jakarta.annotation.Resource;
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
|
||||||
import org.apache.kafka.clients.producer.KafkaProducer;
|
|
||||||
import org.apache.kafka.clients.producer.ProducerRecord;
|
|
||||||
import org.eclipse.paho.client.mqttv3.*;
|
|
||||||
import org.springframework.amqp.core.Message;
|
|
||||||
import org.springframework.amqp.rabbit.annotation.Queue;
|
|
||||||
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
|
||||||
import org.springframework.data.redis.core.RedisTemplate;
|
|
||||||
import org.springframework.stereotype.Component;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.List;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @ClassName FormMessageConsumer
|
|
||||||
* @Description 描述
|
|
||||||
* @Author Chen
|
|
||||||
* @Date 2024/10/9 15:55
|
|
||||||
*/
|
|
||||||
@Component
|
|
||||||
@Slf4j
|
|
||||||
public class FormMessageConsumer {
|
|
||||||
@Resource
|
|
||||||
private RedisTemplate<String, String> redisTemplate;
|
|
||||||
|
|
||||||
@Resource
|
|
||||||
private KafkaProducer<String, String> kafkaProducer;
|
|
||||||
@Resource
|
|
||||||
private SysCarMessageServiceImpl sysCarMessageService;
|
|
||||||
private final static String FORM_QUEUE = "queue_inform_sms";
|
|
||||||
|
|
||||||
@RabbitListener(queuesToDeclare = {@Queue(FORM_QUEUE)})
|
|
||||||
public void downline(MqttProperties mqttProperties, Message message, Channel channel) {
|
|
||||||
log.info("基础信息 {} 。。。", mqttProperties);
|
|
||||||
try {
|
|
||||||
// 第三个参数为空,默认持久化策略
|
|
||||||
MqttClient sampleClient = new MqttClient(mqttProperties.getBroker(), mqttProperties.getClientId());
|
|
||||||
MqttConnectOptions connOpts = new MqttConnectOptions();
|
|
||||||
//用户名
|
|
||||||
connOpts.setUserName(mqttProperties.getUserName());
|
|
||||||
//密码
|
|
||||||
connOpts.setPassword(mqttProperties.getPassword().toCharArray());
|
|
||||||
connOpts.setCleanSession(true);
|
|
||||||
System.out.println("Connecting to broker: " + mqttProperties.getBroker());
|
|
||||||
sampleClient.connect(connOpts);
|
|
||||||
sampleClient.subscribe(mqttProperties.getTopic(), 0);
|
|
||||||
sampleClient.setCallback(new MqttCallback() {
|
|
||||||
// 连接丢失
|
|
||||||
@Override
|
|
||||||
public void connectionLost(Throwable throwable) {
|
|
||||||
log.error("连接丢失:{}", throwable.getMessage());
|
|
||||||
}
|
|
||||||
|
|
||||||
// 连接成功
|
|
||||||
@Override
|
|
||||||
public void messageArrived(String s, MqttMessage mqttMessage) {
|
|
||||||
TaskManager manager = new TaskManager(10);
|
|
||||||
manager.execute(() -> processMessage(mqttMessage));
|
|
||||||
}
|
|
||||||
|
|
||||||
// 接收信息
|
|
||||||
@Override
|
|
||||||
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
|
|
||||||
log.info("接收消息:{}", iMqttDeliveryToken);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
|
|
||||||
} catch (MqttException | IOException me) {
|
|
||||||
// System.out.println("reason " + me.getReasonCode());
|
|
||||||
System.out.println("msg " + me.getMessage());
|
|
||||||
System.out.println("loc " + me.getLocalizedMessage());
|
|
||||||
System.out.println("cause " + me.getCause());
|
|
||||||
System.out.println("excep " + me);
|
|
||||||
me.printStackTrace();
|
|
||||||
throw new RuntimeException(me);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
private void processMessage(MqttMessage mqttMessage) {
|
|
||||||
String string = new String(mqttMessage.getPayload());
|
|
||||||
log.info(string);
|
|
||||||
List<SysCarMessage> list = sysCarMessageService.selectSysCarMessageLists(1);
|
|
||||||
List<KafKaData> kafKaDataList = new ArrayList<>();
|
|
||||||
String[] test = string.split(" ");
|
|
||||||
for (SysCarMessage carMessage : list) {
|
|
||||||
int start = Integer.parseInt(carMessage.getMessageStartIndex()) - 1;
|
|
||||||
int end = Integer.parseInt(carMessage.getMessageEndIndex());
|
|
||||||
StringBuilder hexBuilder = new StringBuilder();
|
|
||||||
for (int i = start; i < end; i++) {
|
|
||||||
hexBuilder.append(test[i]);
|
|
||||||
}
|
|
||||||
String hex = hexBuilder.toString();
|
|
||||||
char[] result = new char[hex.length() / 2];
|
|
||||||
for (int x = 0; x < hex.length(); x += 2) {
|
|
||||||
int high = Character.digit(hex.charAt(x), 16);
|
|
||||||
int low = Character.digit(hex.charAt(x + 1), 16);
|
|
||||||
result[x / 2] = (char) ((high << 4) + low);
|
|
||||||
}
|
|
||||||
String value = new String(result);
|
|
||||||
kafKaDataList.add(KafKaData.builder()
|
|
||||||
.key(carMessage.getMessageTypeCode())
|
|
||||||
.label(carMessage.getMessageTypeCode())
|
|
||||||
.value(value)
|
|
||||||
.type(carMessage.getMessageType())
|
|
||||||
.build());
|
|
||||||
}
|
|
||||||
kafKaDataList.add(KafKaData.builder()
|
|
||||||
.key("firmCode")
|
|
||||||
.label("企业编码")
|
|
||||||
.value("firm01")
|
|
||||||
.type("String")
|
|
||||||
.build());
|
|
||||||
|
|
||||||
String jsonString = com.alibaba.fastjson.JSONObject.toJSONString(kafKaDataList);
|
|
||||||
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(KafkaConstants.KafkaTopic, jsonString);
|
|
||||||
kafkaProducer.send(producerRecord);
|
|
||||||
log.info("kafka投产:{}", jsonString);
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -0,0 +1,30 @@
|
||||||
|
package com.muyu.parsing.consumer;
|
||||||
|
|
||||||
|
import com.muyu.cargateway.domain.properties.MqttProperties;
|
||||||
|
import com.muyu.common.rabbit.constants.RabbitConstants;
|
||||||
|
import com.muyu.parsing.mqtt.service.MqttClientService;
|
||||||
|
import com.rabbitmq.client.Channel;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.amqp.core.Message;
|
||||||
|
import org.springframework.amqp.rabbit.annotation.Queue;
|
||||||
|
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
@Component
|
||||||
|
@Slf4j
|
||||||
|
public class RabbitListenerComponent {
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private MqttClientService mqttClientService;
|
||||||
|
|
||||||
|
@RabbitListener(queuesToDeclare = @Queue(value = RabbitConstants.FORM_QUEUE, durable = "true"))
|
||||||
|
public void downline(MqttProperties mqttProperties, Message message, Channel channel) {
|
||||||
|
try {
|
||||||
|
mqttClientService.connectAndSubscribeAsync(mqttProperties);
|
||||||
|
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
|
||||||
|
} catch (Exception e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
log.error("处理RabbitMQ消息时发生错误", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,76 @@
|
||||||
|
package com.muyu.parsing.manager;
|
||||||
|
|
||||||
|
import com.alibaba.fastjson2.JSONObject;
|
||||||
|
import com.muyu.common.kafka.constants.KafkaConstants;
|
||||||
|
import com.muyu.parsing.domain.KafKaData;
|
||||||
|
import com.muyu.parsing.domain.SysCarMessage;
|
||||||
|
import com.muyu.parsing.service.impl.SysCarMessageServiceImpl;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.apache.kafka.clients.producer.KafkaProducer;
|
||||||
|
import org.apache.kafka.clients.producer.ProducerRecord;
|
||||||
|
import org.eclipse.paho.client.mqttv3.MqttMessage;
|
||||||
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
|
import javax.annotation.Resource;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
@Slf4j
|
||||||
|
@Service
|
||||||
|
public class MessageProcessor {
|
||||||
|
|
||||||
|
|
||||||
|
private static final int ID = 1;
|
||||||
|
@Resource
|
||||||
|
private SysCarMessageServiceImpl sysCarMessageService;
|
||||||
|
@Resource
|
||||||
|
private KafkaProducer<String, String> kafkaProducer;
|
||||||
|
|
||||||
|
|
||||||
|
public void processMessage(MqttMessage mqttMessage) {
|
||||||
|
String payload = new String(mqttMessage.getPayload());
|
||||||
|
log.info("====:{}", payload);
|
||||||
|
|
||||||
|
List<SysCarMessage> carMessages = sysCarMessageService.selectSysCarMessageLists(ID);
|
||||||
|
List<KafKaData> kafKaDataList = new ArrayList<>();
|
||||||
|
|
||||||
|
String[] test = payload.split(" ");
|
||||||
|
for (SysCarMessage carMessage : carMessages) {
|
||||||
|
int start = Integer.parseInt(carMessage.getMessageStartIndex()) - 1;
|
||||||
|
int end = Integer.parseInt(carMessage.getMessageEndIndex());
|
||||||
|
|
||||||
|
StringBuilder hexBuilder = new StringBuilder();
|
||||||
|
for (int i = start; i < end; i++) {
|
||||||
|
hexBuilder.append(test[i]);
|
||||||
|
}
|
||||||
|
|
||||||
|
String hex = hexBuilder.toString();
|
||||||
|
char[] result = new char[hex.length() / 2];
|
||||||
|
for (int x = 0; x < hex.length(); x += 2) {
|
||||||
|
int high = Character.digit(hex.charAt(x), 16);
|
||||||
|
int low = Character.digit(hex.charAt(x + 1), 16);
|
||||||
|
result[x / 2] = (char) ((high << 4) + low);
|
||||||
|
}
|
||||||
|
|
||||||
|
String value = new String(result);
|
||||||
|
kafKaDataList.add(KafKaData.builder()
|
||||||
|
.key(carMessage.getMessageTypeCode())
|
||||||
|
.label(carMessage.getMessageTypeCode())
|
||||||
|
.value(value)
|
||||||
|
.type(carMessage.getMessageType())
|
||||||
|
.build());
|
||||||
|
}
|
||||||
|
|
||||||
|
kafKaDataList.add(KafKaData.builder()
|
||||||
|
.key("firmCode")
|
||||||
|
.label("企业编码")
|
||||||
|
.value("firm01")
|
||||||
|
.type("String")
|
||||||
|
.build());
|
||||||
|
String jsonString = JSONObject.toJSONString(kafKaDataList);
|
||||||
|
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(KafkaConstants.KafkaTopic, jsonString);
|
||||||
|
kafkaProducer.send(producerRecord);
|
||||||
|
log.info("kafka投产:{}", jsonString);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -1,24 +0,0 @@
|
||||||
package com.muyu.parsing.manager;
|
|
||||||
|
|
||||||
import java.util.concurrent.Executors;
|
|
||||||
import java.util.concurrent.ExecutorService;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 线程配置
|
|
||||||
*/
|
|
||||||
public final class TaskManager {
|
|
||||||
private final ExecutorService executorService;
|
|
||||||
|
|
||||||
public TaskManager(int threadPoolSize) {
|
|
||||||
this.executorService = Executors.newFixedThreadPool(threadPoolSize);
|
|
||||||
}
|
|
||||||
|
|
||||||
public void execute(Runnable task) {
|
|
||||||
executorService.submit(task);
|
|
||||||
}
|
|
||||||
|
|
||||||
// 关闭线程池
|
|
||||||
public void shutdown() {
|
|
||||||
executorService.shutdown();
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,189 +0,0 @@
|
||||||
//package com.muyu.parsing.manager;
|
|
||||||
//
|
|
||||||
///**
|
|
||||||
// * @Author: 胡杨
|
|
||||||
// * @Name: TaskManager
|
|
||||||
// * @Description: 任务管理器
|
|
||||||
// * @CreatedDate: 2024/9/4 下午7:44
|
|
||||||
// * @FilePath: com.muyu.quest.manager
|
|
||||||
// */
|
|
||||||
//
|
|
||||||
//
|
|
||||||
//import lombok.extern.slf4j.Slf4j;
|
|
||||||
//
|
|
||||||
//import java.util.Collections;
|
|
||||||
//import java.util.LinkedList;
|
|
||||||
//import java.util.List;
|
|
||||||
//
|
|
||||||
//import static java.lang.Thread.sleep;
|
|
||||||
//
|
|
||||||
///**
|
|
||||||
// * 基础线程配置
|
|
||||||
// */
|
|
||||||
//@Slf4j
|
|
||||||
//public final class TaskManagers {
|
|
||||||
// // 线程池中默认线程的个数为5
|
|
||||||
// private static int workerNum = 8;
|
|
||||||
// // 工作线程
|
|
||||||
//// private final WorkThread[] workThrads;
|
|
||||||
// // 未处理的任务
|
|
||||||
// private static volatile int finishedTask = 0;
|
|
||||||
////
|
|
||||||
//// // 任务队列
|
|
||||||
//// private final List<Runnable> taskQueue = new LinkedList<Runnable>();
|
|
||||||
//// private static TaskManager taskManager;
|
|
||||||
////
|
|
||||||
//// private long startTime;
|
|
||||||
//// private long endTime;
|
|
||||||
////
|
|
||||||
//// // 创建具有默认线程个数的线程池
|
|
||||||
//// private TaskManager() {
|
|
||||||
//// this(8);
|
|
||||||
//// }
|
|
||||||
////
|
|
||||||
//// // 创建线程池,workerNum为线程池中工作线程的个数
|
|
||||||
//// private TaskManager(int workerNum) {
|
|
||||||
//// taskManager.workerNum = workerNum;
|
|
||||||
//// workThrads = new WorkThread[workerNum];
|
|
||||||
//// for (int i = 0; i < workerNum; i++) {
|
|
||||||
//// workThrads[i] = new WorkThread();
|
|
||||||
//// workThrads[i].start();// 开启线程池中的线程
|
|
||||||
//// }
|
|
||||||
//// startTime = System.currentTimeMillis();
|
|
||||||
//// }
|
|
||||||
////
|
|
||||||
//// // 单态模式,获得一个默认线程个数的线程池
|
|
||||||
//// public static TaskManager getTaskManager() {
|
|
||||||
//// return getTaskManager(TaskManager.workerNum);
|
|
||||||
//// }
|
|
||||||
////
|
|
||||||
//// // 单态模式,获得一个指定线程个数的线程池,workerNum(>0)为线程池中工作线程的个数
|
|
||||||
//// // workerNum<=0创建默认的工作线程个数
|
|
||||||
//// public static TaskManager getTaskManager(int workerNum1) {
|
|
||||||
//// if (workerNum1 <= 0) {
|
|
||||||
//// workerNum1 = TaskManager.workerNum;
|
|
||||||
//// }
|
|
||||||
//// if (taskManager == null) {
|
|
||||||
//// taskManager = new TaskManager(workerNum1);
|
|
||||||
//// }
|
|
||||||
//// return taskManager;
|
|
||||||
//// }
|
|
||||||
////
|
|
||||||
//// // 把任务加入任务队列
|
|
||||||
//// public void execute(List<Runnable> task) {
|
|
||||||
//// execute(task.toArray(new Runnable[0]));
|
|
||||||
//// }
|
|
||||||
////
|
|
||||||
//// // 把任务加入任务队列
|
|
||||||
//// public void execute(Runnable... task) {
|
|
||||||
//// synchronized (taskQueue) {
|
|
||||||
//// Collections.addAll(taskQueue, task);
|
|
||||||
//// taskQueue.notify();
|
|
||||||
//// }
|
|
||||||
//// }
|
|
||||||
////
|
|
||||||
////
|
|
||||||
//// // 销毁线程池,该方法保证在所有任务都完成的情况下才销毁所有线程,否则等待任务完成才销毁
|
|
||||||
//// public void destroy() {
|
|
||||||
//// while (!getIsRunning()) {// 如果还有任务没执行完成,就先睡会吧
|
|
||||||
//// try {
|
|
||||||
//// sleep(500);
|
|
||||||
//// } catch (InterruptedException e) {
|
|
||||||
//// e.printStackTrace();
|
|
||||||
//// }
|
|
||||||
//// }
|
|
||||||
//// // 工作线程停止工作,且置为null
|
|
||||||
//// for (int i = 0; i < workerNum; i++) {
|
|
||||||
//// workThrads[i].stopWorker();
|
|
||||||
//// workThrads[i] = null;
|
|
||||||
//// }
|
|
||||||
//// taskManager = null;
|
|
||||||
//// taskQueue.clear();// 清空任务队列
|
|
||||||
//// }
|
|
||||||
////
|
|
||||||
//// // 任务完成后,统计线程池的运行情况
|
|
||||||
//// public void closed() {
|
|
||||||
//// while (!getIsRunning()) {// 如果还有任务没执行完成,就先睡会吧
|
|
||||||
//// try {
|
|
||||||
//// sleep(500);
|
|
||||||
//// } catch (InterruptedException e) {
|
|
||||||
//// e.printStackTrace();
|
|
||||||
//// }
|
|
||||||
//// }
|
|
||||||
//// log.info("任务完成,线程状态: {}", this.toString());
|
|
||||||
//// taskManager = null;
|
|
||||||
//// }
|
|
||||||
////
|
|
||||||
//// // 返回工作线程的个数
|
|
||||||
//// public int getWorkThreadNumber() {
|
|
||||||
//// return workerNum;
|
|
||||||
//// }
|
|
||||||
////
|
|
||||||
//// // 返回已完成任务的个数,这里的已完成是只出了任务队列的任务个数,可能该任务并没有实际执行完成
|
|
||||||
//// public int getFinishedTasknumber() {
|
|
||||||
//// return finishedTask;
|
|
||||||
//// }
|
|
||||||
////
|
|
||||||
//// // 返回任务队列的长度,即还没处理的任务个数
|
|
||||||
//// public int getWaitTasknumber() {
|
|
||||||
//// return taskQueue.size();
|
|
||||||
//// }
|
|
||||||
////
|
|
||||||
//// /**
|
|
||||||
//// * 返回线程池当前状态
|
|
||||||
//// *
|
|
||||||
//// * @return 如果还在工作返回false,否则返回true
|
|
||||||
//// */
|
|
||||||
//// public Boolean getIsRunning() {
|
|
||||||
//// return taskQueue.isEmpty() || (taskManager.getWaitTasknumber() == 0 && taskManager.getWorkThreadNumber() == 0);
|
|
||||||
//// }
|
|
||||||
////
|
|
||||||
//// // 覆盖toString方法,返回线程池信息:工作线程个数和已完成任务个数
|
|
||||||
//// @Override
|
|
||||||
//// public String toString() {
|
|
||||||
//// endTime = System.currentTimeMillis();
|
|
||||||
//// return "工作任务数:" + workerNum + ",已完成任务数:"
|
|
||||||
//// + finishedTask + ",等待任务数:" + getWaitTasknumber()
|
|
||||||
//// + "线程池作时长:" + (endTime - startTime) + "ms";
|
|
||||||
//// }
|
|
||||||
////
|
|
||||||
//// /**
|
|
||||||
//// * 内部类,工作线程
|
|
||||||
//// */
|
|
||||||
//// private class WorkThread extends Thread {
|
|
||||||
//// // 该工作线程是否有效,用于结束该工作线程
|
|
||||||
//// private boolean isRunning = true;
|
|
||||||
////
|
|
||||||
//// /*
|
|
||||||
//// * 关键所在,如果任务队列不空,则取出任务执行,若任务队列空,则等待
|
|
||||||
//// */
|
|
||||||
//// @Override
|
|
||||||
//// public void run() {
|
|
||||||
//// Runnable r = null;
|
|
||||||
//// while (isRunning) {// 若线程无效则自然结束run方法,该线程就没用了
|
|
||||||
//// synchronized (taskQueue) {
|
|
||||||
//// while (isRunning && taskQueue.isEmpty()) {// 队列为空
|
|
||||||
//// try {
|
|
||||||
//// taskQueue.wait(50);
|
|
||||||
//// } catch (InterruptedException e) {
|
|
||||||
//// e.printStackTrace();
|
|
||||||
//// }
|
|
||||||
//// }
|
|
||||||
//// if (!taskQueue.isEmpty()) {
|
|
||||||
//// r = taskQueue.remove(0);// 取出任务
|
|
||||||
//// }
|
|
||||||
//// }
|
|
||||||
//// if (r != null) {
|
|
||||||
//// r.run();// 执行任务
|
|
||||||
//// }
|
|
||||||
//// finishedTask++;
|
|
||||||
//// r = null;
|
|
||||||
//// }
|
|
||||||
//// }
|
|
||||||
////
|
|
||||||
//// // 停止工作,让该线程自然执行完run方法,自然结束
|
|
||||||
//// public void stopWorker() {
|
|
||||||
//// isRunning = false;
|
|
||||||
//// }
|
|
||||||
//// }
|
|
||||||
//}
|
|
|
@ -1,127 +0,0 @@
|
||||||
package com.muyu.parsing.mqtt;
|
|
||||||
|
|
||||||
import com.alibaba.fastjson2.JSONObject;
|
|
||||||
import com.muyu.common.kafka.constants.KafkaConstants;
|
|
||||||
import com.muyu.parsing.domain.KafKaData;
|
|
||||||
import com.muyu.parsing.domain.SysCarMessage;
|
|
||||||
import com.muyu.parsing.service.impl.SysCarMessageServiceImpl;
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
|
||||||
import org.apache.kafka.clients.producer.KafkaProducer;
|
|
||||||
import org.apache.kafka.clients.producer.ProducerRecord;
|
|
||||||
import org.eclipse.paho.client.mqttv3.*;
|
|
||||||
import org.springframework.stereotype.Component;
|
|
||||||
|
|
||||||
import javax.annotation.PostConstruct;
|
|
||||||
import javax.annotation.Resource;
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.List;
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
|
||||||
* mqtt
|
|
||||||
*
|
|
||||||
* @ClassName MqttTest
|
|
||||||
* @Description 描述
|
|
||||||
* @Author Chen
|
|
||||||
* @Date 2024/9/28 23:49
|
|
||||||
*/
|
|
||||||
@Slf4j
|
|
||||||
@Component
|
|
||||||
public class MqttTest {
|
|
||||||
private static final Integer ID = 1;
|
|
||||||
private static final Integer CODE = 1;
|
|
||||||
|
|
||||||
@Resource
|
|
||||||
private KafkaProducer<String, String> kafkaProducer;
|
|
||||||
@Resource
|
|
||||||
private SysCarMessageServiceImpl sysCarMessageService;
|
|
||||||
|
|
||||||
@PostConstruct
|
|
||||||
public void Test() {
|
|
||||||
String topic = "vehicle";
|
|
||||||
String content = "Message from MqttPublishSample";
|
|
||||||
int qos = 2;
|
|
||||||
String broker = "tcp://106.15.136.7:1883";
|
|
||||||
String clientId = "JavaSample";
|
|
||||||
|
|
||||||
try {
|
|
||||||
// 第三个参数为空,默认持久化策略
|
|
||||||
MqttClient sampleClient = new MqttClient(broker, clientId);
|
|
||||||
MqttConnectOptions connOpts = new MqttConnectOptions();
|
|
||||||
connOpts.setCleanSession(true);
|
|
||||||
System.out.println("Connecting to broker: " + broker);
|
|
||||||
sampleClient.connect(connOpts);
|
|
||||||
sampleClient.subscribe(topic, 0);
|
|
||||||
sampleClient.setCallback(new MqttCallback() {
|
|
||||||
// 连接丢失
|
|
||||||
@Override
|
|
||||||
public void connectionLost(Throwable throwable) {
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
// 连接成功
|
|
||||||
@Override
|
|
||||||
public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
|
|
||||||
List<SysCarMessage> list = sysCarMessageService.selectSysCarMessageLists(ID);
|
|
||||||
String string = new String(mqttMessage.getPayload());
|
|
||||||
log.info(new String(mqttMessage.getPayload()));
|
|
||||||
List<KafKaData> kafKaDataList = new ArrayList<>();
|
|
||||||
String[] test = string.split(" ");
|
|
||||||
// String[] results = new String[list.size()];
|
|
||||||
for (SysCarMessage carMessage : list) {
|
|
||||||
int start = Integer.parseInt(carMessage.getMessageStartIndex()) - 1;
|
|
||||||
int end = Integer.parseInt(carMessage.getMessageEndIndex());
|
|
||||||
StringBuilder hexBuilder = new StringBuilder();
|
|
||||||
for (int i = start; i < end; i++) {
|
|
||||||
hexBuilder.append(test[i]);
|
|
||||||
}
|
|
||||||
String hex = hexBuilder.toString();
|
|
||||||
char[] result = new char[hex.length() / 2];
|
|
||||||
for (int x = 0; x < hex.length(); x += 2) {
|
|
||||||
int high = Character.digit(hex.charAt(x), 16);
|
|
||||||
int low = Character.digit(hex.charAt(x + 1), 16);
|
|
||||||
result[x / 2] = (char) ((high << 4) + low);
|
|
||||||
}
|
|
||||||
String value = new String(result);
|
|
||||||
kafKaDataList.add(KafKaData.builder()
|
|
||||||
.key(carMessage.getMessageTypeCode())
|
|
||||||
.label(carMessage.getMessageTypeCode())
|
|
||||||
.value(value)
|
|
||||||
.type(carMessage.getMessageType())
|
|
||||||
.build());
|
|
||||||
}
|
|
||||||
kafKaDataList.add(KafKaData.builder()
|
|
||||||
.key("firmCode")
|
|
||||||
.label("企业编码")
|
|
||||||
.value("firm01")
|
|
||||||
.type("String")
|
|
||||||
.build());
|
|
||||||
String jsonString = JSONObject.toJSONString(kafKaDataList);
|
|
||||||
|
|
||||||
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(KafkaConstants.KafkaTopic, jsonString);
|
|
||||||
kafkaProducer.send(producerRecord);
|
|
||||||
log.info("kafka投产:{}", jsonString);
|
|
||||||
// HashMap<String, String> stringStringHashMap = new HashMap<>();
|
|
||||||
// kafKaDataList.forEach(data -> stringStringHashMap.put(data.getKey(), data.getValue()));
|
|
||||||
// jsonString = JSONObject.toJSONString(stringStringHashMap);
|
|
||||||
// System.out.println(jsonString);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
// 接收信息
|
|
||||||
@Override
|
|
||||||
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
});
|
|
||||||
} catch (MqttException me) {
|
|
||||||
System.out.println("reason " + me.getReasonCode());
|
|
||||||
System.out.println("msg " + me.getMessage());
|
|
||||||
System.out.println("loc " + me.getLocalizedMessage());
|
|
||||||
System.out.println("cause " + me.getCause());
|
|
||||||
System.out.println("excep " + me);
|
|
||||||
me.printStackTrace();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -0,0 +1,80 @@
|
||||||
|
package com.muyu.parsing.mqtt.service;
|
||||||
|
|
||||||
|
import com.muyu.cargateway.domain.properties.MqttProperties;
|
||||||
|
import com.muyu.parsing.manager.MessageProcessor;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.eclipse.paho.client.mqttv3.*;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
|
import javax.annotation.PreDestroy;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
|
||||||
|
@Service
|
||||||
|
@Slf4j
|
||||||
|
public class MqttClientService {
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private MessageProcessor messageProcessor;
|
||||||
|
|
||||||
|
private final ExecutorService executorService = Executors.newCachedThreadPool();
|
||||||
|
private MqttClient sampleClient;
|
||||||
|
|
||||||
|
|
||||||
|
public void connectAndSubscribeAsync(MqttProperties mqttProperties) {
|
||||||
|
executorService.submit(() -> {
|
||||||
|
try {
|
||||||
|
connectAndSubscribe(mqttProperties);
|
||||||
|
} catch (MqttException | IOException e) {
|
||||||
|
log.error("MQTT连接或订阅失败", e);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
private void connectAndSubscribe(MqttProperties mqttProperties) throws MqttException, IOException {
|
||||||
|
// if (sampleClient != null && sampleClient.isConnected()) {
|
||||||
|
// log.info("MQTT客户端已经连接,跳过重新连接。");
|
||||||
|
// return;
|
||||||
|
// }
|
||||||
|
|
||||||
|
sampleClient = new MqttClient(mqttProperties.getBroker(), mqttProperties.getClientId());
|
||||||
|
MqttConnectOptions connOpts = new MqttConnectOptions();
|
||||||
|
connOpts.setUserName(mqttProperties.getUserName());
|
||||||
|
connOpts.setPassword(mqttProperties.getPassword().toCharArray());
|
||||||
|
connOpts.setCleanSession(true);
|
||||||
|
|
||||||
|
sampleClient.connect(connOpts);
|
||||||
|
sampleClient.subscribe(mqttProperties.getTopic(), 0);
|
||||||
|
|
||||||
|
sampleClient.setCallback(new MqttCallback() {
|
||||||
|
@Override
|
||||||
|
public void connectionLost(Throwable throwable) {
|
||||||
|
log.error("连接丢失:{}", throwable.getMessage());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void messageArrived(String s, MqttMessage mqttMessage) {
|
||||||
|
executorService.submit(() -> messageProcessor.processMessage(mqttMessage));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
|
||||||
|
log.info("消息发送完成:{}", iMqttDeliveryToken);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
@PreDestroy
|
||||||
|
public void shutdown() {
|
||||||
|
executorService.shutdown();
|
||||||
|
if (sampleClient != null && sampleClient.isConnected()) {
|
||||||
|
try {
|
||||||
|
sampleClient.disconnect();
|
||||||
|
} catch (MqttException e) {
|
||||||
|
log.error("MQTT客户端断开连接失败", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue