diff --git a/cloud-modules/cloud-modules-carData/src/main/java/com/muyu/carData/CarDataApplication.java b/cloud-modules/cloud-modules-carData/src/main/java/com/muyu/carData/CarDataApplication.java index b4c997b..ef967fa 100644 --- a/cloud-modules/cloud-modules-carData/src/main/java/com/muyu/carData/CarDataApplication.java +++ b/cloud-modules/cloud-modules-carData/src/main/java/com/muyu/carData/CarDataApplication.java @@ -1,9 +1,11 @@ package com.muyu.carData; -import com.muyu.carData.listener.MyListener; +import com.muyu.common.security.annotation.EnableCustomConfig; import com.muyu.common.security.annotation.EnableMyFeignClients; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.cache.annotation.EnableCaching; +import org.springframework.scheduling.annotation.EnableAsync; /** * @Author:张腾 @@ -14,11 +16,12 @@ import org.springframework.boot.autoconfigure.SpringBootApplication; */ @SpringBootApplication @EnableMyFeignClients +@EnableCustomConfig +@EnableAsync +@EnableCaching public class CarDataApplication { public static void main(String[] args) { - SpringApplication application = new SpringApplication(CarDataApplication.class); - application.addListeners(new MyListener()); - application.run(args); + SpringApplication.run(CarDataApplication.class,args); } } diff --git a/cloud-modules/cloud-modules-carData/src/main/java/com/muyu/carData/config/lotdbconfig/IotDBSessionConfig.java b/cloud-modules/cloud-modules-carData/src/main/java/com/muyu/carData/config/lotdbconfig/IotDBSessionConfig.java index 9561bb3..989b907 100644 --- a/cloud-modules/cloud-modules-carData/src/main/java/com/muyu/carData/config/lotdbconfig/IotDBSessionConfig.java +++ b/cloud-modules/cloud-modules-carData/src/main/java/com/muyu/carData/config/lotdbconfig/IotDBSessionConfig.java @@ -1,15 +1,22 @@ package com.muyu.carData.config.lotdbconfig; import org.apache.iotdb.rpc.IoTDBConnectionException; +import org.apache.iotdb.rpc.StatementExecutionException; import org.apache.iotdb.session.Session; +import org.apache.iotdb.session.SessionDataSet; import org.apache.iotdb.session.pool.SessionPool; +import org.apache.iotdb.tsfile.read.common.Field; +import org.apache.iotdb.tsfile.read.common.RowRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.stereotype.Component; +import org.springframework.util.CollectionUtils; +import java.util.ArrayList; +import java.util.HashMap; import java.util.List; /** @@ -76,6 +83,66 @@ public class IotDBSessionConfig { } + /** + * 添加操作 + * @param deviceId + * @param time 时间戳 + * @param measurements + * @param values 值 + */ + public void execute(String deviceId,long time,List measurements,List values){ + if (CollectionUtils.isEmpty(measurements) && !CollectionUtils.isEmpty(values)){ + try { + iotSession().insertAlignedRecord(deviceId,time,measurements,values); + } catch (IoTDBConnectionException e) { + throw new RuntimeException(e); + } catch (StatementExecutionException e) { + throw new RuntimeException(e); + } + } + } + + /** + * 查询操作 + * @param sql + * @return + */ + public List> executeQuery(String sql){ + logger.info("sql:{}",sql); + List> list = new ArrayList<>(); + + try { + SessionDataSet sessionDataSet = iotSession().executeQueryStatement(sql); + int fetchSize = sessionDataSet.getFetchSize(); + List columnNames = sessionDataSet.getColumnNames(); + logger.info("columnNames:{}",columnNames); + List columnTypes = sessionDataSet.getColumnTypes(); + logger.info("columnTypes:{}",columnTypes); + if (fetchSize > 0){ + while (sessionDataSet.hasNext()){ + HashMap map = new HashMap<>(); + RowRecord next = sessionDataSet.next(); + List fields = next.getFields(); + for (int i = 0; i < fields.size(); i++) { + Field field = fields.get(i); + String key = field.getStringValue(); + Object value = field.getObjectValue(field.getDataType()); + map.put(key,value); + } + list.add(map); + } + + sessionDataSet.closeOperationHandle(); + } + } catch (StatementExecutionException e) { + throw new RuntimeException(e); + } catch (IoTDBConnectionException e) { + throw new RuntimeException(e); + } + + return list; + } + diff --git a/cloud-modules/cloud-modules-carData/src/main/java/com/muyu/carData/consumer/CarOffConsumer.java b/cloud-modules/cloud-modules-carData/src/main/java/com/muyu/carData/consumer/CarOffConsumer.java new file mode 100644 index 0000000..77a77b7 --- /dev/null +++ b/cloud-modules/cloud-modules-carData/src/main/java/com/muyu/carData/consumer/CarOffConsumer.java @@ -0,0 +1,34 @@ +package com.muyu.carData.consumer; + +import com.muyu.carData.util.CacheUtil; +import lombok.extern.log4j.Log4j2; +import org.springframework.amqp.rabbit.annotation.Exchange; +import org.springframework.amqp.rabbit.annotation.Queue; +import org.springframework.amqp.rabbit.annotation.QueueBinding; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +/** 车辆下线消费者 + * @Author:张腾 + * @Package:com.muyu.carData.consumer + * @Project:cloud-server-8 + * @name:CarOffConsumer + * @Date:2024/10/4 14:30 + */ +@Log4j2 +@Component +public class CarOffConsumer { + + @Autowired + private CacheUtil cacheUtil; + + @RabbitListener(bindings = @QueueBinding( + value = @Queue(value = "CAR_OFFLINE",durable = "true"), + exchange = @Exchange(value = "CAR_OFF_EXCHANGE",type = "fanout") + )) + public void inline(String vin){ + log.info("车辆vin:{},车辆下线成功!开始消费..."); + cacheUtil.remove(vin); + } +} diff --git a/cloud-modules/cloud-modules-carData/src/main/java/com/muyu/carData/consumer/CarOnlineConsumer.java b/cloud-modules/cloud-modules-carData/src/main/java/com/muyu/carData/consumer/CarOnlineConsumer.java new file mode 100644 index 0000000..b27827b --- /dev/null +++ b/cloud-modules/cloud-modules-carData/src/main/java/com/muyu/carData/consumer/CarOnlineConsumer.java @@ -0,0 +1,33 @@ +package com.muyu.carData.consumer; + +import com.muyu.carData.util.CacheUtil; +import lombok.extern.log4j.Log4j2; +import org.springframework.amqp.rabbit.annotation.Exchange; +import org.springframework.amqp.rabbit.annotation.Queue; +import org.springframework.amqp.rabbit.annotation.QueueBinding; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +/**车辆上线消费者 + * @Author:张腾 + * @Package:com.muyu.carData.consumer + * @Project:cloud-server-8 + * @name:CarOnlineConsumer + * @Date:2024/10/4 14:27 + */ +@Log4j2 +@Component +public class CarOnlineConsumer { + + @Autowired + private CacheUtil cacheUtil; + + @RabbitListener(bindings = @QueueBinding( + value = @Queue(value = "CAR_ONLINE",durable = "true"), + exchange = @Exchange(value = "ONLINE_EXCHANGE",type = "fanout") + )) + public void online(String vin){ + log.info("车辆vin:{},已上线,开始消费",vin); + } +} diff --git a/cloud-modules/cloud-modules-carData/src/main/java/com/muyu/carData/consumer/MyKafkaConsumer.java b/cloud-modules/cloud-modules-carData/src/main/java/com/muyu/carData/consumer/MyKafkaConsumer.java index 8d0aecc..c7e592c 100644 --- a/cloud-modules/cloud-modules-carData/src/main/java/com/muyu/carData/consumer/MyKafkaConsumer.java +++ b/cloud-modules/cloud-modules-carData/src/main/java/com/muyu/carData/consumer/MyKafkaConsumer.java @@ -1,19 +1,23 @@ package com.muyu.carData.consumer; -import cn.hutool.core.thread.ThreadUtil; -import cn.hutool.json.JSONUtil; -import com.alibaba.nacos.shaded.com.google.common.collect.Lists; -import com.muyu.carData.pojo.Student; +import com.alibaba.fastjson.JSONObject; +import com.muyu.carData.event.EventPublisher; import lombok.extern.log4j.Log4j2; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.ApplicationArguments; +import org.springframework.boot.ApplicationRunner; +import org.springframework.context.ApplicationListener; +import org.springframework.context.event.ContextClosedEvent; import org.springframework.stereotype.Component; import java.time.Duration; -import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; /**卡夫卡消费者 * @Author:张腾 @@ -24,33 +28,43 @@ import java.util.Collection; */ @Component @Log4j2 -public class MyKafkaConsumer implements InitializingBean { +public class MyKafkaConsumer implements ApplicationRunner, ApplicationListener { @Autowired private KafkaConsumer kafkaConsumer; + @Autowired + private EventPublisher eventPublisher; + private final String topicName = "carJsons"; + private final ExecutorService executorService = Executors.newFixedThreadPool(10); + @Override - public void afterPropertiesSet() throws Exception { - log.info("启动线程开始监听topic:{}",topicName); - Thread thread = new Thread(() -> { - ThreadUtil.sleep(1000); - Collection topics = Lists.newArrayList(topicName); - kafkaConsumer.subscribe(topics); - while (true){ - ConsumerRecords consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000)); - for (ConsumerRecord consumerRecord : consumerRecords) { - //从consumerRecord中获取消费数据 - String value = consumerRecord.value(); - log.info("从Kafka中消费的原始数据===============>>:{}",value); - } - } - }); - thread.start(); + public void run(ApplicationArguments args) throws Exception { + log.info("开始监听kafka-topic:{}",topicName); + List topics = Collections.singletonList(topicName); + kafkaConsumer.subscribe(topics); - log.info("启动线程结束监听topic:{}",topicName); + while (true){ + ConsumerRecords consumerRecords = kafkaConsumer.poll(Duration.ofMillis(100)); + consumerRecords.forEach(record ->{ + executorService.submit(() -> handleRecord(record)); + log.info("数据为:{},消费成功!",record); + }); + } + } + private void handleRecord(ConsumerRecord record) { + String value = record.value(); + JSONObject jsonObject = JSONObject.parseObject(value); + eventPublisher.publishEvent(jsonObject); + } + @Override + public void onApplicationEvent(ContextClosedEvent event) { + log.info("关闭kafka和线程"); + kafkaConsumer.close(); + executorService.shutdown(); } } diff --git a/cloud-modules/cloud-modules-carData/src/main/java/com/muyu/carData/controller/CacheController.java b/cloud-modules/cloud-modules-carData/src/main/java/com/muyu/carData/controller/CacheController.java index f2d2c53..bc94adc 100644 --- a/cloud-modules/cloud-modules-carData/src/main/java/com/muyu/carData/controller/CacheController.java +++ b/cloud-modules/cloud-modules-carData/src/main/java/com/muyu/carData/controller/CacheController.java @@ -1,3 +1,4 @@ +/* package com.muyu.carData.controller; import com.github.benmanes.caffeine.cache.Cache; @@ -8,13 +9,15 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; +*/ /** * @Author:张腾 * @Package:com.muyu.carData.controller * @Project:cloud-server-8 * @name:TestController * @Date:2024/9/26 23:56 - */ + *//* + @RestController @RequestMapping("/testCache") @Log4j2 @@ -42,3 +45,4 @@ public class CacheController { } } +*/ diff --git a/cloud-modules/cloud-modules-carData/src/main/java/com/muyu/carData/controller/KafkaProducerController.java b/cloud-modules/cloud-modules-carData/src/main/java/com/muyu/carData/controller/KafkaProducerController.java index b167ef7..a0f5bcc 100644 --- a/cloud-modules/cloud-modules-carData/src/main/java/com/muyu/carData/controller/KafkaProducerController.java +++ b/cloud-modules/cloud-modules-carData/src/main/java/com/muyu/carData/controller/KafkaProducerController.java @@ -1,3 +1,4 @@ +/* package com.muyu.carData.controller; import com.alibaba.fastjson.JSONObject; @@ -9,13 +10,15 @@ import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; +*/ /** * @Author:张腾 * @Package:com.muyu.carData.testcontroller * @Project:cloud-server-8 * @name:KafkaProducerController * @Date:2024/9/28 15:10 - */ + *//* + @RestController @RequestMapping("/produce") @Log4j2 @@ -43,3 +46,4 @@ public class KafkaProducerController { } +*/ diff --git a/cloud-modules/cloud-modules-carData/src/main/java/com/muyu/carData/event/EsSaveEvent.java b/cloud-modules/cloud-modules-carData/src/main/java/com/muyu/carData/event/EsSaveEvent.java index f4d689f..fab5a76 100644 --- a/cloud-modules/cloud-modules-carData/src/main/java/com/muyu/carData/event/EsSaveEvent.java +++ b/cloud-modules/cloud-modules-carData/src/main/java/com/muyu/carData/event/EsSaveEvent.java @@ -15,8 +15,12 @@ public class EsSaveEvent extends ApplicationEvent { private JSONObject data; - public EsSaveEvent(JSONObject source) { + public EsSaveEvent(Object source,JSONObject data) { super(source); - this.data = source; + this.data = data; + } + + public JSONObject getData(){ + return data; } } diff --git a/cloud-modules/cloud-modules-carData/src/main/java/com/muyu/carData/event/EventListener.java b/cloud-modules/cloud-modules-carData/src/main/java/com/muyu/carData/event/EventListener.java new file mode 100644 index 0000000..8bb30a0 --- /dev/null +++ b/cloud-modules/cloud-modules-carData/src/main/java/com/muyu/carData/event/EventListener.java @@ -0,0 +1,15 @@ +package com.muyu.carData.event; + +import org.springframework.context.ApplicationListener; + +/** 事件监听接口 + * @Author:张腾 + * @Package:com.muyu.carData.event + * @Project:cloud-server-8 + * @name:EventListener + * @Date:2024/10/4 10:39 + */ +public interface EventListener extends ApplicationListener { + + void onEvent(EsSaveEvent event); +} diff --git a/cloud-modules/cloud-modules-carData/src/main/java/com/muyu/carData/event/EventPublisher.java b/cloud-modules/cloud-modules-carData/src/main/java/com/muyu/carData/event/EventPublisher.java new file mode 100644 index 0000000..e7bbc8c --- /dev/null +++ b/cloud-modules/cloud-modules-carData/src/main/java/com/muyu/carData/event/EventPublisher.java @@ -0,0 +1,28 @@ +package com.muyu.carData.event; + +import com.alibaba.fastjson.JSONObject; +import org.springframework.context.ApplicationEventPublisher; +import org.springframework.context.ApplicationEventPublisherAware; +import org.springframework.stereotype.Component; + +/** 策略发送事件 + * @Author:张腾 + * @Package:com.muyu.carData.event + * @Project:cloud-server-8 + * @name:EventPublisher + * @Date:2024/10/4 10:40 + */ +@Component +public class EventPublisher implements ApplicationEventPublisherAware { + + private ApplicationEventPublisher publisher; + @Override + public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) { + this.publisher = applicationEventPublisher; + } + + public void publishEvent(JSONObject message){ + EsSaveEvent esSaveEvent = new EsSaveEvent(this, message); + publisher.publishEvent(esSaveEvent); + } +} diff --git a/cloud-modules/cloud-modules-carData/src/main/java/com/muyu/carData/listener/CustomEventListener.java b/cloud-modules/cloud-modules-carData/src/main/java/com/muyu/carData/listener/CustomEventListener.java deleted file mode 100644 index 4d63af0..0000000 --- a/cloud-modules/cloud-modules-carData/src/main/java/com/muyu/carData/listener/CustomEventListener.java +++ /dev/null @@ -1,22 +0,0 @@ -package com.muyu.carData.listener; - -import com.muyu.carData.event.EsSaveEvent; -import org.springframework.context.event.EventListener; -import org.springframework.stereotype.Component; - -/** - * @Author:张腾 - * @Package:com.muyu.carData.listener - * @Project:cloud-server-8 - * @name:CustomEventListener - * @Date:2024/9/29 23:49 - */ -@Component -public class CustomEventListener { - - @EventListener - public void handMyEvent(EsSaveEvent event){ - //处理事件详情 - - } -} diff --git a/cloud-modules/cloud-modules-carData/src/main/java/com/muyu/carData/listener/InsertIotDBListener.java b/cloud-modules/cloud-modules-carData/src/main/java/com/muyu/carData/listener/InsertIotDBListener.java new file mode 100644 index 0000000..a617dcb --- /dev/null +++ b/cloud-modules/cloud-modules-carData/src/main/java/com/muyu/carData/listener/InsertIotDBListener.java @@ -0,0 +1,45 @@ +package com.muyu.carData.listener; + +import com.alibaba.fastjson.JSONObject; +import com.muyu.carData.config.lotdbconfig.IotDBSessionConfig; +import com.muyu.carData.event.EsSaveEvent; +import com.muyu.carData.event.EventListener; +import lombok.extern.log4j.Log4j2; +import org.springframework.stereotype.Component; + +import java.util.ArrayList; +import java.util.List; + +/** 数据持久化事件 + * @Author:张腾 + * @Package:com.muyu.carData.listener + * @Project:cloud-server-8 + * @name:InsertIotDBListener + * @Date:2024/10/6 9:25 + */ +@Log4j2 +@Component +public class InsertIotDBListener implements EventListener { + @Override + public void onEvent(EsSaveEvent event) { + JSONObject jsonObject = event.getData(); + log.info("持久化:监听到数据:{}", jsonObject); + List keys = new ArrayList<>(); + List values = new ArrayList<>(); + + jsonObject.forEach((key,value) -> { + keys.add(key); + values.add((String) value); + }); + + IotDBSessionConfig iotDBSessionConfig = new IotDBSessionConfig(); + long time = System.currentTimeMillis(); + iotDBSessionConfig.execute("root.vehicle",time,keys,values); + log.info("数据写入成功"); + } + + @Override + public void onApplicationEvent(EsSaveEvent event) { + onEvent(event); + } +} diff --git a/cloud-modules/cloud-modules-carData/src/main/java/com/muyu/carData/listener/MyListener.java b/cloud-modules/cloud-modules-carData/src/main/java/com/muyu/carData/listener/MyListener.java deleted file mode 100644 index 5bfcb26..0000000 --- a/cloud-modules/cloud-modules-carData/src/main/java/com/muyu/carData/listener/MyListener.java +++ /dev/null @@ -1,20 +0,0 @@ -package com.muyu.carData.listener; - -import com.muyu.carData.event.EsSaveEvent; -import lombok.extern.log4j.Log4j2; -import org.springframework.context.ApplicationListener; - -/**自定义监听器 - * @Author:张腾 - * @Package:com.muyu.carData.listener - * @Project:cloud-server-8 - * @name:MyListener - * @Date:2024/9/29 21:18 - */ -@Log4j2 -public class MyListener implements ApplicationListener { - @Override - public void onApplicationEvent(EsSaveEvent event) { - log.info("监听到自定义事件........"); - } -} diff --git a/cloud-modules/cloud-modules-carData/src/main/java/com/muyu/carData/pojo/Student.java b/cloud-modules/cloud-modules-carData/src/main/java/com/muyu/carData/pojo/Student.java deleted file mode 100644 index 90d9937..0000000 --- a/cloud-modules/cloud-modules-carData/src/main/java/com/muyu/carData/pojo/Student.java +++ /dev/null @@ -1,41 +0,0 @@ -package com.muyu.carData.pojo; - -import com.muyu.carData.config.cacheconfig.ExpiryTime; -import lombok.*; -import lombok.experimental.SuperBuilder; - -/** - * @Author:张腾 - * @Package:com.muyu.carData.pojo - * @Project:cloud-server-8 - * @name:Student - * @Date:2024/9/27 0:40 - */ - -@Data -@SuperBuilder -@NoArgsConstructor -@AllArgsConstructor -@EqualsAndHashCode(callSuper = true) -public class Student extends ExpiryTime{ - - /** - * 主键 - */ - private Integer id; - - /** - * 姓名 - */ - private String name; - - /** - * 性别 - */ - private String sex; - - /** - * 插入时间 - */ - private long time = System.currentTimeMillis(); -} diff --git a/cloud-modules/cloud-modules-carData/src/main/java/com/muyu/carData/pulisher/CustomEventPublisher.java b/cloud-modules/cloud-modules-carData/src/main/java/com/muyu/carData/pulisher/CustomEventPublisher.java deleted file mode 100644 index 2a32391..0000000 --- a/cloud-modules/cloud-modules-carData/src/main/java/com/muyu/carData/pulisher/CustomEventPublisher.java +++ /dev/null @@ -1,29 +0,0 @@ -package com.muyu.carData.pulisher; - -import com.alibaba.fastjson.JSONObject; -import com.muyu.carData.event.EsSaveEvent; -import lombok.AllArgsConstructor; -import lombok.extern.log4j.Log4j2; -import org.springframework.context.ApplicationEventPublisher; -import org.springframework.stereotype.Component; - -/**事件发布测试 - * @Author:张腾 - * @Package:com.muyu.carData.pulisher - * @Project:cloud-server-8 - * @name:CustomEventPublisher - * @Date:2024/9/29 23:51 - */ -@Log4j2 -@Component -@AllArgsConstructor -public class CustomEventPublisher { - - private ApplicationEventPublisher applicationEventPublisher; - - public void publish(JSONObject data){ - EsSaveEvent esSaveEvent = new EsSaveEvent(data); - applicationEventPublisher.publishEvent(esSaveEvent); - log.info("事件发布成功 - 消息是:{}",data); - } -} diff --git a/cloud-modules/cloud-modules-carData/src/main/java/com/muyu/carData/util/CacheUtil.java b/cloud-modules/cloud-modules-carData/src/main/java/com/muyu/carData/util/CacheUtil.java new file mode 100644 index 0000000..5501ba4 --- /dev/null +++ b/cloud-modules/cloud-modules-carData/src/main/java/com/muyu/carData/util/CacheUtil.java @@ -0,0 +1,37 @@ +package com.muyu.carData.util; + + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import org.springframework.stereotype.Component; + +/** 缓存工具类 + * @Author:张腾 + * @Package:com.muyu.carData.util + * @Project:cloud-server-8 + * @name:CacheUtil + * @Date:2024/10/4 10:42 + */ +@Component +public class CacheUtil { + + private final Cache cache; + + public CacheUtil() { + this.cache = Caffeine.newBuilder() + .maximumSize(500L) + .build(); + } + + public T get(String key) { + return (T)cache.getIfPresent(key); + } + + public void put(String key, T value) { + cache.put(key, value); + } + + public void remove(String key) { + cache.invalidate(key); + } +} diff --git a/cloud-modules/cloud-modules-protocolparsing/src/main/java/com/muyu/mqtt/MqttTest.java b/cloud-modules/cloud-modules-protocolparsing/src/main/java/com/muyu/mqtt/MqttTest.java index 1e49f43..f1147eb 100644 --- a/cloud-modules/cloud-modules-protocolparsing/src/main/java/com/muyu/mqtt/MqttTest.java +++ b/cloud-modules/cloud-modules-protocolparsing/src/main/java/com/muyu/mqtt/MqttTest.java @@ -22,6 +22,8 @@ import java.util.List; * mqtt * * @ClassName MqttTest + * @Description + * @Date 2024/9/28 23:49 */ @Slf4j @Component