From 4664426562f4be5ac3a60f4c1261c2a4fce4119e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=89=E5=AE=89=E5=90=9B?= <2746727141@qq.com> Date: Mon, 8 Apr 2024 15:50:37 +0800 Subject: [PATCH] =?UTF-8?q?commit=20=E5=90=88=E5=B9=B6=E6=8F=90=E4=BA=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- zhilian-modules/zhilian-online/pom.xml | 6 + .../zhilian/online/config/MqttxConfig.java | 110 +++++++++++------- 2 files changed, 77 insertions(+), 39 deletions(-) diff --git a/zhilian-modules/zhilian-online/pom.xml b/zhilian-modules/zhilian-online/pom.xml index 55aab47..95c6683 100644 --- a/zhilian-modules/zhilian-online/pom.xml +++ b/zhilian-modules/zhilian-online/pom.xml @@ -98,6 +98,12 @@ spring-kafka + + + com.zhilian + zhilian-common-business + + diff --git a/zhilian-modules/zhilian-online/src/main/java/com/zhilian/online/config/MqttxConfig.java b/zhilian-modules/zhilian-online/src/main/java/com/zhilian/online/config/MqttxConfig.java index 4fb748f..4581512 100644 --- a/zhilian-modules/zhilian-online/src/main/java/com/zhilian/online/config/MqttxConfig.java +++ b/zhilian-modules/zhilian-online/src/main/java/com/zhilian/online/config/MqttxConfig.java @@ -14,9 +14,12 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; +import java.util.List; import java.util.Properties; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; /** * @BelongsProject: smart-cloud-server @@ -74,7 +77,7 @@ public class MqttxConfig { /** * kafka订阅topic */ - private String topicName = "test-topic"; + private String topicName = "vehicle-topic"; /** * kafka服务 @@ -85,35 +88,15 @@ public class MqttxConfig { /** * 线程池 */ - private ExecutorService executorService = Executors.newFixedThreadPool(3); + private ExecutorService executorService = Executors.newSingleThreadExecutor(); /** * kafka服务 */ private KafkaProducer kafkaProducer; - /** - * @return - * @Description: 初始化kafka服务 - */ - @PostConstruct - public KafkaProducer initKafkaProducer() { - Properties properties = new Properties(); - properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap_servers); - properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); - properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); - return new KafkaProducer<>(properties); - } - /** - * @param msg - * @Description: 发送消息 - */ - private void sendMsg(String msg) { - kafkaProducer.send(new ProducerRecord<>(topicName, msg)); - log.info("Kafka在topic:{}中发送消息{}", topicName, msg); - } @PostConstruct public void initMqtt() { @@ -128,7 +111,7 @@ public class MqttxConfig { mqttConnectOptions.setPassword(password.toCharArray()); //链接超时 - mqttConnectOptions.setConnectionTimeout(60); + mqttConnectOptions.setConnectionTimeout(10); //心跳检测 mqttConnectOptions.setKeepAliveInterval(60); mqttClient.connect(mqttConnectOptions); @@ -143,10 +126,10 @@ public class MqttxConfig { while (mqttClient.isConnected()) { try { //每次重连间隔60秒 - Thread.sleep(1000 * 60); + Thread.sleep(1000 * 5); mqttClient.connect(); } catch (Exception e) { - throw new RuntimeException(e); + log.error("重连失败:" + e.getMessage()); } } } @@ -157,21 +140,9 @@ public class MqttxConfig { //将接受到的车辆报文存储到kafka中 executorService.execute(() -> { - //解析得出原始报文String - String sourceMsg = new String(mqttMessage.getPayload()); - //对原始报文进行解析 - String parseMsg = MessageResolver.parseMsg(sourceMsg); - - //截取原始报文获得车辆VIN码 - String vin = parseMsg.substring(0, 17); - - log.info("当前车辆VIN码:{}",vin); - - //调取接口,查询是否是我们的车 - - - + executorTask(mqttMessage); }); +// executorTask(mqttMessage); } @@ -191,4 +162,65 @@ public class MqttxConfig { } + + + + public void executorTask(MqttMessage mqttMessage){ + //解析得出原始报文String + String sourceMsg = new String(mqttMessage.getPayload()); + //对原始报文进行解析 + String parseMsg = MessageResolver.parseMsg(sourceMsg); + + //截取原始报文获得车辆VIN码 + String vin = parseMsg.substring(0, 17); + + log.info("当前车辆VIN码:{}",vin); + + + //查询该vin码是否为我们的车 + List cacheList = redisService.getCacheList("our-car"); + List carList = cacheList.stream().map(item -> { + return String.valueOf(item); + }).collect(Collectors.toList()); + if (carList.contains(vin)){ + //将车辆vin码存入redis中,用于判断车辆是否处于上线状态 + redisService.setCacheObject("online-vehicle",vin,60L, TimeUnit.SECONDS); + + //发送rabbitMQ信息修改车辆上线状态 + + String string = mqttMessage.toString(); + //将车辆报文发送到kafka + sendMsg(new String(mqttMessage.getPayload())); + + + }else { + //不是我们的车 + log.info("车辆{}不是我们的车",vin); + } + } + + /** + * @return + * @Description: 初始化kafka服务 + */ + @PostConstruct + public KafkaProducer initKafkaProducer() { + Properties properties = new Properties(); + properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap_servers); + properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); + properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); + this.kafkaProducer = new KafkaProducer<>(properties); + return new KafkaProducer<>(properties); + } + + /** + * @param msg + * @Description: 发送消息 + */ + private void sendMsg(String msg) { + kafkaProducer.send(new ProducerRecord<>(topicName, msg)); + log.info("Kafka在topic:{}中发送消息{}", topicName, msg); + } + + }