合并提交
server_2024_4_2_yuanyonghao
玉安君 2024-04-08 15:50:37 +08:00
parent 82cad8d974
commit 4664426562
2 changed files with 77 additions and 39 deletions

View File

@ -98,6 +98,12 @@
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>com.zhilian</groupId>
<artifactId>zhilian-common-business</artifactId>
</dependency>
</dependencies>
<build>

View File

@ -14,9 +14,12 @@ import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
* @BelongsProject: smart-cloud-server
@ -74,7 +77,7 @@ public class MqttxConfig {
/**
* kafkatopic
*/
private String topicName = "test-topic";
private String topicName = "vehicle-topic";
/**
* kafka
@ -85,35 +88,15 @@ public class MqttxConfig {
/**
* 线
*/
private ExecutorService executorService = Executors.newFixedThreadPool(3);
private ExecutorService executorService = Executors.newSingleThreadExecutor();
/**
* kafka
*/
private KafkaProducer<String, String> kafkaProducer;
/**
* @return
* @Description: kafka
*/
@PostConstruct
public KafkaProducer<Object, Object> initKafkaProducer() {
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap_servers);
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
return new KafkaProducer<>(properties);
}
/**
* @param msg
* @Description:
*/
private void sendMsg(String msg) {
kafkaProducer.send(new ProducerRecord<>(topicName, msg));
log.info("Kafka在topic:{}中发送消息{}", topicName, msg);
}
@PostConstruct
public void initMqtt() {
@ -128,7 +111,7 @@ public class MqttxConfig {
mqttConnectOptions.setPassword(password.toCharArray());
//链接超时
mqttConnectOptions.setConnectionTimeout(60);
mqttConnectOptions.setConnectionTimeout(10);
//心跳检测
mqttConnectOptions.setKeepAliveInterval(60);
mqttClient.connect(mqttConnectOptions);
@ -143,10 +126,10 @@ public class MqttxConfig {
while (mqttClient.isConnected()) {
try {
//每次重连间隔60秒
Thread.sleep(1000 * 60);
Thread.sleep(1000 * 5);
mqttClient.connect();
} catch (Exception e) {
throw new RuntimeException(e);
log.error("重连失败:" + e.getMessage());
}
}
}
@ -157,21 +140,9 @@ public class MqttxConfig {
//将接受到的车辆报文存储到kafka中
executorService.execute(() -> {
//解析得出原始报文String
String sourceMsg = new String(mqttMessage.getPayload());
//对原始报文进行解析
String parseMsg = MessageResolver.parseMsg(sourceMsg);
//截取原始报文获得车辆VIN码
String vin = parseMsg.substring(0, 17);
log.info("当前车辆VIN码:{}",vin);
//调取接口,查询是否是我们的车
executorTask(mqttMessage);
});
// executorTask(mqttMessage);
}
@ -191,4 +162,65 @@ public class MqttxConfig {
}
public void executorTask(MqttMessage mqttMessage){
//解析得出原始报文String
String sourceMsg = new String(mqttMessage.getPayload());
//对原始报文进行解析
String parseMsg = MessageResolver.parseMsg(sourceMsg);
//截取原始报文获得车辆VIN码
String vin = parseMsg.substring(0, 17);
log.info("当前车辆VIN码:{}",vin);
//查询该vin码是否为我们的车
List<Object> cacheList = redisService.getCacheList("our-car");
List<String> carList = cacheList.stream().map(item -> {
return String.valueOf(item);
}).collect(Collectors.toList());
if (carList.contains(vin)){
//将车辆vin码存入redis中,用于判断车辆是否处于上线状态
redisService.setCacheObject("online-vehicle",vin,60L, TimeUnit.SECONDS);
//发送rabbitMQ信息修改车辆上线状态
String string = mqttMessage.toString();
//将车辆报文发送到kafka
sendMsg(new String(mqttMessage.getPayload()));
}else {
//不是我们的车
log.info("车辆{}不是我们的车",vin);
}
}
/**
* @return
* @Description: kafka
*/
@PostConstruct
public KafkaProducer<Object, Object> initKafkaProducer() {
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap_servers);
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
this.kafkaProducer = new KafkaProducer<>(properties);
return new KafkaProducer<>(properties);
}
/**
* @param msg
* @Description:
*/
private void sendMsg(String msg) {
kafkaProducer.send(new ProducerRecord<>(topicName, msg));
log.info("Kafka在topic:{}中发送消息{}", topicName, msg);
}
}