feat:() 新增缓存和修改上线下线监听
parent
eade0c66ea
commit
2430d10401
|
@ -6,7 +6,12 @@ import com.alibaba.fastjson.JSON;
|
||||||
import com.alibaba.fastjson.JSONObject;
|
import com.alibaba.fastjson.JSONObject;
|
||||||
import com.alibaba.nacos.shaded.com.google.common.collect.Lists;
|
import com.alibaba.nacos.shaded.com.google.common.collect.Lists;
|
||||||
import com.muyu.common.core.constant.KafkaConstants;
|
import com.muyu.common.core.constant.KafkaConstants;
|
||||||
|
import com.muyu.domain.Fence;
|
||||||
|
import com.muyu.domain.Vehicle;
|
||||||
|
import com.muyu.domain.WarnRule;
|
||||||
|
import com.muyu.domain.WarnStrategy;
|
||||||
import com.muyu.processing.interfaces.EventInterface;
|
import com.muyu.processing.interfaces.EventInterface;
|
||||||
|
import com.muyu.processing.utils.CacheUtil;
|
||||||
import lombok.extern.log4j.Log4j2;
|
import lombok.extern.log4j.Log4j2;
|
||||||
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
||||||
import org.apache.kafka.clients.consumer.ConsumerRecords;
|
import org.apache.kafka.clients.consumer.ConsumerRecords;
|
||||||
|
@ -17,6 +22,7 @@ import org.springframework.stereotype.Component;
|
||||||
import javax.annotation.Resource;
|
import javax.annotation.Resource;
|
||||||
import java.time.Duration;
|
import java.time.Duration;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* kafka消费者
|
* kafka消费者
|
||||||
|
@ -33,6 +39,9 @@ public class KafkaConsumerService implements InitializingBean {
|
||||||
@Resource
|
@Resource
|
||||||
private KafkaConsumer kafkaConsumer;
|
private KafkaConsumer kafkaConsumer;
|
||||||
|
|
||||||
|
@Resource
|
||||||
|
private CacheUtil cacheUtil;
|
||||||
|
|
||||||
// @Resource
|
// @Resource
|
||||||
// private EventInterface eventInterface;
|
// private EventInterface eventInterface;
|
||||||
|
|
||||||
|
@ -54,8 +63,16 @@ public class KafkaConsumerService implements InitializingBean {
|
||||||
JSONObject jsonObject = JSON.parseObject(originalMsg);
|
JSONObject jsonObject = JSON.parseObject(originalMsg);
|
||||||
log.info("消费数据转换为JSON对象: " + jsonObject);
|
log.info("消费数据转换为JSON对象: " + jsonObject);
|
||||||
log.info("消费数据转换为JSON对象: " + jsonObject.toString());
|
log.info("消费数据转换为JSON对象: " + jsonObject.toString());
|
||||||
// eventInterface.handle(jsonObject);
|
|
||||||
|
|
||||||
|
String value = jsonObject.toString();
|
||||||
|
String vin = value.substring(0, 11);
|
||||||
|
Map<String, Object> map = (Map<String, Object>) cacheUtil.get(vin);
|
||||||
|
WarnRule warnRule = (WarnRule) map.get("warnRule");
|
||||||
|
WarnStrategy warnStrategy = (WarnStrategy) map.get("warnStrategy");
|
||||||
|
Vehicle vehicle = (Vehicle) map.get("vehicle");
|
||||||
|
Object breakdown = map.get("breakdown");
|
||||||
|
Fence fence = (Fence) map.get("fence");
|
||||||
|
// eventInterface.handle(jsonObject);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
|
@ -1,5 +1,9 @@
|
||||||
package com.muyu.processing.consumer;
|
package com.muyu.processing.consumer;
|
||||||
|
|
||||||
|
import com.muyu.enterprise.cache.FaultCacheService;
|
||||||
|
import com.muyu.enterprise.cache.FenceCahceService;
|
||||||
|
import com.muyu.enterprise.cache.VehicleCacheService;
|
||||||
|
import com.muyu.enterprise.cache.WarnRuleCacheService;
|
||||||
import com.muyu.processing.utils.CacheUtil;
|
import com.muyu.processing.utils.CacheUtil;
|
||||||
import lombok.extern.log4j.Log4j2;
|
import lombok.extern.log4j.Log4j2;
|
||||||
import org.springframework.amqp.rabbit.annotation.Queue;
|
import org.springframework.amqp.rabbit.annotation.Queue;
|
||||||
|
@ -23,6 +27,10 @@ public class OfflineMonitoringConsumer {
|
||||||
@Resource
|
@Resource
|
||||||
private CacheUtil cacheUtil;
|
private CacheUtil cacheUtil;
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 接收消息
|
* 接收消息
|
||||||
* @param vin 车辆vin
|
* @param vin 车辆vin
|
||||||
|
|
|
@ -1,5 +1,10 @@
|
||||||
package com.muyu.processing.consumer;
|
package com.muyu.processing.consumer;
|
||||||
|
|
||||||
|
import com.muyu.domain.Fence;
|
||||||
|
import com.muyu.domain.Vehicle;
|
||||||
|
import com.muyu.domain.WarnRule;
|
||||||
|
import com.muyu.domain.WarnStrategy;
|
||||||
|
import com.muyu.enterprise.cache.*;
|
||||||
import com.muyu.processing.utils.CacheUtil;
|
import com.muyu.processing.utils.CacheUtil;
|
||||||
import com.rabbitmq.client.Channel;
|
import com.rabbitmq.client.Channel;
|
||||||
import lombok.extern.log4j.Log4j2;
|
import lombok.extern.log4j.Log4j2;
|
||||||
|
@ -9,6 +14,7 @@ import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
import javax.annotation.Resource;
|
import javax.annotation.Resource;
|
||||||
|
import java.util.HashMap;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 上线监听
|
* 上线监听
|
||||||
|
@ -25,6 +31,21 @@ public class OnLineMonitoringConsumer {
|
||||||
@Resource
|
@Resource
|
||||||
private CacheUtil cacheUtil;
|
private CacheUtil cacheUtil;
|
||||||
|
|
||||||
|
@Resource
|
||||||
|
private VehicleCacheService vehicleCacheService;
|
||||||
|
|
||||||
|
@Resource
|
||||||
|
private FaultCacheService faultCacheService;
|
||||||
|
|
||||||
|
@Resource
|
||||||
|
private FenceCahceService fenceCahceService;
|
||||||
|
|
||||||
|
@Resource
|
||||||
|
private WarnRuleCacheService warnRuleCacheService;
|
||||||
|
|
||||||
|
@Resource
|
||||||
|
private WarnStrategyCacheService warnStrategyCacheService;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 上线监听车辆网关中车辆上线时
|
* 上线监听车辆网关中车辆上线时
|
||||||
*/
|
*/
|
||||||
|
@ -33,7 +54,18 @@ public class OnLineMonitoringConsumer {
|
||||||
|
|
||||||
try {
|
try {
|
||||||
log.info("添加本地缓存,车辆vin: {}", vin);
|
log.info("添加本地缓存,车辆vin: {}", vin);
|
||||||
|
WarnRule warnRule = warnRuleCacheService.get(vin);
|
||||||
|
WarnStrategy warnStrategy = warnStrategyCacheService.get(vin);
|
||||||
|
Vehicle vehicle = vehicleCacheService.get(vin);
|
||||||
|
Object breakdown = faultCacheService.get(vin);
|
||||||
|
Fence fence = fenceCahceService.get(vin);
|
||||||
|
HashMap<String, Object> map = new HashMap<>();
|
||||||
|
map.put("warnRule",warnRule);
|
||||||
|
map.put("warnStrategy",warnStrategy);
|
||||||
|
map.put("vehicle",vehicle);
|
||||||
|
map.put("breakdown",breakdown);
|
||||||
|
map.put("fence",fence);
|
||||||
|
cacheUtil.put(vin,map);
|
||||||
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
|
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
|
||||||
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
|
|
|
@ -1,6 +1,8 @@
|
||||||
package com.muyu.processing.interfaces;
|
package com.muyu.processing.interfaces;
|
||||||
|
|
||||||
import com.alibaba.fastjson.JSONObject;
|
import com.alibaba.fastjson.JSONObject;
|
||||||
|
import org.checkerframework.checker.units.qual.C;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 事件处理接口
|
* 事件处理接口
|
||||||
|
|
|
@ -0,0 +1,58 @@
|
||||||
|
package com.muyu.processing.utils;
|
||||||
|
|
||||||
|
import com.github.benmanes.caffeine.cache.Cache;
|
||||||
|
import com.github.benmanes.caffeine.cache.Caffeine;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 缓存工具类
|
||||||
|
* @Author:杨鹏
|
||||||
|
* @Package:com.muyu.processing.utils
|
||||||
|
* @Project:cloud-vehicle
|
||||||
|
* @name:CacheUtil
|
||||||
|
* @Date:2024/10/4 15:14
|
||||||
|
*/
|
||||||
|
@Component
|
||||||
|
public class CacheUtil<T> {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 缓存对象
|
||||||
|
*/
|
||||||
|
private final Cache<String, T> cache;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 默认构建函数
|
||||||
|
*/
|
||||||
|
public CacheUtil(){
|
||||||
|
this.cache = Caffeine.newBuilder()
|
||||||
|
.maximumSize(500L)
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 获得缓存
|
||||||
|
* @param key 键
|
||||||
|
* @return 返回的值
|
||||||
|
*/
|
||||||
|
public T get(String key){
|
||||||
|
return cache.getIfPresent(key);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 添加缓存
|
||||||
|
* @param key 键
|
||||||
|
* @param value 值
|
||||||
|
*/
|
||||||
|
public void put(String key, T value){
|
||||||
|
cache.put(key, value);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 删除缓存
|
||||||
|
* @param key 键
|
||||||
|
*/
|
||||||
|
public void remove(String key){
|
||||||
|
cache.invalidate(key);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
Loading…
Reference in New Issue