fix(): 修复bug

dev
DongZeLiang 2024-10-11 11:12:25 +08:00
parent 976ac2fb21
commit 540b92c577
3 changed files with 22 additions and 6 deletions

View File

@ -47,7 +47,7 @@ public class KafkaConsumerService implements InitializingBean {
while (true) { while (true) {
try { try {
ThreadUtil.sleep(1000); ThreadUtil.sleep(1000);
System.out.println("开始消费数据,等待中..."); log.info("开始消费数据-[{}],等待中...",KafkaConstants.KafkaTopic);
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000)); ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord consumerRecord : consumerRecords) { for (ConsumerRecord consumerRecord : consumerRecords) {
//1.从ConsumerRecord中获取消费数据 //1.从ConsumerRecord中获取消费数据

View File

@ -70,7 +70,7 @@ public class MessageProcessor {
String jsonString = JSONObject.toJSONString(kafKaDataList); String jsonString = JSONObject.toJSONString(kafKaDataList);
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(KafkaConstants.KafkaTopic, jsonString); ProducerRecord<String, String> producerRecord = new ProducerRecord<>(KafkaConstants.KafkaTopic, jsonString);
kafkaProducer.send(producerRecord); kafkaProducer.send(producerRecord);
log.info("kafka投产:{}", jsonString); log.info("kafka-{}投产:{}", KafkaConstants.KafkaTopic, kafKaDataList.stream().filter(kafKaData -> kafKaData.getKey().equals("VIN")).findFirst().get());
} }
} }

View File

@ -1,5 +1,6 @@
package com.muyu.parsing.mqtt.service; package com.muyu.parsing.mqtt.service;
import com.alibaba.fastjson2.JSONObject;
import com.muyu.car.gateway.domain.properties.MqttProperties; import com.muyu.car.gateway.domain.properties.MqttProperties;
import com.muyu.parsing.manager.MessageProcessor; import com.muyu.parsing.manager.MessageProcessor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@ -25,10 +26,25 @@ public class MqttClientService {
public void connectAndSubscribeAsync(MqttProperties mqttProperties) { public void connectAndSubscribeAsync(MqttProperties mqttProperties) {
executorService.submit(() -> { executorService.submit(() -> {
try { int inct = 0;
connectAndSubscribe(mqttProperties); while (true) {
} catch (MqttException | IOException e) { try {
log.error("MQTT连接或订阅失败", e); connectAndSubscribe(mqttProperties);
log.info("MQTT客户端连接成功[{}]", JSONObject.toJSONString(mqttProperties));
break;
} catch (MqttException | IOException e) {
if (inct > 5){
log.error("MQTT连接或订阅失败-{},已经尝试{}次:{}",e.getMessage(),inct,JSONObject.toJSONString(mqttProperties));
break;
}
log.error("MQTT连接或订阅失败-{},五秒钟之后第{}次尝试", e.getMessage(), ++inct ,e);
try {
Thread.sleep(5000);
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
}
}
} }
}); });
} }