Compare commits

...

8 Commits

Author SHA1 Message Date
晨哀 61f1ea45b2 Merge remote-tracking branch 'origin/dev.processing.optimize' into dev
# Conflicts:
#	cloud-modules/cloud-modules-processing/src/main/java/com/muyu/processing/consumer/OnLineMonitoringConsumer.java
2024-10-08 12:05:36 +08:00
晨哀 ad0460a90b feat:() 测试数据库添加事件 2024-10-08 12:01:03 +08:00
晨哀 72ac72330e feat:() 更改kafka模式 2024-10-08 11:27:39 +08:00
晨哀 1e9c92e4d3 feat:() 优化上下线监听队列名称 和新增发送事件方法 2024-10-08 10:16:25 +08:00
晨哀 e2e943749e feat:() 优化下线监听 2024-10-07 21:41:41 +08:00
晨哀 013fe5809d feat:() 优化kafka消费者和上线监听 添加注解 2024-10-07 20:46:24 +08:00
晨哀 7f45b28116 feat:() 优化kafka消费者和上线监听 2024-10-07 20:37:45 +08:00
晨哀 d3f2444521 feat:() 优化kafka消费者 2024-10-07 20:00:24 +08:00
12 changed files with 177 additions and 237 deletions

View File

@ -20,6 +20,26 @@ public class VehicleProcessingApplication {
public static void main(String[] args) { public static void main(String[] args) {
SpringApplication.run(VehicleProcessingApplication.class, args); SpringApplication.run(VehicleProcessingApplication.class, args);
System.out.println(" _ooOoo_\n" +
" o8888888o\n" +
" 88\" . \"88\n" +
" (| -_- |)\n" +
" O\\ = /O\n" +
" ____/`---'\\____\n" +
" .' \\\\| |// `.\n" +
" / \\\\||| : |||// \\\n" +
" / _||||| -:- |||||- \\\n" +
" | | \\\\\\ - /// | |\n" +
" | \\_| ''\\---/'' | |\n" +
" \\ .-\\__ `-` ___/-. /\n" +
" ___`. .' /--.--\\ `. . __\n" +
" .\"\" '< `.___\\_<|>_/___.' >'\"\".\n" +
" | | : `- \\`.;`\\ _ /`;.`/ - ` : | |\n" +
" \\ \\ `-. \\_ __\\ /__ _/ .-` / /\n" +
" ======`-.____`-.___\\_____/___.-`____.-'======\n" +
" `=---='\n" +
" ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n" +
" // 佛祖保佑 永不宕机 永无BUG //");
} }
} }

View File

@ -1,6 +1,6 @@
package com.muyu.processing.basic; package com.muyu.processing.basic;
import cn.hutool.json.JSONObject; import com.alibaba.fastjson.JSONObject;
import org.springframework.context.ApplicationEvent; import org.springframework.context.ApplicationEvent;
/** /**

View File

@ -1,7 +1,9 @@
package com.muyu.processing.basic; package com.muyu.processing.basic;
import com.alibaba.fastjson.JSONObject;
import org.springframework.context.ApplicationEventPublisher; import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware; import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.stereotype.Component;
/** /**
* *
@ -11,6 +13,7 @@ import org.springframework.context.ApplicationEventPublisherAware;
* @nameEventPublisher * @nameEventPublisher
* @Date2024/9/29 22:31 * @Date2024/9/29 22:31
*/ */
@Component
public class EventPublisher implements ApplicationEventPublisherAware { public class EventPublisher implements ApplicationEventPublisherAware {
private ApplicationEventPublisher publisher; private ApplicationEventPublisher publisher;
@ -20,4 +23,13 @@ public class EventPublisher implements ApplicationEventPublisherAware {
this.publisher = applicationEventPublisher; this.publisher = applicationEventPublisher;
} }
/**
*
* @param jsonObject
*/
public void eventPublish(JSONObject jsonObject){
EventCustom eventCustom = new EventCustom(this, jsonObject);
publisher.publishEvent(eventCustom);
}
} }

View File

@ -1,42 +0,0 @@
package com.muyu.processing.config;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
/**
*
*/
@Component
public class ConfirmCallbackConfig implements RabbitTemplate.ConfirmCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* bean
*/
@PostConstruct
public void init() {
this.rabbitTemplate.setConfirmCallback(this);
}
/**
*
* @param correlationData correlation data for the callback.
* @param ack true for ack, false for nack
* @param cause An optional cause, for nack, when available, otherwise null.
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
System.out.println("消息发送到 broker 成功");
} else {
System.out.println("消息发送到 broker 失败,失败的原因:" + cause);
}
}
}

View File

@ -1,50 +0,0 @@
package com.muyu.processing.config;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* RabbitAdmin
*/
@Configuration
public class RabbitAdminConfig {
@Value("${spring.rabbitmq.host}")
private String host;
@Value("${spring.rabbitmq.username}")
private String username;
@Value("${spring.rabbitmq.password}")
private String password;
@Value("${spring.rabbitmq.virtual-host}")
private String virtualhost;
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setAddresses(host);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost(virtualhost);
// 配置发送确认回调时次配置必须配置否则即使在RabbitTemplate配置了ConfirmCallback也不会生效
connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
connectionFactory.setPublisherReturns(true);
return connectionFactory;
}
/**
* rabbitAdmin
* @param connectionFactory
* @return
*/
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
rabbitAdmin.setAutoStartup(true);
return rabbitAdmin;
}
}

View File

@ -1,18 +0,0 @@
package com.muyu.processing.config;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
*
*/
@Configuration
public class RabbitmqConfig {
// 消息转换配置
@Bean
public MessageConverter jsonMessageConverter(){
return new Jackson2JsonMessageConverter();
}
}

View File

@ -1,39 +0,0 @@
package com.muyu.processing.config;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
/**
*
*/
@Component
public class ReturnsCallbackConfig implements RabbitTemplate.ReturnsCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* bean
*/
@PostConstruct
public void init() {
this.rabbitTemplate.setReturnsCallback(this);
}
/**
* queue
*
* @param returnedMessage the returned message and metadata.
*/
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
System.out.println("消息" + returnedMessage.getMessage().toString() +
"被交换机" + returnedMessage.getExchange() + "回退!"
+ "退回原因为:" + returnedMessage.getReplyText());
// TODO 回退了所有的信息,可做补偿机制
}
}

View File

@ -1,16 +1,16 @@
package com.muyu.processing.consumer; package com.muyu.processing.consumer;
import cn.hutool.core.thread.ThreadUtil; import cn.hutool.core.thread.ThreadUtil;
import cn.hutool.json.JSONUtil;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.alibaba.nacos.shaded.com.google.common.collect.Lists; import com.alibaba.nacos.shaded.com.google.common.collect.Lists;
import com.muyu.common.core.constant.KafkaConstants; import com.muyu.common.core.utils.html.EscapeUtil;
import com.muyu.domain.Fence; import com.muyu.domain.Fence;
import com.muyu.domain.Vehicle; import com.muyu.domain.Vehicle;
import com.muyu.domain.WarnRule; import com.muyu.domain.WarnRule;
import com.muyu.domain.WarnStrategy; import com.muyu.domain.WarnStrategy;
import com.muyu.processing.interfaces.EventInterface; import com.muyu.domain.resp.VehicleManageResp;
import com.muyu.processing.basic.EventPublisher;
import com.muyu.processing.utils.CacheUtil; import com.muyu.processing.utils.CacheUtil;
import lombok.extern.log4j.Log4j2; import lombok.extern.log4j.Log4j2;
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecord;
@ -23,6 +23,8 @@ import javax.annotation.Resource;
import java.time.Duration; import java.time.Duration;
import java.util.Collection; import java.util.Collection;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/** /**
* kafka * kafka
@ -36,47 +38,64 @@ import java.util.Map;
@Component @Component
public class KafkaConsumerService implements InitializingBean { public class KafkaConsumerService implements InitializingBean {
/**
* Topic
*/
private static final String TIPSY = "tipsy";
@Resource @Resource
private KafkaConsumer kafkaConsumer; private KafkaConsumer kafkaConsumer;
@Resource @Resource
private CacheUtil cacheUtil; private CacheUtil cacheUtil;
// @Resource @Resource
// private EventInterface eventInterface; private EventPublisher eventPublisher;
/**
* 线
*/
private final ExecutorService executorService = Executors.newFixedThreadPool(10);
@Override @Override
public void afterPropertiesSet() throws Exception { public void afterPropertiesSet() throws Exception {
Thread thread = new Thread(() -> { Thread thread = new Thread(() -> {
log.info("启动线程监听Topic: {}", "zeshi"); log.info("启动线程监听Topic: {}", TIPSY);
// 延迟1秒
ThreadUtil.sleep(1000); ThreadUtil.sleep(1000);
Collection<String> topics = Lists.newArrayList("zeshi"); // 订阅主题
Collection<String> topics = Lists.newArrayList(TIPSY);
kafkaConsumer.subscribe(topics); kafkaConsumer.subscribe(topics);
while (true) { while (true) {
// 轮询消费消息
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000)); ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord consumerRecord : consumerRecords) { consumerRecords.forEach(record -> executorService.execute(() -> publish(record)));
//从ConsumerRecord中获取消费数据
String originalMsg = (String) consumerRecord.value();
log.info("从Kafka中消费的原始数据: " + originalMsg);
//把消费数据转换为JSON对象
JSONObject jsonObject = JSON.parseObject(originalMsg);
log.info("消费数据转换为JSON对象: " + jsonObject);
log.info("消费数据转换为JSON对象: " + jsonObject.toString());
String value = jsonObject.toString();
String vin = value.substring(0, 11);
Map<String, Object> map = (Map<String, Object>) cacheUtil.get(vin);
WarnRule warnRule = (WarnRule) map.get("warnRule");
WarnStrategy warnStrategy = (WarnStrategy) map.get("warnStrategy");
Vehicle vehicle = (Vehicle) map.get("vehicle");
Object breakdown = map.get("breakdown");
Fence fence = (Fence) map.get("fence");
// eventInterface.handle(jsonObject);
}
} }
}); });
thread.start(); thread.start();
} }
private void publish(ConsumerRecord consumerRecord) {
//从ConsumerRecord中获取消费数据
String originalMsg = (String) consumerRecord.value();
log.info("从Kafka中消费的原始数据: " + originalMsg);
//把消费数据转换为JSON对象
JSONObject jsonObject = JSON.parseObject(originalMsg);
// 获取VIN码
String vin = (String) jsonObject.get("vin");
log.info("vin码为: {}",vin);
// 获取本地缓存中的数据
// Map<String, Object> map = (Map<String, Object>) cacheUtil.get(vin);
// log.info("map: {}",map);
// Fence fence = (Fence) map.get("fence");
// Object breakdown = map.get("breakdown");
// Vehicle vehicle = (Vehicle) map.get("vehicle");
// WarnRule warnRule = (WarnRule) map.get("warnRule");
// WarnStrategy warnStrategy = (WarnStrategy) map.get("warnStrategy");
// VehicleManageResp vehicleManageResp = (VehicleManageResp) map.get("vehicleManageResp");
eventPublisher.eventPublish(jsonObject);
}
} }

View File

@ -1,12 +1,11 @@
package com.muyu.processing.consumer; package com.muyu.processing.consumer;
import com.muyu.enterprise.cache.FaultCacheService;
import com.muyu.enterprise.cache.FenceCahceService;
import com.muyu.enterprise.cache.VehicleCacheService;
import com.muyu.enterprise.cache.WarnRuleCacheService;
import com.muyu.processing.utils.CacheUtil; import com.muyu.processing.utils.CacheUtil;
import lombok.extern.log4j.Log4j2; import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
@ -24,22 +23,30 @@ import javax.annotation.Resource;
@Component @Component
public class OfflineMonitoringConsumer { public class OfflineMonitoringConsumer {
/**
* 线
*/
private static final String OFFLINE_MONITORING = "MQ_OFFLINE_MONITORING";
/**
* 线
*/
private static final String OFFLINE_EXCHANGE = "OFFLINE_EXCHANGE";
@Resource @Resource
private CacheUtil cacheUtil; private CacheUtil cacheUtil;
/** /**
* *
* @param vin vin * @param vin vin
*/ */
@RabbitListener(queuesToDeclare = @Queue("offline_monitoring")) @RabbitListener(bindings = @QueueBinding(
public void receive(String vin){ value = @Queue(value = OFFLINE_MONITORING, declare = "true"),
exchange = @Exchange(value = OFFLINE_EXCHANGE, type = "fanout")))
public void online(String vin){
log.info("清除缓存中的数据,车辆vin: {}", vin); log.info("清除缓存中的数据,车辆vin: {}", vin);
// 清除缓存 // 清除缓存
cacheUtil.remove(vin); cacheUtil.remove(vin);
log.info("vin码为: {}de本地缓存清除成功",vin);
} }
} }

View File

@ -4,13 +4,15 @@ import com.muyu.domain.Fence;
import com.muyu.domain.Vehicle; import com.muyu.domain.Vehicle;
import com.muyu.domain.WarnRule; import com.muyu.domain.WarnRule;
import com.muyu.domain.WarnStrategy; import com.muyu.domain.WarnStrategy;
import com.muyu.domain.resp.VehicleManageResp;
import com.muyu.domain.resp.WarnRuleResp; import com.muyu.domain.resp.WarnRuleResp;
import com.muyu.enterprise.cache.*; import com.muyu.enterprise.cache.*;
import com.muyu.processing.utils.CacheUtil; import com.muyu.processing.utils.CacheUtil;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Channel;
import lombok.extern.log4j.Log4j2; import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
@ -29,12 +31,24 @@ import java.util.HashMap;
@Component @Component
public class OnLineMonitoringConsumer { public class OnLineMonitoringConsumer {
/**
* 线
*/
private static final String ON_LINE_MONITORING = "MQ_ON_LINE_MONITORING";
/**
* 线
*/
private static final String ONLINE_EXCHANGE = "ONLINE_EXCHANGE";
@Resource @Resource
private CacheUtil cacheUtil; private CacheUtil cacheUtil;
@Resource @Resource
private VehicleCacheService vehicleCacheService; private VehicleCacheService vehicleCacheService;
@Resource
private AllVehicleCacheService allVehicleCacheService;
@Resource @Resource
private FaultCacheService faultCacheService; private FaultCacheService faultCacheService;
@ -50,31 +64,29 @@ public class OnLineMonitoringConsumer {
/** /**
* 线线 * 线线
*/ */
@RabbitListener(queuesToDeclare = @Queue("long_time_no_see")) @RabbitListener(bindings = @QueueBinding(
public void receive(String vin, Message message, Channel channel){ value = @Queue(value = ON_LINE_MONITORING, declare = "true"),
exchange = @Exchange(value = ONLINE_EXCHANGE, type = "fanout")))
try { public void online(String vin){
log.info("添加本地缓存,车辆vin: {}", vin); log.info("添加本地缓存,车辆vin: {}", vin);
// 获取redis中的数据
Fence fence = fenceCahceService.get(vin);
Object breakdown = faultCacheService.get(vin);
Vehicle vehicle = vehicleCacheService.get(vin);
WarnRule warnRule = warnRuleCacheService.get(vin); WarnRule warnRule = warnRuleCacheService.get(vin);
WarnRuleResp warnStrategy = warnStrategyCacheService.get(vin); WarnRuleResp warnStrategy = warnStrategyCacheService.get(vin);
Vehicle vehicle = vehicleCacheService.get(vin); VehicleManageResp vehicleManageResp = allVehicleCacheService.get(vin);
Object breakdown = faultCacheService.get(vin); // 封装从redis中获得的数据
Fence fence = fenceCahceService.get(vin);
HashMap<String, Object> map = new HashMap<>(); HashMap<String, Object> map = new HashMap<>();
map.put("fence",fence);
map.put("breakdown",breakdown);
map.put("vehicle",vehicle);
map.put("warnRule",warnRule); map.put("warnRule",warnRule);
map.put("warnStrategy",warnStrategy); map.put("warnStrategy",warnStrategy);
map.put("vehicle",vehicle); map.put("vehicleManageResp",vehicleManageResp);
map.put("breakdown",breakdown); // 添加到本地缓存中
map.put("fence",fence);
cacheUtil.put(vin,map); cacheUtil.put(vin,map);
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); log.info("vin码为: {}, 数据为: {}, 已完成本地缓存",vin,map);
} catch (Exception e) {
try {
channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);
} catch (Exception ex) {
throw new RuntimeException(ex);
}
} }
} }

View File

@ -1,13 +1,9 @@
package com.muyu.processing.controller; package com.muyu.processing.controller;
import cn.hutool.json.JSONObject; import cn.hutool.json.JSONObject;
import com.muyu.common.core.constant.KafkaConstants;
import com.muyu.common.core.utils.uuid.UUID;
import com.muyu.common.kafka.config.KafkaProducerConfig;
import lombok.extern.log4j.Log4j2; import lombok.extern.log4j.Log4j2;
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.protocol.types.Field;
import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMapping;
@ -28,7 +24,17 @@ import javax.annotation.Resource;
@RequestMapping("/kafka") @RequestMapping("/kafka")
public class TestKafka { public class TestKafka {
private static final String TIPSY = "tipsy";
private static final String VIN = "1123wsdfr54323wsd";
/**
* 线
*/
private static final String ON_LINE_MONITORING = "MQ_ON_LINE_MONITORING";
/**
* 线
*/
private static final String OFFLINE_MONITORING = "MQ_OFFLINE_MONITORING";
@Resource @Resource
private KafkaProducer<String, String> kafkaProducer; private KafkaProducer<String, String> kafkaProducer;
@ -37,39 +43,35 @@ public class TestKafka {
/** /**
* Kafka * Kafka
* @return String
*/ */
@GetMapping("/send") @GetMapping("/send")
public String sendMsg(){ public String sendMsg(){
JSONObject entries = new JSONObject(); JSONObject entries = new JSONObject();
entries.set("vin","vin123468"); entries.set("vin",VIN);
entries.set("name","宝马"); entries.set("name","宝马");
String entriesString = entries.toString(); String entriesString = entries.toString();
ProducerRecord<String, String> producerRecord = new ProducerRecord<>("zeshi", entriesString); ProducerRecord<String, String> producerRecord = new ProducerRecord<>(TIPSY, entriesString);
kafkaProducer.send(producerRecord); kafkaProducer.send(producerRecord);
return "OK"; return "OK";
} }
/** /**
* MQ * 线
* @return String
*/ */
@GetMapping("/sendMQ") @GetMapping("/sendMQ")
public String sendMQ(){ public String sendMQ(){
rabbitTemplate.convertAndSend("long_time_no_see","晨哀,好久不见",message -> { // 发送消息
message.getMessageProperties().setMessageId(UUID.randomUUID().toString()); rabbitTemplate.convertAndSend(ON_LINE_MONITORING,VIN);
return message;
});
return "OK"; return "OK";
} }
/** /**
* MQ * 线
* @return String
*/ */
@GetMapping("/sendDui") @GetMapping("/sendDui")
public String sedDui() { public String sedDui() {
rabbitTemplate.convertAndSend("myExchange","Im.fine",""); // 发送消息
rabbitTemplate.convertAndSend(OFFLINE_MONITORING,VIN);
return "OK"; return "OK";
} }
} }

View File

@ -1,10 +1,15 @@
package com.muyu.processing.listener; package com.muyu.processing.listener;
import cn.hutool.json.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.muyu.processing.basic.EventCustom; import com.muyu.processing.basic.EventCustom;
import com.muyu.processing.basic.EventListener; import com.muyu.processing.basic.EventListener;
import com.muyu.processing.utils.CacheUtil;
import lombok.extern.log4j.Log4j2;
import org.apache.kafka.common.protocol.types.Field;
import javax.annotation.Resource;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Map;
/** /**
* *
@ -14,10 +19,21 @@ import java.util.ArrayList;
* @nameAddDatabaseListener * @nameAddDatabaseListener
* @Date2024/9/29 22:25 * @Date2024/9/29 22:25
*/ */
@Log4j2
public class AddDatabaseListener implements EventListener { public class AddDatabaseListener implements EventListener {
@Resource
private CacheUtil cacheUtil;
@Override @Override
public void onEvent(EventCustom event) { public void onEvent(EventCustom event) {
log.info("数据库添加");
log.info("数据为: {}",event.getData());
JSONObject jsonObject = event.getData(); JSONObject jsonObject = event.getData();
String vin = (String) jsonObject.get("vin");
Map<String, Object> map = (Map<String, Object>) cacheUtil.get(vin);
if (map != null){
log.info("本地缓存数据为: {}",map);
ArrayList<Object> keys = new ArrayList<>(); ArrayList<Object> keys = new ArrayList<>();
ArrayList<Object> values = new ArrayList<>(); ArrayList<Object> values = new ArrayList<>();
jsonObject.forEach((key, value) ->{ jsonObject.forEach((key, value) ->{
@ -26,6 +42,7 @@ public class AddDatabaseListener implements EventListener {
}); });
// 添加数据库 // 添加数据库
} }
}
@Override @Override
public void onApplicationEvent(EventCustom event) { public void onApplicationEvent(EventCustom event) {