feat(): 更新注入方法
parent
820039d3fe
commit
c8c16b9c80
|
@ -12,6 +12,7 @@ import org.apache.kafka.clients.producer.KafkaProducer;
|
||||||
import org.apache.kafka.clients.producer.ProducerRecord;
|
import org.apache.kafka.clients.producer.ProducerRecord;
|
||||||
import org.eclipse.paho.client.mqttv3.*;
|
import org.eclipse.paho.client.mqttv3.*;
|
||||||
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
|
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
import javax.annotation.PostConstruct;
|
import javax.annotation.PostConstruct;
|
||||||
|
@ -34,11 +35,14 @@ public class ParsingMessage {
|
||||||
@Resource
|
@Resource
|
||||||
private KafkaProducer<String, String> kafkaProducer;
|
private KafkaProducer<String, String> kafkaProducer;
|
||||||
// 车辆缓存服务
|
// 车辆缓存服务
|
||||||
private final VehicleCacheService vehicleCacheService;
|
@Autowired
|
||||||
|
private VehicleCacheService vehicleCacheService;
|
||||||
// 车辆类型缓存服务
|
// 车辆类型缓存服务
|
||||||
private final VehicleTypeCacheService vehicleTypeCacheService;
|
@Autowired
|
||||||
|
private VehicleTypeCacheService vehicleTypeCacheService;
|
||||||
// 报文模版缓存服务
|
// 报文模版缓存服务
|
||||||
private final AllMessageValueCacheService allMessageValueCacheService;
|
@Autowired
|
||||||
|
private AllMessageValueCacheService allMessageValueCacheService;
|
||||||
|
|
||||||
// MQTT主题
|
// MQTT主题
|
||||||
private static final String TOPIC = "vehicle";
|
private static final String TOPIC = "vehicle";
|
||||||
|
@ -48,18 +52,8 @@ public class ParsingMessage {
|
||||||
private static final String CLIENT_ID = "JavaSample";
|
private static final String CLIENT_ID = "JavaSample";
|
||||||
// MQTT客户端
|
// MQTT客户端
|
||||||
private MqttClient mqttClient;
|
private MqttClient mqttClient;
|
||||||
|
// kafka topic
|
||||||
/**
|
private static final String TIPSY = "tipsy";
|
||||||
* 构造方法, 通过构造器注入依赖的缓存服务
|
|
||||||
* @param vehicleCacheService 车辆缓存服务
|
|
||||||
* @param vehicleTypeCacheService 车辆类型缓存服务
|
|
||||||
* @param allMessageValueCacheService 报文模版缓存服务
|
|
||||||
*/
|
|
||||||
public ParsingMessage(VehicleCacheService vehicleCacheService, VehicleTypeCacheService vehicleTypeCacheService, AllMessageValueCacheService allMessageValueCacheService) {
|
|
||||||
this.vehicleCacheService = vehicleCacheService;
|
|
||||||
this.vehicleTypeCacheService = vehicleTypeCacheService;
|
|
||||||
this.allMessageValueCacheService = allMessageValueCacheService;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 初始化MQTT连接
|
* 初始化MQTT连接
|
||||||
|
@ -184,7 +178,7 @@ public class ParsingMessage {
|
||||||
* @param parseMessage
|
* @param parseMessage
|
||||||
*/
|
*/
|
||||||
private void sendKafkaMessage(JSONObject parseMessage) {
|
private void sendKafkaMessage(JSONObject parseMessage) {
|
||||||
ProducerRecord<String, String> producerRecord = new ProducerRecord<>("zeshi", parseMessage.toString());
|
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(TIPSY, parseMessage.toString());
|
||||||
kafkaProducer.send(producerRecord);
|
kafkaProducer.send(producerRecord);
|
||||||
log.info("发送Kafka消息: " + parseMessage);
|
log.info("发送Kafka消息: " + parseMessage);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue