Compare commits
8 Commits
83961147fc
...
61f1ea45b2
Author | SHA1 | Date |
---|---|---|
|
61f1ea45b2 | |
|
ad0460a90b | |
|
72ac72330e | |
|
1e9c92e4d3 | |
|
e2e943749e | |
|
013fe5809d | |
|
7f45b28116 | |
|
d3f2444521 |
|
@ -20,6 +20,26 @@ public class VehicleProcessingApplication {
|
|||
|
||||
public static void main(String[] args) {
|
||||
SpringApplication.run(VehicleProcessingApplication.class, args);
|
||||
System.out.println(" _ooOoo_\n" +
|
||||
" o8888888o\n" +
|
||||
" 88\" . \"88\n" +
|
||||
" (| -_- |)\n" +
|
||||
" O\\ = /O\n" +
|
||||
" ____/`---'\\____\n" +
|
||||
" .' \\\\| |// `.\n" +
|
||||
" / \\\\||| : |||// \\\n" +
|
||||
" / _||||| -:- |||||- \\\n" +
|
||||
" | | \\\\\\ - /// | |\n" +
|
||||
" | \\_| ''\\---/'' | |\n" +
|
||||
" \\ .-\\__ `-` ___/-. /\n" +
|
||||
" ___`. .' /--.--\\ `. . __\n" +
|
||||
" .\"\" '< `.___\\_<|>_/___.' >'\"\".\n" +
|
||||
" | | : `- \\`.;`\\ _ /`;.`/ - ` : | |\n" +
|
||||
" \\ \\ `-. \\_ __\\ /__ _/ .-` / /\n" +
|
||||
" ======`-.____`-.___\\_____/___.-`____.-'======\n" +
|
||||
" `=---='\n" +
|
||||
" ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n" +
|
||||
" // 佛祖保佑 永不宕机 永无BUG //");
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
package com.muyu.processing.basic;
|
||||
|
||||
import cn.hutool.json.JSONObject;
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import org.springframework.context.ApplicationEvent;
|
||||
|
||||
/**
|
||||
|
|
|
@ -1,7 +1,9 @@
|
|||
package com.muyu.processing.basic;
|
||||
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import org.springframework.context.ApplicationEventPublisher;
|
||||
import org.springframework.context.ApplicationEventPublisherAware;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* 策略发送事件
|
||||
|
@ -11,6 +13,7 @@ import org.springframework.context.ApplicationEventPublisherAware;
|
|||
* @name:EventPublisher
|
||||
* @Date:2024/9/29 22:31
|
||||
*/
|
||||
@Component
|
||||
public class EventPublisher implements ApplicationEventPublisherAware {
|
||||
|
||||
private ApplicationEventPublisher publisher;
|
||||
|
@ -20,4 +23,13 @@ public class EventPublisher implements ApplicationEventPublisherAware {
|
|||
this.publisher = applicationEventPublisher;
|
||||
}
|
||||
|
||||
/**
|
||||
* 发送事件
|
||||
* @param jsonObject 数据
|
||||
*/
|
||||
public void eventPublish(JSONObject jsonObject){
|
||||
EventCustom eventCustom = new EventCustom(this, jsonObject);
|
||||
publisher.publishEvent(eventCustom);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -1,42 +0,0 @@
|
|||
package com.muyu.processing.config;
|
||||
|
||||
import org.springframework.amqp.rabbit.connection.CorrelationData;
|
||||
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
|
||||
/**
|
||||
* 确认回调配置
|
||||
*/
|
||||
@Component
|
||||
public class ConfirmCallbackConfig implements RabbitTemplate.ConfirmCallback {
|
||||
|
||||
@Autowired
|
||||
private RabbitTemplate rabbitTemplate;
|
||||
|
||||
/**
|
||||
* 当前bean初始化的时候执行
|
||||
*/
|
||||
@PostConstruct
|
||||
public void init() {
|
||||
this.rabbitTemplate.setConfirmCallback(this);
|
||||
}
|
||||
|
||||
/**
|
||||
* 确认方法
|
||||
* @param correlationData correlation data for the callback.
|
||||
* @param ack true for ack, false for nack
|
||||
* @param cause An optional cause, for nack, when available, otherwise null.
|
||||
*/
|
||||
@Override
|
||||
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
|
||||
if (ack) {
|
||||
System.out.println("消息发送到 broker 成功");
|
||||
} else {
|
||||
System.out.println("消息发送到 broker 失败,失败的原因:" + cause);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -1,50 +0,0 @@
|
|||
package com.muyu.processing.config;
|
||||
|
||||
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
|
||||
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
|
||||
import org.springframework.amqp.rabbit.core.RabbitAdmin;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
/**
|
||||
* 构建 RabbitAdmin
|
||||
*/
|
||||
@Configuration
|
||||
public class RabbitAdminConfig {
|
||||
|
||||
@Value("${spring.rabbitmq.host}")
|
||||
private String host;
|
||||
@Value("${spring.rabbitmq.username}")
|
||||
private String username;
|
||||
@Value("${spring.rabbitmq.password}")
|
||||
private String password;
|
||||
@Value("${spring.rabbitmq.virtual-host}")
|
||||
private String virtualhost;
|
||||
|
||||
@Bean
|
||||
public ConnectionFactory connectionFactory() {
|
||||
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
|
||||
connectionFactory.setAddresses(host);
|
||||
connectionFactory.setUsername(username);
|
||||
connectionFactory.setPassword(password);
|
||||
connectionFactory.setVirtualHost(virtualhost);
|
||||
// 配置发送确认回调时,次配置必须配置,否则即使在RabbitTemplate配置了ConfirmCallback也不会生效
|
||||
connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
|
||||
connectionFactory.setPublisherReturns(true);
|
||||
return connectionFactory;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* rabbitAdmin
|
||||
* @param connectionFactory
|
||||
* @return
|
||||
*/
|
||||
@Bean
|
||||
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
|
||||
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
|
||||
rabbitAdmin.setAutoStartup(true);
|
||||
return rabbitAdmin;
|
||||
}
|
||||
}
|
|
@ -1,18 +0,0 @@
|
|||
package com.muyu.processing.config;
|
||||
|
||||
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
|
||||
import org.springframework.amqp.support.converter.MessageConverter;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
/**
|
||||
* 消息转换配置
|
||||
*/
|
||||
@Configuration
|
||||
public class RabbitmqConfig {
|
||||
// 消息转换配置
|
||||
@Bean
|
||||
public MessageConverter jsonMessageConverter(){
|
||||
return new Jackson2JsonMessageConverter();
|
||||
}
|
||||
}
|
|
@ -1,39 +0,0 @@
|
|||
package com.muyu.processing.config;
|
||||
|
||||
import org.springframework.amqp.core.ReturnedMessage;
|
||||
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
|
||||
/**
|
||||
* 消息发送失败返回配置
|
||||
*/
|
||||
@Component
|
||||
public class ReturnsCallbackConfig implements RabbitTemplate.ReturnsCallback {
|
||||
|
||||
@Autowired
|
||||
private RabbitTemplate rabbitTemplate;
|
||||
|
||||
/**
|
||||
* 当前bean初始化的时候执行
|
||||
*/
|
||||
@PostConstruct
|
||||
public void init() {
|
||||
this.rabbitTemplate.setReturnsCallback(this);
|
||||
}
|
||||
|
||||
/**
|
||||
* 消息发送达到 queue 失败执行
|
||||
*
|
||||
* @param returnedMessage the returned message and metadata.
|
||||
*/
|
||||
@Override
|
||||
public void returnedMessage(ReturnedMessage returnedMessage) {
|
||||
System.out.println("消息" + returnedMessage.getMessage().toString() +
|
||||
"被交换机" + returnedMessage.getExchange() + "回退!"
|
||||
+ "退回原因为:" + returnedMessage.getReplyText());
|
||||
// TODO 回退了所有的信息,可做补偿机制
|
||||
}
|
||||
}
|
|
@ -1,16 +1,16 @@
|
|||
package com.muyu.processing.consumer;
|
||||
|
||||
import cn.hutool.core.thread.ThreadUtil;
|
||||
import cn.hutool.json.JSONUtil;
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import com.alibaba.nacos.shaded.com.google.common.collect.Lists;
|
||||
import com.muyu.common.core.constant.KafkaConstants;
|
||||
import com.muyu.common.core.utils.html.EscapeUtil;
|
||||
import com.muyu.domain.Fence;
|
||||
import com.muyu.domain.Vehicle;
|
||||
import com.muyu.domain.WarnRule;
|
||||
import com.muyu.domain.WarnStrategy;
|
||||
import com.muyu.processing.interfaces.EventInterface;
|
||||
import com.muyu.domain.resp.VehicleManageResp;
|
||||
import com.muyu.processing.basic.EventPublisher;
|
||||
import com.muyu.processing.utils.CacheUtil;
|
||||
import lombok.extern.log4j.Log4j2;
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
||||
|
@ -23,6 +23,8 @@ import javax.annotation.Resource;
|
|||
import java.time.Duration;
|
||||
import java.util.Collection;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
|
||||
/**
|
||||
* kafka消费者
|
||||
|
@ -36,47 +38,64 @@ import java.util.Map;
|
|||
@Component
|
||||
public class KafkaConsumerService implements InitializingBean {
|
||||
|
||||
/**
|
||||
* 订阅的Topic
|
||||
*/
|
||||
private static final String TIPSY = "tipsy";
|
||||
|
||||
@Resource
|
||||
private KafkaConsumer kafkaConsumer;
|
||||
|
||||
@Resource
|
||||
private CacheUtil cacheUtil;
|
||||
|
||||
// @Resource
|
||||
// private EventInterface eventInterface;
|
||||
@Resource
|
||||
private EventPublisher eventPublisher;
|
||||
|
||||
/**
|
||||
* 线程池
|
||||
*/
|
||||
private final ExecutorService executorService = Executors.newFixedThreadPool(10);
|
||||
|
||||
@Override
|
||||
public void afterPropertiesSet() throws Exception {
|
||||
|
||||
Thread thread = new Thread(() -> {
|
||||
log.info("启动线程监听Topic: {}", "zeshi");
|
||||
log.info("启动线程监听Topic: {}", TIPSY);
|
||||
// 延迟1秒
|
||||
ThreadUtil.sleep(1000);
|
||||
Collection<String> topics = Lists.newArrayList("zeshi");
|
||||
// 订阅主题
|
||||
Collection<String> topics = Lists.newArrayList(TIPSY);
|
||||
kafkaConsumer.subscribe(topics);
|
||||
while (true) {
|
||||
// 轮询消费消息
|
||||
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000));
|
||||
for (ConsumerRecord consumerRecord : consumerRecords) {
|
||||
//从ConsumerRecord中获取消费数据
|
||||
String originalMsg = (String) consumerRecord.value();
|
||||
log.info("从Kafka中消费的原始数据: " + originalMsg);
|
||||
//把消费数据转换为JSON对象
|
||||
JSONObject jsonObject = JSON.parseObject(originalMsg);
|
||||
log.info("消费数据转换为JSON对象: " + jsonObject);
|
||||
log.info("消费数据转换为JSON对象: " + jsonObject.toString());
|
||||
|
||||
String value = jsonObject.toString();
|
||||
String vin = value.substring(0, 11);
|
||||
Map<String, Object> map = (Map<String, Object>) cacheUtil.get(vin);
|
||||
WarnRule warnRule = (WarnRule) map.get("warnRule");
|
||||
WarnStrategy warnStrategy = (WarnStrategy) map.get("warnStrategy");
|
||||
Vehicle vehicle = (Vehicle) map.get("vehicle");
|
||||
Object breakdown = map.get("breakdown");
|
||||
Fence fence = (Fence) map.get("fence");
|
||||
// eventInterface.handle(jsonObject);
|
||||
}
|
||||
consumerRecords.forEach(record -> executorService.execute(() -> publish(record)));
|
||||
}
|
||||
});
|
||||
thread.start();
|
||||
|
||||
}
|
||||
|
||||
private void publish(ConsumerRecord consumerRecord) {
|
||||
//从ConsumerRecord中获取消费数据
|
||||
String originalMsg = (String) consumerRecord.value();
|
||||
log.info("从Kafka中消费的原始数据: " + originalMsg);
|
||||
//把消费数据转换为JSON对象
|
||||
JSONObject jsonObject = JSON.parseObject(originalMsg);
|
||||
// 获取VIN码
|
||||
String vin = (String) jsonObject.get("vin");
|
||||
log.info("vin码为: {}",vin);
|
||||
// 获取本地缓存中的数据
|
||||
// Map<String, Object> map = (Map<String, Object>) cacheUtil.get(vin);
|
||||
// log.info("map: {}",map);
|
||||
// Fence fence = (Fence) map.get("fence");
|
||||
// Object breakdown = map.get("breakdown");
|
||||
// Vehicle vehicle = (Vehicle) map.get("vehicle");
|
||||
// WarnRule warnRule = (WarnRule) map.get("warnRule");
|
||||
// WarnStrategy warnStrategy = (WarnStrategy) map.get("warnStrategy");
|
||||
// VehicleManageResp vehicleManageResp = (VehicleManageResp) map.get("vehicleManageResp");
|
||||
eventPublisher.eventPublish(jsonObject);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -1,12 +1,11 @@
|
|||
package com.muyu.processing.consumer;
|
||||
|
||||
import com.muyu.enterprise.cache.FaultCacheService;
|
||||
import com.muyu.enterprise.cache.FenceCahceService;
|
||||
import com.muyu.enterprise.cache.VehicleCacheService;
|
||||
import com.muyu.enterprise.cache.WarnRuleCacheService;
|
||||
|
||||
import com.muyu.processing.utils.CacheUtil;
|
||||
import lombok.extern.log4j.Log4j2;
|
||||
import org.springframework.amqp.rabbit.annotation.Exchange;
|
||||
import org.springframework.amqp.rabbit.annotation.Queue;
|
||||
import org.springframework.amqp.rabbit.annotation.QueueBinding;
|
||||
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
|
@ -24,22 +23,30 @@ import javax.annotation.Resource;
|
|||
@Component
|
||||
public class OfflineMonitoringConsumer {
|
||||
|
||||
/**
|
||||
* 下线监听队列名称
|
||||
*/
|
||||
private static final String OFFLINE_MONITORING = "MQ_OFFLINE_MONITORING";
|
||||
/**
|
||||
* 下线监听交换机名称
|
||||
*/
|
||||
private static final String OFFLINE_EXCHANGE = "OFFLINE_EXCHANGE";
|
||||
|
||||
@Resource
|
||||
private CacheUtil cacheUtil;
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* 接收消息
|
||||
* @param vin 车辆vin
|
||||
*/
|
||||
@RabbitListener(queuesToDeclare = @Queue("offline_monitoring"))
|
||||
public void receive(String vin){
|
||||
@RabbitListener(bindings = @QueueBinding(
|
||||
value = @Queue(value = OFFLINE_MONITORING, declare = "true"),
|
||||
exchange = @Exchange(value = OFFLINE_EXCHANGE, type = "fanout")))
|
||||
public void online(String vin){
|
||||
log.info("清除缓存中的数据,车辆vin: {}", vin);
|
||||
// 清除缓存
|
||||
cacheUtil.remove(vin);
|
||||
log.info("vin码为: {}de本地缓存清除成功",vin);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -4,13 +4,15 @@ import com.muyu.domain.Fence;
|
|||
import com.muyu.domain.Vehicle;
|
||||
import com.muyu.domain.WarnRule;
|
||||
import com.muyu.domain.WarnStrategy;
|
||||
import com.muyu.domain.resp.VehicleManageResp;
|
||||
import com.muyu.domain.resp.WarnRuleResp;
|
||||
import com.muyu.enterprise.cache.*;
|
||||
import com.muyu.processing.utils.CacheUtil;
|
||||
import com.rabbitmq.client.Channel;
|
||||
import lombok.extern.log4j.Log4j2;
|
||||
import org.springframework.amqp.core.Message;
|
||||
import org.springframework.amqp.rabbit.annotation.Exchange;
|
||||
import org.springframework.amqp.rabbit.annotation.Queue;
|
||||
import org.springframework.amqp.rabbit.annotation.QueueBinding;
|
||||
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
|
@ -29,12 +31,24 @@ import java.util.HashMap;
|
|||
@Component
|
||||
public class OnLineMonitoringConsumer {
|
||||
|
||||
/**
|
||||
* 上线监听队列名称
|
||||
*/
|
||||
private static final String ON_LINE_MONITORING = "MQ_ON_LINE_MONITORING";
|
||||
/**
|
||||
* 上线监听交换机名称
|
||||
*/
|
||||
private static final String ONLINE_EXCHANGE = "ONLINE_EXCHANGE";
|
||||
|
||||
@Resource
|
||||
private CacheUtil cacheUtil;
|
||||
|
||||
@Resource
|
||||
private VehicleCacheService vehicleCacheService;
|
||||
|
||||
@Resource
|
||||
private AllVehicleCacheService allVehicleCacheService;
|
||||
|
||||
@Resource
|
||||
private FaultCacheService faultCacheService;
|
||||
|
||||
|
@ -50,32 +64,30 @@ public class OnLineMonitoringConsumer {
|
|||
/**
|
||||
* 上线监听车辆网关中车辆上线时
|
||||
*/
|
||||
@RabbitListener(queuesToDeclare = @Queue("long_time_no_see"))
|
||||
public void receive(String vin, Message message, Channel channel){
|
||||
|
||||
try {
|
||||
log.info("添加本地缓存,车辆vin: {}", vin);
|
||||
WarnRule warnRule = warnRuleCacheService.get(vin);
|
||||
WarnRuleResp warnStrategy = warnStrategyCacheService.get(vin);
|
||||
Vehicle vehicle = vehicleCacheService.get(vin);
|
||||
Object breakdown = faultCacheService.get(vin);
|
||||
Fence fence = fenceCahceService.get(vin);
|
||||
HashMap<String, Object> map = new HashMap<>();
|
||||
map.put("warnRule",warnRule);
|
||||
map.put("warnStrategy",warnStrategy);
|
||||
map.put("vehicle",vehicle);
|
||||
map.put("breakdown",breakdown);
|
||||
map.put("fence",fence);
|
||||
cacheUtil.put(vin,map);
|
||||
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
|
||||
|
||||
} catch (Exception e) {
|
||||
try {
|
||||
channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);
|
||||
} catch (Exception ex) {
|
||||
throw new RuntimeException(ex);
|
||||
}
|
||||
}
|
||||
@RabbitListener(bindings = @QueueBinding(
|
||||
value = @Queue(value = ON_LINE_MONITORING, declare = "true"),
|
||||
exchange = @Exchange(value = ONLINE_EXCHANGE, type = "fanout")))
|
||||
public void online(String vin){
|
||||
log.info("添加本地缓存,车辆vin: {}", vin);
|
||||
// 获取redis中的数据
|
||||
Fence fence = fenceCahceService.get(vin);
|
||||
Object breakdown = faultCacheService.get(vin);
|
||||
Vehicle vehicle = vehicleCacheService.get(vin);
|
||||
WarnRule warnRule = warnRuleCacheService.get(vin);
|
||||
WarnRuleResp warnStrategy = warnStrategyCacheService.get(vin);
|
||||
VehicleManageResp vehicleManageResp = allVehicleCacheService.get(vin);
|
||||
// 封装从redis中获得的数据
|
||||
HashMap<String, Object> map = new HashMap<>();
|
||||
map.put("fence",fence);
|
||||
map.put("breakdown",breakdown);
|
||||
map.put("vehicle",vehicle);
|
||||
map.put("warnRule",warnRule);
|
||||
map.put("warnStrategy",warnStrategy);
|
||||
map.put("vehicleManageResp",vehicleManageResp);
|
||||
// 添加到本地缓存中
|
||||
cacheUtil.put(vin,map);
|
||||
log.info("vin码为: {}, 数据为: {}, 已完成本地缓存",vin,map);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
|
|
@ -1,13 +1,9 @@
|
|||
package com.muyu.processing.controller;
|
||||
|
||||
import cn.hutool.json.JSONObject;
|
||||
import com.muyu.common.core.constant.KafkaConstants;
|
||||
import com.muyu.common.core.utils.uuid.UUID;
|
||||
import com.muyu.common.kafka.config.KafkaProducerConfig;
|
||||
import lombok.extern.log4j.Log4j2;
|
||||
import org.apache.kafka.clients.producer.KafkaProducer;
|
||||
import org.apache.kafka.clients.producer.ProducerRecord;
|
||||
import org.apache.kafka.common.protocol.types.Field;
|
||||
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||
import org.springframework.web.bind.annotation.GetMapping;
|
||||
import org.springframework.web.bind.annotation.RequestMapping;
|
||||
|
@ -28,7 +24,17 @@ import javax.annotation.Resource;
|
|||
@RequestMapping("/kafka")
|
||||
public class TestKafka {
|
||||
|
||||
private static final String TIPSY = "tipsy";
|
||||
private static final String VIN = "1123wsdfr54323wsd";
|
||||
|
||||
/**
|
||||
* 上线监听队列名称
|
||||
*/
|
||||
private static final String ON_LINE_MONITORING = "MQ_ON_LINE_MONITORING";
|
||||
/**
|
||||
* 下线监听队列名称
|
||||
*/
|
||||
private static final String OFFLINE_MONITORING = "MQ_OFFLINE_MONITORING";
|
||||
@Resource
|
||||
private KafkaProducer<String, String> kafkaProducer;
|
||||
|
||||
|
@ -37,39 +43,35 @@ public class TestKafka {
|
|||
|
||||
/**
|
||||
* 发送Kafka消息
|
||||
* @return String
|
||||
*/
|
||||
@GetMapping("/send")
|
||||
public String sendMsg(){
|
||||
JSONObject entries = new JSONObject();
|
||||
entries.set("vin","vin123468");
|
||||
entries.set("vin",VIN);
|
||||
entries.set("name","宝马");
|
||||
String entriesString = entries.toString();
|
||||
ProducerRecord<String, String> producerRecord = new ProducerRecord<>("zeshi", entriesString);
|
||||
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(TIPSY, entriesString);
|
||||
kafkaProducer.send(producerRecord);
|
||||
return "OK";
|
||||
}
|
||||
|
||||
/**
|
||||
* 发送MQ消息
|
||||
* @return String
|
||||
* 上线监听测试
|
||||
*/
|
||||
@GetMapping("/sendMQ")
|
||||
public String sendMQ(){
|
||||
rabbitTemplate.convertAndSend("long_time_no_see","晨哀,好久不见",message -> {
|
||||
message.getMessageProperties().setMessageId(UUID.randomUUID().toString());
|
||||
return message;
|
||||
});
|
||||
// 发送消息
|
||||
rabbitTemplate.convertAndSend(ON_LINE_MONITORING,VIN);
|
||||
return "OK";
|
||||
}
|
||||
|
||||
/**
|
||||
* 发送MQ队列消息
|
||||
* @return String
|
||||
* 下线监听测试
|
||||
*/
|
||||
@GetMapping("/sendDui")
|
||||
public String sedDui() {
|
||||
rabbitTemplate.convertAndSend("myExchange","Im.fine","");
|
||||
// 发送消息
|
||||
rabbitTemplate.convertAndSend(OFFLINE_MONITORING,VIN);
|
||||
return "OK";
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,10 +1,15 @@
|
|||
package com.muyu.processing.listener;
|
||||
|
||||
import cn.hutool.json.JSONObject;
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import com.muyu.processing.basic.EventCustom;
|
||||
import com.muyu.processing.basic.EventListener;
|
||||
import com.muyu.processing.utils.CacheUtil;
|
||||
import lombok.extern.log4j.Log4j2;
|
||||
import org.apache.kafka.common.protocol.types.Field;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* 添加数据库事件
|
||||
|
@ -14,17 +19,29 @@ import java.util.ArrayList;
|
|||
* @name:AddDatabaseListener
|
||||
* @Date:2024/9/29 22:25
|
||||
*/
|
||||
@Log4j2
|
||||
public class AddDatabaseListener implements EventListener {
|
||||
|
||||
@Resource
|
||||
private CacheUtil cacheUtil;
|
||||
|
||||
@Override
|
||||
public void onEvent(EventCustom event) {
|
||||
log.info("数据库添加");
|
||||
log.info("数据为: {}",event.getData());
|
||||
JSONObject jsonObject = event.getData();
|
||||
ArrayList<Object> keys = new ArrayList<>();
|
||||
ArrayList<Object> values = new ArrayList<>();
|
||||
jsonObject.forEach((key, value) ->{
|
||||
keys.add(key);
|
||||
values.add(value);
|
||||
});
|
||||
// 添加数据库
|
||||
String vin = (String) jsonObject.get("vin");
|
||||
Map<String, Object> map = (Map<String, Object>) cacheUtil.get(vin);
|
||||
if (map != null){
|
||||
log.info("本地缓存数据为: {}",map);
|
||||
ArrayList<Object> keys = new ArrayList<>();
|
||||
ArrayList<Object> values = new ArrayList<>();
|
||||
jsonObject.forEach((key, value) ->{
|
||||
keys.add(key);
|
||||
values.add(value);
|
||||
});
|
||||
// 添加数据库
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
Loading…
Reference in New Issue