feat():解析模块远调注入使用

dev.analysis
LQS 2024-10-08 16:51:31 +08:00
parent ec41019824
commit ff31f5abc9
1 changed files with 2 additions and 3 deletions

View File

@ -15,7 +15,6 @@ import lombok.extern.log4j.Log4j2;
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.ProducerRecord;
import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
@ -59,7 +58,7 @@ public class ParsingMQTT {
MqttClient sampleClient = new MqttClient(MQTTConnect.BROKER, MQTTConnect.CLIENT_ID); MqttClient sampleClient = new MqttClient(MQTTConnect.BROKER, MQTTConnect.CLIENT_ID);
MqttConnectOptions connOpts = new MqttConnectOptions(); MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setCleanSession(true); connOpts.setCleanSession(true);
log.info("Connecting to MQTTConnect.BROKER: {}", MQTTConnect.BROKER); log.info("连接中MQTTConnect.BROKER: {}", MQTTConnect.BROKER);
sampleClient.connect(connOpts); sampleClient.connect(connOpts);
sampleClient.subscribe(MQTTConnect.TOPIC, 0); sampleClient.subscribe(MQTTConnect.TOPIC, 0);
sampleClient.setCallback(new MqttCallback() { sampleClient.setCallback(new MqttCallback() {
@ -78,7 +77,7 @@ public class ParsingMQTT {
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(KafkaConstants.MESSAGE_PARSING, ProducerRecord<String, String> producerRecord = new ProducerRecord<>(KafkaConstants.MESSAGE_PARSING,
entries.toString() ); entries.toString() );
kafkaProducer.send(producerRecord); kafkaProducer.send(producerRecord);
log.info("解析之后的数据"+entries); log.info("解析之后的数据"+entries);
} }