diff --git a/cloud-modules/cloud-modules-protocol-parsing/src/main/java/com/muyu/cloud/protocol/parsing/ParsingMessage.java b/cloud-modules/cloud-modules-protocol-parsing/src/main/java/com/muyu/cloud/protocol/parsing/ParsingMessage.java index f72fd30..058c21d 100644 --- a/cloud-modules/cloud-modules-protocol-parsing/src/main/java/com/muyu/cloud/protocol/parsing/ParsingMessage.java +++ b/cloud-modules/cloud-modules-protocol-parsing/src/main/java/com/muyu/cloud/protocol/parsing/ParsingMessage.java @@ -12,6 +12,7 @@ import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; @@ -34,11 +35,14 @@ public class ParsingMessage { @Resource private KafkaProducer kafkaProducer; // 车辆缓存服务 - private final VehicleCacheService vehicleCacheService; + @Autowired + private VehicleCacheService vehicleCacheService; // 车辆类型缓存服务 - private final VehicleTypeCacheService vehicleTypeCacheService; + @Autowired + private VehicleTypeCacheService vehicleTypeCacheService; // 报文模版缓存服务 - private final AllMessageValueCacheService allMessageValueCacheService; + @Autowired + private AllMessageValueCacheService allMessageValueCacheService; // MQTT主题 private static final String TOPIC = "vehicle"; @@ -48,18 +52,8 @@ public class ParsingMessage { private static final String CLIENT_ID = "JavaSample"; // MQTT客户端 private MqttClient mqttClient; - - /** - * 构造方法, 通过构造器注入依赖的缓存服务 - * @param vehicleCacheService 车辆缓存服务 - * @param vehicleTypeCacheService 车辆类型缓存服务 - * @param allMessageValueCacheService 报文模版缓存服务 - */ - public ParsingMessage(VehicleCacheService vehicleCacheService, VehicleTypeCacheService vehicleTypeCacheService, AllMessageValueCacheService allMessageValueCacheService) { - this.vehicleCacheService = vehicleCacheService; - this.vehicleTypeCacheService = vehicleTypeCacheService; - this.allMessageValueCacheService = allMessageValueCacheService; - } + // kafka topic + private static final String TIPSY = "tipsy"; /** * 初始化MQTT连接 @@ -184,7 +178,7 @@ public class ParsingMessage { * @param parseMessage */ private void sendKafkaMessage(JSONObject parseMessage) { - ProducerRecord producerRecord = new ProducerRecord<>("zeshi", parseMessage.toString()); + ProducerRecord producerRecord = new ProducerRecord<>(TIPSY, parseMessage.toString()); kafkaProducer.send(producerRecord); log.info("发送Kafka消息: " + parseMessage); }