From 78f59d7392efe1c1ce82bc122e18c02d2064b021 Mon Sep 17 00:00:00 2001 From: liuyunhu <3286117488@qq.com> Date: Sat, 6 Apr 2024 16:30:00 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../couplet/business/server/time/Timer.java | 17 +- .../com/couplet/mq/controller/KafkaTest.java | 2 +- .../com/couplet/online/utils/MqttMonitor.java | 170 ++++++++++-------- .../src/main/resources/bootstrap.yml | 2 + 4 files changed, 103 insertions(+), 88 deletions(-) diff --git a/couplet-modules/couplet-business/src/main/java/com/couplet/business/server/time/Timer.java b/couplet-modules/couplet-business/src/main/java/com/couplet/business/server/time/Timer.java index fb955d4..95c11bd 100644 --- a/couplet-modules/couplet-business/src/main/java/com/couplet/business/server/time/Timer.java +++ b/couplet-modules/couplet-business/src/main/java/com/couplet/business/server/time/Timer.java @@ -6,6 +6,7 @@ import com.couplet.common.domain.request.VehicleListParams; import com.couplet.common.redis.service.RedisService; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; @@ -22,7 +23,7 @@ import java.util.List; public class Timer { //redis @Autowired - private RedisService redis; + private StringRedisTemplate redis; //查询车辆列表 @Autowired private VehicleService vehicleService; @@ -35,9 +36,7 @@ public class Timer { //先查询车辆列表 List list = vehicleService.list(new VehicleListParams(null, null, null, null)); - - list.forEach(vehicle -> { - + for (Vehicle vehicle : list) { //只针对已经上线的车辆 if (redis.hasKey(vehicle.getVin())) { @@ -48,15 +47,15 @@ public class Timer { //执行修改下线状态的方法 Integer i = vehicleService.onOrOutLineByVIN(vehicle.getVin(), 0); - if (0 == 1) { - log.error("下线状态修改失败"); - } +// if (0 == 1) { +// log.error("下线状态修改失败"); +// } log.info("下线状态修改成功"); } } + } - - }); } + } diff --git a/couplet-modules/couplet-modules-mq/src/main/java/com/couplet/mq/controller/KafkaTest.java b/couplet-modules/couplet-modules-mq/src/main/java/com/couplet/mq/controller/KafkaTest.java index 3c90638..1bfc0ca 100644 --- a/couplet-modules/couplet-modules-mq/src/main/java/com/couplet/mq/controller/KafkaTest.java +++ b/couplet-modules/couplet-modules-mq/src/main/java/com/couplet/mq/controller/KafkaTest.java @@ -26,7 +26,7 @@ public class KafkaTest { public static void main(String[] args) { //生产者示例 - produceMessage(); +// produceMessage(); //消费者示例 consumerMessages(); diff --git a/couplet-modules/couplet-modules-onLine/src/main/java/com/couplet/online/utils/MqttMonitor.java b/couplet-modules/couplet-modules-onLine/src/main/java/com/couplet/online/utils/MqttMonitor.java index 56f860b..c0b72e0 100644 --- a/couplet-modules/couplet-modules-onLine/src/main/java/com/couplet/online/utils/MqttMonitor.java +++ b/couplet-modules/couplet-modules-onLine/src/main/java/com/couplet/online/utils/MqttMonitor.java @@ -2,7 +2,6 @@ package com.couplet.online.utils; import com.couplet.common.domain.Vehicle; import com.couplet.remote.RemoteVehicleService; -import com.fasterxml.jackson.databind.ser.std.StringSerializer; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; @@ -17,6 +16,8 @@ import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import java.util.List; import java.util.Properties; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; /** @@ -78,6 +79,11 @@ public class MqttMonitor { private static final String TOPIC_NAME = "online"; private static final String BOOTSTRAP_SERVERS = "39.103.133.136:9092"; + //线程池,用于异步处理消息到来时的业务逻辑 + private ExecutorService executorService = Executors.newSingleThreadExecutor(); + + //Kafka生产者实例化为类成员变量 + private KafkaProducer kafkaProducer; //随项目启动而执行这个方法 @PostConstruct @@ -116,7 +122,7 @@ public class MqttMonitor { client.reconnect(); log.info("重连中..."); } catch (InterruptedException | MqttException e) { - throw new RuntimeException(e); + log.error("重连失败:" + e.getMessage()); } } @@ -124,72 +130,7 @@ public class MqttMonitor { @Override public void messageArrived(String topic, MqttMessage mqttMessage) { -// log.info("消息已送达"); -// log.info("接收消息主题:{}",topic); -// log.info("接收消息qos:{}", mqttMessage.getQos()); - - //接收到的原始报文 - String message = new String(mqttMessage.getPayload()); - -// log.info("接收消息原始内容:{}", message); - - //解析后的字符串 - String parseMsg = ParseMessageUtil.parseMsg(message); - - //拿到前17位(车辆vin码) - String start17 = parseMsg.substring(0, 17); - - - log.info("当前车辆的vin码为:" + start17); - -// //判断缓存中是否有这个vin -// if (redis.hasKey("不存在的车辆VIN" + start17)) { -// -// //可使用RabbitMQ发送消息 -// log.error("vin码为" + start17 + "的车辆不属于本系统!"); -// -// } else {//如果缓存中没有存这个vin -// -// -// } - - - //调取接口,通过vin查询车辆 - List vehicles = remoteVehicleService.findByVIN(start17).getData(); - System.out.println("**************" + vehicles); - - - //如果不存在这个车 - if (0 == vehicles.size()) { - //将不属于自己系统的车辆存入缓存,便于提前进行拒绝提示 -// redis.setCacheObject("不存在的车辆VIN" + start17, start17); - log.error("未找到vin码为" + start17 + "的车辆信息"); - } else { - //如果存在这个车 - Vehicle vehicle = vehicles.get(0); - log.info("远程调用查询到的车辆数据:" + vehicle); - - //上线车辆存入redis 6秒 用于判断车辆是否下线,还要写定时器,定时查询 - redis.opsForValue().set(start17, start17, 6L, TimeUnit.SECONDS); - - - log.info("vin码为" + start17 + "的车辆属于本系统,允许上线!"); - - //调用上线接口,修改上线状态 - Integer i = remoteVehicleService.onOrOutLineByVIN(start17 + "," + 1); - //上线成功 - if (0 != i) { - log.info("上线成功!"); - try { - produceMessage(message); - } catch (Exception e) { - e.printStackTrace(); - } - } - - - } - + executorService.execute(() -> processMessageArrival(topic, mqttMessage)); } @Override @@ -203,31 +144,104 @@ public class MqttMonitor { } catch (MqttException e) { log.error("mqtt监听者启动失败,{}", e.getMessage()); - throw new RuntimeException(e); +// throw new RuntimeException(e); } } - //Kafka生产者 - private static void produceMessage(String message) { + //异步处理,mqtt消息到达后的逻辑 + private void processMessageArrival(String topic, MqttMessage mqttMessage) { + // log.info("消息已送达"); + // log.info("接收消息主题:{}",topic); + // log.info("接收消息qos:{}", mqttMessage.getQos()); + + //接收到的原始报文 + String message = new String(mqttMessage.getPayload()); + + //log.info("接收消息原始内容:{}", message); + + //解析后的字符串 + String parseMsg = ParseMessageUtil.parseMsg(message); + + //拿到前17位(车辆vin码) + String start17 = parseMsg.substring(0, 17); + + + log.info("当前车辆的vin码为:" + start17); + +// //判断缓存中是否有这个vin +// if (redis.hasKey("不存在的车辆VIN" + start17)) { +// +// //可使用RabbitMQ发送消息 +// log.error("vin码为" + start17 + "的车辆不属于本系统!"); +// +// } else {//如果缓存中没有存这个vin +// +// +// } + + + //调取接口,通过vin查询车辆 + List vehicles = remoteVehicleService.findByVIN(start17).getData(); + System.out.println("**************" + vehicles); + + + //如果不存在这个车 + if (0 == vehicles.size()) { + //将不属于自己系统的车辆存入缓存,便于提前进行拒绝提示 +// redis.setCacheObject("不存在的车辆VIN" + start17, start17); + log.error("未找到vin码为" + start17 + "的车辆信息"); + } else { + //如果存在这个车 + Vehicle vehicle = vehicles.get(0); + log.info("远程调用查询到的车辆数据:" + vehicle); + + //上线车辆存入redis 6秒 用于判断车辆是否下线,还要写定时器,定时查询 + redis.opsForValue().set(start17, start17, 6L, TimeUnit.SECONDS); + + + log.info("vin码为" + start17 + "的车辆属于本系统,允许上线!"); + + //调用上线接口,修改上线状态 + Integer i = remoteVehicleService.onOrOutLineByVIN(start17 + "," + 1); + //上线成功 + if (0 != i) { + log.info("上线成功!"); + try { + produceMessage(message); + } catch (Exception e) { + log.error("发送信息异常:" + e.getMessage()); + } + } + + + } + + } + + //Kafka生产者实例化为类成员变量 + @PostConstruct + public void initKafkaProducer() { Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); - KafkaProducer producer = new KafkaProducer<>(props); - //创建生产者 - try { + kafkaProducer = new KafkaProducer<>(props); + } + + //发送消息至Kafka + private void produceMessage(String message) { + + try { //发送消息 - producer.send(new ProducerRecord<>(TOPIC_NAME, message)); + kafkaProducer.send(new ProducerRecord<>(TOPIC_NAME, message)); System.out.println("发送消息:" + message); } catch (Exception e) { - e.printStackTrace(); - } finally { - producer.close(); + log.error("消息发送失败:" + e.getMessage()); } } } diff --git a/couplet-modules/couplet-modules-onLine/src/main/resources/bootstrap.yml b/couplet-modules/couplet-modules-onLine/src/main/resources/bootstrap.yml index 1d9a0fa..9362179 100644 --- a/couplet-modules/couplet-modules-onLine/src/main/resources/bootstrap.yml +++ b/couplet-modules/couplet-modules-onLine/src/main/resources/bootstrap.yml @@ -15,9 +15,11 @@ spring: discovery: # 服务注册地址 server-addr: 121.89.211.230:8848 + namespace: 172469 config: # 配置中心地址 server-addr: 121.89.211.230:8848 + namespace: 172469 # 配置文件格式 file-extension: yml # 共享配置