feat:() 优化下线监听
parent
013fe5809d
commit
e2e943749e
|
@ -1,17 +1,14 @@
|
||||||
package com.muyu.processing.consumer;
|
package com.muyu.processing.consumer;
|
||||||
|
|
||||||
import cn.hutool.core.thread.ThreadUtil;
|
import cn.hutool.core.thread.ThreadUtil;
|
||||||
import cn.hutool.json.JSONUtil;
|
|
||||||
import com.alibaba.fastjson.JSON;
|
import com.alibaba.fastjson.JSON;
|
||||||
import com.alibaba.fastjson.JSONObject;
|
import com.alibaba.fastjson.JSONObject;
|
||||||
import com.alibaba.nacos.shaded.com.google.common.collect.Lists;
|
import com.alibaba.nacos.shaded.com.google.common.collect.Lists;
|
||||||
import com.muyu.common.core.constant.KafkaConstants;
|
|
||||||
import com.muyu.domain.Fence;
|
import com.muyu.domain.Fence;
|
||||||
import com.muyu.domain.Vehicle;
|
import com.muyu.domain.Vehicle;
|
||||||
import com.muyu.domain.WarnRule;
|
import com.muyu.domain.WarnRule;
|
||||||
import com.muyu.domain.WarnStrategy;
|
import com.muyu.domain.WarnStrategy;
|
||||||
import com.muyu.domain.resp.VehicleManageResp;
|
import com.muyu.domain.resp.VehicleManageResp;
|
||||||
import com.muyu.processing.interfaces.EventInterface;
|
|
||||||
import com.muyu.processing.utils.CacheUtil;
|
import com.muyu.processing.utils.CacheUtil;
|
||||||
import lombok.extern.log4j.Log4j2;
|
import lombok.extern.log4j.Log4j2;
|
||||||
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
||||||
|
|
|
@ -1,11 +1,10 @@
|
||||||
package com.muyu.processing.consumer;
|
package com.muyu.processing.consumer;
|
||||||
|
|
||||||
import com.muyu.enterprise.cache.FaultCacheService;
|
|
||||||
import com.muyu.enterprise.cache.FenceCahceService;
|
|
||||||
import com.muyu.enterprise.cache.VehicleCacheService;
|
|
||||||
import com.muyu.enterprise.cache.WarnRuleCacheService;
|
|
||||||
import com.muyu.processing.utils.CacheUtil;
|
import com.muyu.processing.utils.CacheUtil;
|
||||||
import lombok.extern.log4j.Log4j2;
|
import lombok.extern.log4j.Log4j2;
|
||||||
|
import org.springframework.amqp.core.Message;
|
||||||
|
import com.rabbitmq.client.Channel;
|
||||||
import org.springframework.amqp.rabbit.annotation.Queue;
|
import org.springframework.amqp.rabbit.annotation.Queue;
|
||||||
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
@ -27,19 +26,26 @@ public class OfflineMonitoringConsumer {
|
||||||
@Resource
|
@Resource
|
||||||
private CacheUtil cacheUtil;
|
private CacheUtil cacheUtil;
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 接收消息
|
* 接收消息
|
||||||
* @param vin 车辆vin
|
* @param vin 车辆vin
|
||||||
*/
|
*/
|
||||||
@RabbitListener(queuesToDeclare = @Queue("offline_monitoring"))
|
@RabbitListener(queuesToDeclare = @Queue("offline_monitoring"))
|
||||||
public void receive(String vin){
|
public void receive(String vin, Message message, Channel channel){
|
||||||
log.info("清除缓存中的数据,车辆vin: {}", vin);
|
try {
|
||||||
// 清除缓存
|
log.info("清除缓存中的数据,车辆vin: {}", vin);
|
||||||
cacheUtil.remove(vin);
|
// 清除缓存
|
||||||
|
cacheUtil.remove(vin);
|
||||||
|
log.info("vin码为: {}, 的本地缓存清除成功",vin);
|
||||||
|
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
|
||||||
|
} catch (Exception e) {
|
||||||
|
try {
|
||||||
|
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
|
||||||
|
} catch (Exception ex) {
|
||||||
|
log.info("清除本地缓存异常为: {}",e.getMessage());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,13 +1,10 @@
|
||||||
package com.muyu.processing.controller;
|
package com.muyu.processing.controller;
|
||||||
|
|
||||||
import cn.hutool.json.JSONObject;
|
import cn.hutool.json.JSONObject;
|
||||||
import com.muyu.common.core.constant.KafkaConstants;
|
|
||||||
import com.muyu.common.core.utils.uuid.UUID;
|
import com.muyu.common.core.utils.uuid.UUID;
|
||||||
import com.muyu.common.kafka.config.KafkaProducerConfig;
|
|
||||||
import lombok.extern.log4j.Log4j2;
|
import lombok.extern.log4j.Log4j2;
|
||||||
import org.apache.kafka.clients.producer.KafkaProducer;
|
import org.apache.kafka.clients.producer.KafkaProducer;
|
||||||
import org.apache.kafka.clients.producer.ProducerRecord;
|
import org.apache.kafka.clients.producer.ProducerRecord;
|
||||||
import org.apache.kafka.common.protocol.types.Field;
|
|
||||||
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||||
import org.springframework.web.bind.annotation.GetMapping;
|
import org.springframework.web.bind.annotation.GetMapping;
|
||||||
import org.springframework.web.bind.annotation.RequestMapping;
|
import org.springframework.web.bind.annotation.RequestMapping;
|
||||||
|
@ -69,7 +66,11 @@ public class TestKafka {
|
||||||
*/
|
*/
|
||||||
@GetMapping("/sendDui")
|
@GetMapping("/sendDui")
|
||||||
public String sedDui() {
|
public String sedDui() {
|
||||||
rabbitTemplate.convertAndSend("myExchange","Im.fine","");
|
rabbitTemplate.convertAndSend("offline_monitoring","1123wsdfr54323wsd",message -> {
|
||||||
|
message.getMessageProperties().setMessageId(UUID.randomUUID().toString());
|
||||||
|
return message;
|
||||||
|
});
|
||||||
|
// rabbitTemplate.convertAndSend("myExchange","Im.fine","");
|
||||||
return "OK";
|
return "OK";
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue