From 540b92c57787e1438fb0eaf90cced4f1af7a6d55 Mon Sep 17 00:00:00 2001 From: DongZeLiang <2746733890@qq.com> Date: Fri, 11 Oct 2024 11:12:25 +0800 Subject: [PATCH] =?UTF-8?q?fix():=20=E4=BF=AE=E5=A4=8Dbug?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../kafka/KafkaConsumerService.java | 2 +- .../parsing/manager/MessageProcessor.java | 2 +- .../mqtt/service/MqttClientService.java | 24 +++++++++++++++---- 3 files changed, 22 insertions(+), 6 deletions(-) diff --git a/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/kafka/KafkaConsumerService.java b/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/kafka/KafkaConsumerService.java index ff1eaf5..e38fcf4 100644 --- a/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/kafka/KafkaConsumerService.java +++ b/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/kafka/KafkaConsumerService.java @@ -47,7 +47,7 @@ public class KafkaConsumerService implements InitializingBean { while (true) { try { ThreadUtil.sleep(1000); - System.out.println("开始消费数据,等待中..."); + log.info("开始消费数据-[{}],等待中...",KafkaConstants.KafkaTopic); ConsumerRecords consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000)); for (ConsumerRecord consumerRecord : consumerRecords) { //1.从ConsumerRecord中获取消费数据 diff --git a/cloud-modules/cloud-modules-parsing/src/main/java/com/muyu/parsing/manager/MessageProcessor.java b/cloud-modules/cloud-modules-parsing/src/main/java/com/muyu/parsing/manager/MessageProcessor.java index 99707c0..9cafbb0 100644 --- a/cloud-modules/cloud-modules-parsing/src/main/java/com/muyu/parsing/manager/MessageProcessor.java +++ b/cloud-modules/cloud-modules-parsing/src/main/java/com/muyu/parsing/manager/MessageProcessor.java @@ -70,7 +70,7 @@ public class MessageProcessor { String jsonString = JSONObject.toJSONString(kafKaDataList); ProducerRecord producerRecord = new ProducerRecord<>(KafkaConstants.KafkaTopic, jsonString); kafkaProducer.send(producerRecord); - log.info("kafka投产:{}", jsonString); + log.info("kafka-{}投产:{}", KafkaConstants.KafkaTopic, kafKaDataList.stream().filter(kafKaData -> kafKaData.getKey().equals("VIN")).findFirst().get()); } } diff --git a/cloud-modules/cloud-modules-parsing/src/main/java/com/muyu/parsing/mqtt/service/MqttClientService.java b/cloud-modules/cloud-modules-parsing/src/main/java/com/muyu/parsing/mqtt/service/MqttClientService.java index ff82b84..777b02c 100644 --- a/cloud-modules/cloud-modules-parsing/src/main/java/com/muyu/parsing/mqtt/service/MqttClientService.java +++ b/cloud-modules/cloud-modules-parsing/src/main/java/com/muyu/parsing/mqtt/service/MqttClientService.java @@ -1,5 +1,6 @@ package com.muyu.parsing.mqtt.service; +import com.alibaba.fastjson2.JSONObject; import com.muyu.car.gateway.domain.properties.MqttProperties; import com.muyu.parsing.manager.MessageProcessor; import lombok.extern.slf4j.Slf4j; @@ -25,10 +26,25 @@ public class MqttClientService { public void connectAndSubscribeAsync(MqttProperties mqttProperties) { executorService.submit(() -> { - try { - connectAndSubscribe(mqttProperties); - } catch (MqttException | IOException e) { - log.error("MQTT连接或订阅失败", e); + int inct = 0; + while (true) { + try { + connectAndSubscribe(mqttProperties); + log.info("MQTT客户端连接成功:[{}]", JSONObject.toJSONString(mqttProperties)); + break; + } catch (MqttException | IOException e) { + if (inct > 5){ + log.error("MQTT连接或订阅失败-{},已经尝试{}次:{}",e.getMessage(),inct,JSONObject.toJSONString(mqttProperties)); + break; + } + + log.error("MQTT连接或订阅失败-{},五秒钟之后第{}次尝试", e.getMessage(), ++inct ,e); + try { + Thread.sleep(5000); + } catch (InterruptedException ex) { + throw new RuntimeException(ex); + } + } } }); }