From 9bb5cf34354c4a33f83e8a281613a8577c40192d Mon Sep 17 00:00:00 2001 From: dongxiaodong <13970843+dxdwork@user.noreply.gitee.com> Date: Tue, 9 Apr 2024 11:33:51 +0800 Subject: [PATCH 1/4] =?UTF-8?q?=E4=BC=98=E5=8C=96=E5=88=A4=E6=96=AD?= =?UTF-8?q?=E8=AF=AD=E5=8F=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../couplet-analyze-msg/pom.xml | 32 ++++++++++++------- .../analyze/msg/CoupletMsgApplication.java | 7 ++++ .../analyze/msg/model/ModelsKafkaMessage.java | 7 ++-- .../src/main/resources/bootstrap.yml | 6 ---- .../com/couplet/online/utils/MqttMonitor.java | 2 +- .../src/main/resources/bootstrap.yml | 6 ++-- 6 files changed, 34 insertions(+), 26 deletions(-) diff --git a/couplet-modules/couplet-analyze/couplet-analyze-msg/pom.xml b/couplet-modules/couplet-analyze/couplet-analyze-msg/pom.xml index fd7ec92..00c4073 100644 --- a/couplet-modules/couplet-analyze/couplet-analyze-msg/pom.xml +++ b/couplet-modules/couplet-analyze/couplet-analyze-msg/pom.xml @@ -81,12 +81,12 @@ couplet-common-swagger - - - org.eclipse.paho - org.eclipse.paho.client.mqttv3 - 1.2.5 - + + + + + + com.couplet @@ -94,20 +94,28 @@ - + + + + + org.apache.kafka + kafka-clients + 2.8.0 + com.couplet couplet-common-business - - - org.springframework.boot - spring-boot-starter-amqp - + + + + + diff --git a/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/CoupletMsgApplication.java b/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/CoupletMsgApplication.java index 43a0618..0cf32fb 100644 --- a/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/CoupletMsgApplication.java +++ b/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/CoupletMsgApplication.java @@ -1,5 +1,7 @@ package com.couplet.analyze.msg; +import com.couplet.analyze.msg.model.ModelsKafkaMessage; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.openfeign.EnableFeignClients; @@ -15,8 +17,13 @@ import org.springframework.scheduling.annotation.EnableScheduling; @EnableScheduling @EnableFeignClients(basePackages = "com.couplet.**") public class CoupletMsgApplication { + + public static void main(String[] args) { SpringApplication.run(CoupletMsgApplication.class); System.out.println("解析系统启动成功"); +// new ModelsKafkaMessage().initKafkaConsumer(); } + + } diff --git a/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/model/ModelsKafkaMessage.java b/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/model/ModelsKafkaMessage.java index d9782d4..93be367 100644 --- a/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/model/ModelsKafkaMessage.java +++ b/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/model/ModelsKafkaMessage.java @@ -1,5 +1,6 @@ package com.couplet.analyze.msg.model; + import com.couplet.analyze.common.event.AnalyzeEventCache; import com.couplet.analyze.msg.domain.CoupletMsgData; import com.couplet.analyze.msg.service.IncidentService; @@ -10,7 +11,6 @@ import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; @@ -26,7 +26,6 @@ import static com.couplet.analyze.msg.utils.MsgUtils.hexToString; import static com.couplet.analyze.msg.utils.MsgUtils.sendMsg; import static java.lang.Thread.sleep; - /** * @author DongXiaoDong * @version 1.0 @@ -36,7 +35,7 @@ import static java.lang.Thread.sleep; @Component @Slf4j public class ModelsKafkaMessage { - private static final String TOPIC_NAME = "online"; + private static final String TOPIC_NAME = "dong"; private static final String BOOTSTRAP_SERVERS = "39.103.133.136:9092"; @@ -89,7 +88,7 @@ public class ModelsKafkaMessage { while (true) { ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); records.forEach(record -> { - System.out.println("接收到的数据:" + record.value()); + log.info("接收到的数据:" + record.value()); String str = hexToString(record.value()); List coupletMsgDataList = sendMsg(str); for (CoupletMsgData msgData : coupletMsgDataList) { diff --git a/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/resources/bootstrap.yml b/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/resources/bootstrap.yml index 1c4ab93..e506271 100644 --- a/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/resources/bootstrap.yml +++ b/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/resources/bootstrap.yml @@ -50,9 +50,3 @@ logging: mybatis-plus: configuration: map-underscore-to-camel-case: true - -# RabbitMQ配置 -mq: - queueName: queue - exchangeName: exchange - routingKey: routingKey diff --git a/couplet-modules/couplet-modules-onLine/src/main/java/com/couplet/online/utils/MqttMonitor.java b/couplet-modules/couplet-modules-onLine/src/main/java/com/couplet/online/utils/MqttMonitor.java index 6f4bfc9..68ac0dd 100644 --- a/couplet-modules/couplet-modules-onLine/src/main/java/com/couplet/online/utils/MqttMonitor.java +++ b/couplet-modules/couplet-modules-onLine/src/main/java/com/couplet/online/utils/MqttMonitor.java @@ -76,7 +76,7 @@ public class MqttMonitor { //Kafka生产者配置 - private static final String TOPIC_NAME = "online"; + private static final String TOPIC_NAME = "dong"; private static final String BOOTSTRAP_SERVERS = "39.103.133.136:9092"; //线程池,用于异步处理消息到来时的业务逻辑 diff --git a/couplet-modules/couplet-modules-onLine/src/main/resources/bootstrap.yml b/couplet-modules/couplet-modules-onLine/src/main/resources/bootstrap.yml index 150ec78..d7abfa2 100644 --- a/couplet-modules/couplet-modules-onLine/src/main/resources/bootstrap.yml +++ b/couplet-modules/couplet-modules-onLine/src/main/resources/bootstrap.yml @@ -34,11 +34,11 @@ logging: # 订阅端配置 mqtt: server: - broker: tcp://115.159.47.13:1883 -# broker: mqtt://115.159.47.13:1883 +# broker: tcp://115.159.47.13:1883 + broker: tcp://8.130.181.16:1883 username: password: - clientId: mq + clientId: aaa qos: 0 topic: dxd From 60aa7650f0994f4b846edf2b7b8fd1ef9045e392 Mon Sep 17 00:00:00 2001 From: lijiayao <13831655+xiao-yao-charge-forward@user.noreply.gitee.com> Date: Tue, 9 Apr 2024 11:34:04 +0800 Subject: [PATCH 2/4] =?UTF-8?q?feat:=20=E4=BC=98=E5=8C=96=E4=BB=A3?= =?UTF-8?q?=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../couplet/analyze/msg/model/ModelsKafkaMessage.java | 6 +++--- .../msg/service/impl/RealTimeDataServiceImpl.java | 9 ++++----- .../couplet-analyze-msg/src/main/resources/bootstrap.yml | 5 +++++ couplet-modules/couplet-business/pom.xml | 1 + .../main/java/com/couplet/online/utils/MqttMonitor.java | 2 +- .../src/main/resources/bootstrap.yml | 1 - 6 files changed, 14 insertions(+), 10 deletions(-) diff --git a/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/model/ModelsKafkaMessage.java b/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/model/ModelsKafkaMessage.java index 0a40e9a..9536426 100644 --- a/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/model/ModelsKafkaMessage.java +++ b/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/model/ModelsKafkaMessage.java @@ -10,6 +10,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.kafka.annotation.KafkaListener; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; @@ -56,7 +57,7 @@ public class ModelsKafkaMessage { Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); // props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group"); - props.put(ConsumerConfig.GROUP_ID_CONFIG, "ddd"); + props.put(ConsumerConfig.GROUP_ID_CONFIG, "group"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); @@ -78,7 +79,6 @@ public class ModelsKafkaMessage { public void consumerMessages() { executorService.execute(this::consumer); } - public void consumer() { try { @@ -86,7 +86,7 @@ public class ModelsKafkaMessage { while (true) { ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); records.forEach(record -> { - System.out.println("接收到的数据:" + record.value()); + log.info("接收到的数据:" + record.value()); String str = hexToString(record.value()); List coupletMsgDataList = sendMsg(str); for (CoupletMsgData msgData : coupletMsgDataList) { diff --git a/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/service/impl/RealTimeDataServiceImpl.java b/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/service/impl/RealTimeDataServiceImpl.java index a3821dd..c50b6ef 100644 --- a/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/service/impl/RealTimeDataServiceImpl.java +++ b/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/service/impl/RealTimeDataServiceImpl.java @@ -42,8 +42,8 @@ public class RealTimeDataServiceImpl implements IncidentService { log.info("实时数据事件开始....."); RealTimeDataRequest cacheObject = redisService.getCacheObject("vin:" + coupletMsgData.getVin()); //判断是否有缓存数据 - if (redisService.hasKey("vin:query:" + cacheObject.getVin())){ - redisService.deleteObject("vin:query:" + cacheObject.getVin()); + if (redisService.hasKey("vin:query:" + coupletMsgData.getVin())){ + redisService.deleteObject("vin:query:" + coupletMsgData.getVin()); } // if (RealTimeJudge.isJudge(coupletMsgData.getVin())) { if (coupletMsgData.getVin().equals(cacheObject.getVin())){ @@ -51,9 +51,8 @@ public class RealTimeDataServiceImpl implements IncidentService { //判断数据是否一致, // if (RealTimeJudge.addRealTime(cacheObject)) { log.info("[{}]有缓存数据,值为:[{}],且缓存数据与实时数据一致,开始传输实时数据", coupletMsgData.getVin(), cacheObject); - CoupletMsgData query = incidentMapper.queryByIncident(coupletMsgData.getVin()); - redisService.setCacheObject("vin:query:" + cacheObject.getVin(), query); - redisService.expire("vin:"+cacheObject.getVin(),10, TimeUnit.MINUTES); + redisService.setCacheObject("vin:query:" + coupletMsgData.getVin(), cacheObject); + redisService.expire("vin:"+coupletMsgData.getVin(),10, TimeUnit.MINUTES); // } else { // log.info("[{}]有缓存数据,值为:[{}],且缓存数据与实时数据不一致,开始传输实时数据", coupletMsgData.getVin(), cacheObject); // } diff --git a/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/resources/bootstrap.yml b/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/resources/bootstrap.yml index a2cdc85..b2e9ca1 100644 --- a/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/resources/bootstrap.yml +++ b/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/resources/bootstrap.yml @@ -43,6 +43,11 @@ spring: template: # 只要消息抵达Queue,就会异步发送优先回调return firm mandatory: true + kafka: + bootstrap-servers: 115.159.47.13:9092 + consumer: + group-id: group + auto-offset-reset: earliest logging: level: com.couplet.analyze.msg.mapper: DEBUG diff --git a/couplet-modules/couplet-business/pom.xml b/couplet-modules/couplet-business/pom.xml index cc83ffe..da1fe95 100644 --- a/couplet-modules/couplet-business/pom.xml +++ b/couplet-modules/couplet-business/pom.xml @@ -114,6 +114,7 @@ com.couplet couplet-analyze-msg + diff --git a/couplet-modules/couplet-modules-onLine/src/main/java/com/couplet/online/utils/MqttMonitor.java b/couplet-modules/couplet-modules-onLine/src/main/java/com/couplet/online/utils/MqttMonitor.java index feece3e..6f4bfc9 100644 --- a/couplet-modules/couplet-modules-onLine/src/main/java/com/couplet/online/utils/MqttMonitor.java +++ b/couplet-modules/couplet-modules-onLine/src/main/java/com/couplet/online/utils/MqttMonitor.java @@ -76,7 +76,7 @@ public class MqttMonitor { //Kafka生产者配置 - private static final String TOPIC_NAME = "xiaoYao"; + private static final String TOPIC_NAME = "online"; private static final String BOOTSTRAP_SERVERS = "39.103.133.136:9092"; //线程池,用于异步处理消息到来时的业务逻辑 diff --git a/couplet-modules/couplet-modules-onLine/src/main/resources/bootstrap.yml b/couplet-modules/couplet-modules-onLine/src/main/resources/bootstrap.yml index fc1f9a1..ddf4716 100644 --- a/couplet-modules/couplet-modules-onLine/src/main/resources/bootstrap.yml +++ b/couplet-modules/couplet-modules-onLine/src/main/resources/bootstrap.yml @@ -28,7 +28,6 @@ spring: logging: level: com.couplet.system.mapper: DEBUG - # 订阅端配置 mqtt: server: From 38521412a6f90f4a1198d49e68e4d72e81116d1a Mon Sep 17 00:00:00 2001 From: lijiayao <13831655+xiao-yao-charge-forward@user.noreply.gitee.com> Date: Tue, 9 Apr 2024 11:51:54 +0800 Subject: [PATCH 3/4] =?UTF-8?q?=E4=BF=AE=E6=94=B9bug?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../couplet-analyze-msg/pom.xml | 18 ------------------ .../analyze/msg/model/ModelsKafkaMessage.java | 2 +- pom.xml | 14 ++++++++++++++ 3 files changed, 15 insertions(+), 19 deletions(-) diff --git a/couplet-modules/couplet-analyze/couplet-analyze-msg/pom.xml b/couplet-modules/couplet-analyze/couplet-analyze-msg/pom.xml index be2c65b..16ac173 100644 --- a/couplet-modules/couplet-analyze/couplet-analyze-msg/pom.xml +++ b/couplet-modules/couplet-analyze/couplet-analyze-msg/pom.xml @@ -80,25 +80,12 @@ com.couplet couplet-common-swagger - - - - - - - com.couplet couplet-common-event - - - org.springframework.kafka - spring-kafka - --> - org.apache.kafka @@ -111,11 +98,6 @@ couplet-common-business - - - - - diff --git a/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/model/ModelsKafkaMessage.java b/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/model/ModelsKafkaMessage.java index 4c82aa5..69b9d6d 100644 --- a/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/model/ModelsKafkaMessage.java +++ b/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/model/ModelsKafkaMessage.java @@ -11,7 +11,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.kafka.annotation.KafkaListener; + import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; diff --git a/pom.xml b/pom.xml index 139c8c9..48aa0c2 100644 --- a/pom.xml +++ b/pom.xml @@ -34,6 +34,8 @@ 0.9.1 8.2.2 4.1.2 + 2.8.0 + 3.0.8 2.14.3 @@ -242,7 +244,19 @@ couplet-common-event ${couplet.version} + + + org.apache.kafka + kafka-clients + ${Kafka.version} + + + org.apache.dubbo + dubbo + ${dubbo.version} + compile + From af273c12cb71fc0670c7935f22f194b002808731 Mon Sep 17 00:00:00 2001 From: lijiayao <13831655+xiao-yao-charge-forward@user.noreply.gitee.com> Date: Tue, 9 Apr 2024 15:08:32 +0800 Subject: [PATCH 4/4] =?UTF-8?q?=E4=BF=AE=E6=94=B9bug?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../couplet/analyze/msg/model/ModelsKafkaMessage.java | 2 +- .../msg/service/impl/RealTimeDataServiceImpl.java | 10 +++++----- .../java/com/couplet/online/utils/MqttMonitor.java | 2 +- .../src/main/resources/bootstrap.yml | 2 +- 4 files changed, 8 insertions(+), 8 deletions(-) diff --git a/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/model/ModelsKafkaMessage.java b/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/model/ModelsKafkaMessage.java index 69b9d6d..a4c7ba4 100644 --- a/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/model/ModelsKafkaMessage.java +++ b/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/model/ModelsKafkaMessage.java @@ -38,7 +38,7 @@ import static java.lang.Thread.sleep; @Component @Slf4j public class ModelsKafkaMessage { - private static final String TOPIC_NAME = "online"; + private static final String TOPIC_NAME = "ljy"; private static final String BOOTSTRAP_SERVERS = "39.103.133.136:9092"; diff --git a/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/service/impl/RealTimeDataServiceImpl.java b/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/service/impl/RealTimeDataServiceImpl.java index c50b6ef..552a5bd 100644 --- a/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/service/impl/RealTimeDataServiceImpl.java +++ b/couplet-modules/couplet-analyze/couplet-analyze-msg/src/main/java/com/couplet/analyze/msg/service/impl/RealTimeDataServiceImpl.java @@ -41,17 +41,17 @@ public class RealTimeDataServiceImpl implements IncidentService { public void incident(CoupletMsgData coupletMsgData) { log.info("实时数据事件开始....."); RealTimeDataRequest cacheObject = redisService.getCacheObject("vin:" + coupletMsgData.getVin()); - //判断是否有缓存数据 - if (redisService.hasKey("vin:query:" + coupletMsgData.getVin())){ - redisService.deleteObject("vin:query:" + coupletMsgData.getVin()); - } +// //判断是否有缓存数据 +// if (redisService.hasKey("vin:query:" + coupletMsgData.getVin())){ +// redisService.deleteObject("vin:query:" + coupletMsgData.getVin()); +// } // if (RealTimeJudge.isJudge(coupletMsgData.getVin())) { if (coupletMsgData.getVin().equals(cacheObject.getVin())){ // log.info("有实时数据,值为:[{}]开始传输实时数据", coupletMsgData.getVin()); //判断数据是否一致, // if (RealTimeJudge.addRealTime(cacheObject)) { log.info("[{}]有缓存数据,值为:[{}],且缓存数据与实时数据一致,开始传输实时数据", coupletMsgData.getVin(), cacheObject); - redisService.setCacheObject("vin:query:" + coupletMsgData.getVin(), cacheObject); + redisService.setCacheSet("vin:query:" + coupletMsgData.getVin(), coupletMsgData); redisService.expire("vin:"+coupletMsgData.getVin(),10, TimeUnit.MINUTES); // } else { // log.info("[{}]有缓存数据,值为:[{}],且缓存数据与实时数据不一致,开始传输实时数据", coupletMsgData.getVin(), cacheObject); diff --git a/couplet-modules/couplet-modules-onLine/src/main/java/com/couplet/online/utils/MqttMonitor.java b/couplet-modules/couplet-modules-onLine/src/main/java/com/couplet/online/utils/MqttMonitor.java index 68ac0dd..d6a22d3 100644 --- a/couplet-modules/couplet-modules-onLine/src/main/java/com/couplet/online/utils/MqttMonitor.java +++ b/couplet-modules/couplet-modules-onLine/src/main/java/com/couplet/online/utils/MqttMonitor.java @@ -76,7 +76,7 @@ public class MqttMonitor { //Kafka生产者配置 - private static final String TOPIC_NAME = "dong"; + private static final String TOPIC_NAME = "ljy"; private static final String BOOTSTRAP_SERVERS = "39.103.133.136:9092"; //线程池,用于异步处理消息到来时的业务逻辑 diff --git a/couplet-modules/couplet-modules-onLine/src/main/resources/bootstrap.yml b/couplet-modules/couplet-modules-onLine/src/main/resources/bootstrap.yml index fc1f9a1..c7875b9 100644 --- a/couplet-modules/couplet-modules-onLine/src/main/resources/bootstrap.yml +++ b/couplet-modules/couplet-modules-onLine/src/main/resources/bootstrap.yml @@ -36,7 +36,7 @@ mqtt: # broker: mqtt://115.159.47.13:1883 username: password: - clientId: xiaoYao + clientId: aaaaaaa qos: 0 topic: xiaoYao