From e2e943749ea4348a0d5a3f50fca62be8f836d7e7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=99=A8=E5=93=80?= <2076029107@qq.com> Date: Mon, 7 Oct 2024 21:41:41 +0800 Subject: [PATCH] =?UTF-8?q?feat:()=20=E4=BC=98=E5=8C=96=E4=B8=8B=E7=BA=BF?= =?UTF-8?q?=E7=9B=91=E5=90=AC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../consumer/KafkaConsumerService.java | 3 -- .../consumer/OfflineMonitoringConsumer.java | 30 +++++++++++-------- .../muyu/processing/controller/TestKafka.java | 9 +++--- 3 files changed, 23 insertions(+), 19 deletions(-) diff --git a/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/consumer/KafkaConsumerService.java b/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/consumer/KafkaConsumerService.java index f26aeb7..56a2ec5 100644 --- a/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/consumer/KafkaConsumerService.java +++ b/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/consumer/KafkaConsumerService.java @@ -1,17 +1,14 @@ package com.muyu.processing.consumer; import cn.hutool.core.thread.ThreadUtil; -import cn.hutool.json.JSONUtil; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.alibaba.nacos.shaded.com.google.common.collect.Lists; -import com.muyu.common.core.constant.KafkaConstants; import com.muyu.domain.Fence; import com.muyu.domain.Vehicle; import com.muyu.domain.WarnRule; import com.muyu.domain.WarnStrategy; import com.muyu.domain.resp.VehicleManageResp; -import com.muyu.processing.interfaces.EventInterface; import com.muyu.processing.utils.CacheUtil; import lombok.extern.log4j.Log4j2; import org.apache.kafka.clients.consumer.ConsumerRecord; diff --git a/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/consumer/OfflineMonitoringConsumer.java b/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/consumer/OfflineMonitoringConsumer.java index d3f6232..c5f6f48 100644 --- a/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/consumer/OfflineMonitoringConsumer.java +++ b/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/consumer/OfflineMonitoringConsumer.java @@ -1,11 +1,10 @@ package com.muyu.processing.consumer; -import com.muyu.enterprise.cache.FaultCacheService; -import com.muyu.enterprise.cache.FenceCahceService; -import com.muyu.enterprise.cache.VehicleCacheService; -import com.muyu.enterprise.cache.WarnRuleCacheService; + import com.muyu.processing.utils.CacheUtil; import lombok.extern.log4j.Log4j2; +import org.springframework.amqp.core.Message; +import com.rabbitmq.client.Channel; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @@ -27,19 +26,26 @@ public class OfflineMonitoringConsumer { @Resource private CacheUtil cacheUtil; - - - - /** * 接收消息 * @param vin 车辆vin */ @RabbitListener(queuesToDeclare = @Queue("offline_monitoring")) - public void receive(String vin){ - log.info("清除缓存中的数据,车辆vin: {}", vin); - // 清除缓存 - cacheUtil.remove(vin); + public void receive(String vin, Message message, Channel channel){ + try { + log.info("清除缓存中的数据,车辆vin: {}", vin); + // 清除缓存 + cacheUtil.remove(vin); + log.info("vin码为: {}, 的本地缓存清除成功",vin); + channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); + } catch (Exception e) { + try { + channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); + } catch (Exception ex) { + log.info("清除本地缓存异常为: {}",e.getMessage()); + } + } + } } diff --git a/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/controller/TestKafka.java b/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/controller/TestKafka.java index 7087aaa..32bdd37 100644 --- a/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/controller/TestKafka.java +++ b/cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/controller/TestKafka.java @@ -1,13 +1,10 @@ package com.muyu.processing.controller; import cn.hutool.json.JSONObject; -import com.muyu.common.core.constant.KafkaConstants; import com.muyu.common.core.utils.uuid.UUID; -import com.muyu.common.kafka.config.KafkaProducerConfig; import lombok.extern.log4j.Log4j2; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.protocol.types.Field; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; @@ -69,7 +66,11 @@ public class TestKafka { */ @GetMapping("/sendDui") public String sedDui() { - rabbitTemplate.convertAndSend("myExchange","Im.fine",""); + rabbitTemplate.convertAndSend("offline_monitoring","1123wsdfr54323wsd",message -> { + message.getMessageProperties().setMessageId(UUID.randomUUID().toString()); + return message; + }); +// rabbitTemplate.convertAndSend("myExchange","Im.fine",""); return "OK"; } }