fix(): 事件基础修改,kafka注入

dev.event
刘武 2024-09-30 16:34:37 +08:00
parent cc99d65102
commit f46db057b5
18 changed files with 757 additions and 69 deletions

View File

@ -17,6 +17,10 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties> </properties>
<description>
cloud-common-rabbit 消息队列服务
</description>
<dependencies> <dependencies>
<!-- rabbitMq 消息队列 --> <!-- rabbitMq 消息队列 -->
@ -28,8 +32,8 @@
<!-- 项目公共核心 --> <!-- 项目公共核心 -->
<dependency> <dependency>
<groupId>com.muyu</groupId> <groupId>com.muyu</groupId>
<artifactId>cloud-common-core</artifactId> <artifactId>cloud-common-redis</artifactId>
</dependency> </dependency>
</dependencies> </dependencies>
</project> </project>

View File

@ -1,41 +0,0 @@
package com.muyu.common.rabbit;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistrar;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;
@Configuration
public class RabbitListenerConfigurer implements org.springframework.amqp.rabbit.annotation.RabbitListenerConfigurer {
static {
System.setProperty("spring.amqp.deserialization.trust.all", "true");
}
//以下配置RabbitMQ消息服务
@Autowired
public ConnectionFactory connectionFactory;
/**
*
* @return
*/
@Bean
public DefaultMessageHandlerMethodFactory handlerMethodFactory() {
DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
// 这里的转换器设置实现了 通过 @Payload 注解 自动反序列化message body
factory.setMessageConverter(new MappingJackson2MessageConverter());
return factory;
}
@Override
public void configureRabbitListeners(RabbitListenerEndpointRegistrar rabbitListenerEndpointRegistrar) {
rabbitListenerEndpointRegistrar.setMessageHandlerMethodFactory(handlerMethodFactory());
}
}

View File

@ -0,0 +1,83 @@
package com.muyu.common.rabbit.config;
import com.muyu.common.rabbit.constants.RabbitmqConstants;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.annotation.Resource;
import java.util.HashMap;
/**
* @ClassName: DelayedQueueConfig
* @Description:
*/
@Configuration
public class DelayedQueueConfig {
@Resource
private RabbitAdmin rabbitAdmin;
/**
*
* @return
*/
@Bean
public Queue delayedQueue() {
Queue queue = new Queue(RabbitmqConstants.DELAYED_QUEUE_NAME);
rabbitAdmin.declareQueue(queue);
return queue;
}
/**
*
* @return
*/
@Bean
public Exchange delayedExchange() {
HashMap<String, Object> arguments = new HashMap<>(3);
arguments.put("x-delayed-type", "direct");
/**
*
*
*
*
*
*
*/
CustomExchange customExchange = new CustomExchange(
RabbitmqConstants.DELAYED_EXCHANGE_NAME,
"x-delayed-message",
true,
false,
arguments);
rabbitAdmin.declareExchange(customExchange);
return customExchange;
}
/**
*
* @param delayedQueue
* @param delayedExchange
*/
@Bean
public Binding delayedQueueBindingDelayedExchange(
@Qualifier("delayedQueue") Queue delayedQueue,
@Qualifier("delayedExchange") Exchange delayedExchange) {
Binding noargs = BindingBuilder.bind(delayedQueue)
.to(delayedExchange)
.with(RabbitmqConstants.DELAYED_ROUTING_KEY)
.noargs();
rabbitAdmin.declareBinding(noargs);
return noargs;
}
}

View File

@ -0,0 +1,47 @@
package com.muyu.common.rabbit.config;
import lombok.AllArgsConstructor;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
/**
* @ClassName:
* @Description:
*/
@Component
@AllArgsConstructor
public class MyConfirmCallback implements RabbitTemplate.ConfirmCallback {
private RabbitTemplate rabbitTemplate;
// public MyConfirmCallback(RabbitTemplate rabbitTemplate) {
// this.rabbitTemplate = rabbitTemplate;
// // 设置 消息发送到交换机成功 的回调
// this.rabbitTemplate.setConfirmCallback(this);
// }
@PostConstruct
public void init() {
this.rabbitTemplate.setConfirmCallback(this);
}
/**
*
*
* @param correlationData correlation data for the callback.
* @param ack true for ack, false for nack
* @param cause An optional cause, for nack, when available, otherwise null.
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
System.out.println("消息发送到交换机成功~");
} else {
System.out.println("消息发送到交换机失败,失败的原因:" + cause);
}
}
}

View File

@ -0,0 +1,49 @@
package com.muyu.common.rabbit.config;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @ClassName: RabbitAdminConfig
* @Description: RabbitAdmin
*/
@Configuration
public class RabbitAdminConfig {
@Value("${spring.rabbitmq.host}")
private String host;
@Value("${spring.rabbitmq.username}")
private String username;
@Value("${spring.rabbitmq.password}")
private String password;
@Value("${spring.rabbitmq.virtualhost}")
private String virtualHost;
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
cachingConnectionFactory.setHost(host);
cachingConnectionFactory.setUsername(username);
cachingConnectionFactory.setPassword(password);
cachingConnectionFactory.setVirtualHost(virtualHost);
return cachingConnectionFactory;
}
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
rabbitAdmin.setAutoStartup(true);
return rabbitAdmin;
}
}

View File

@ -0,0 +1,20 @@
package com.muyu.common.rabbit.config;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* JSON json
*/
@Configuration
public class RabbitmqConfig {
// 消息转换配置
@Bean
public MessageConverter jsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
}

View File

@ -0,0 +1,37 @@
package com.muyu.common.rabbit.config;
import lombok.AllArgsConstructor;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
/**
*
*/
@Component
@AllArgsConstructor
public class ReturnCallbackConfig implements RabbitTemplate.ReturnsCallback {
private final RabbitTemplate rabbitTemplate;
@PostConstruct // @PostContruct是spring框架的注解在⽅法上加该注解会在项⽬启动的时候执⾏该⽅法也可以理解为在spring容器初始化的时候执
public void init() {
rabbitTemplate.setReturnsCallback(this);
}
/**
*
*
* @param returnedMessage the returned message and metadata.
*/
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
System.out.println("消息" + returnedMessage.getMessage().toString() +
"被交换机" + returnedMessage.getExchange() + "回退!"
+ "退回原因为:" + returnedMessage.getReplyText());
// 回退了所有的信息,可做补偿机制 记录发送的日志
}
}

View File

@ -0,0 +1,75 @@
package com.muyu.common.rabbit.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @Author: WangXin
* @Time: 2024/4/22 11:55
* @Description:
*/
@Configuration
public class TopicConfig {
/**
*
* @return exchange
*/
@Bean(name = "topicExchange")
public Exchange getTopicExchange(){
return ExchangeBuilder
.topicExchange("exchange_topic")
.build();
}
/**
* 01
* @return queue
*/
@Bean(name = "topicQueue01")
public Queue getTopicQueue01(){
return QueueBuilder
.durable("queue_topic_01")
.build();
}
/**
* 02
* @return queue
*/
@Bean(name = "topicQueue02")
public Queue getTopicQueue02(){
return QueueBuilder
.durable("queue_topic_02")
.build();
}
/**
* 01
* @return binding
*/
@Bean
public Binding getTopicBinding01(){
return BindingBuilder
.bind(getTopicQueue01())
.to(getTopicExchange())
//路由键 队列1接收debug级别的消息
.with("front.#")
.noargs();
}
/**
* 02
* @return binding
*/
@Bean
public Binding getTopicBinding02(){
return BindingBuilder
.bind(getTopicQueue02())
.to(getTopicExchange())
// 路由键 队列2接收info级别的消息
.with("back.order.*")
.noargs();
}
}

View File

@ -0,0 +1,35 @@
package com.muyu.common.rabbit.constants;
/**
*
* @author:
* @date: 2024/7/10
* @Description: rabbitmq
* @Version 1.0.0
*/
public interface RabbitmqConstants {
//普通队列
String BASIC_QUEUE_NAME = "BASIC_QUEUE_NAME";
String lOG_QUEUE_NAME = "LOG_QUEUE_NAME";
//延迟队列
//队列名称
String DELAYED_QUEUE_NAME = "delayed_queue";
//交换机名称
String DELAYED_EXCHANGE_NAME = "DELAYED_EXCHANGE";
//交换机
String DELAYED_ROUTING_KEY = "delayed";
/**
* 线
*/
String TOP_BOTTOM_STITCHING = "top_bottom_stitching";
/**
* 线
*/
String TOP_RULE = "car.top.data";
/**
* 线
*/
String BOTTOM_RULE = "car.bottom.data";
}

View File

@ -0,0 +1,140 @@
package com.muyu.common.rabbit.consumer;
import com.alibaba.fastjson2.JSONObject;
import com.muyu.common.redis.service.RedisService;
import com.rabbitmq.client.Channel;
import lombok.AllArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.core.Message;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.Date;
/**
* @ClassName: RabbitMQConsumerUtil
* @Description: rabbitmq
*/
@Component
@Log4j2
@AllArgsConstructor
public class RabbitMQConsumerUtil {
private final RedisService redisService;
/**
*
* @param data
* @param message
* @param channel
*/
public void rabbitMQBasicConsumer(Object data ,Message message , Channel channel) {
log.info("当前时间:{} RabbitMQConsumerUtil : {}", new Date(), message);
try {
// 获取到消息 开始消费
log.info("消息消费者接收到消息,消息内容:{}", JSONObject.toJSONString(data));
Long add = redisService.redisTemplate.opsForSet().add(data, message.getMessageProperties().getMessageId());
if (add != 1) {
return;
}
/**
* ---------------------------------------------------------------
*/
/**
* ------------------------------------------------------------------------------
*/
// 消费消息成功之后需要确认
// long deliveryTag 消息投递序号 自增的数字 在整个队列中唯一 拿到这个序号就相当于拿到这条消息
// boolean multiple 是否批量确认 true 批量 确认小于等于当前投递序号的消息 false 单个确认
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
log.info("xxx消费者接收到消息消息内容{},消费成功...", message);
} catch (Exception e) {
log.error("xxx消费者接收到消息消息内容{},消费消息异常,异常信息:{}", message, e);
// 消息回退 拒绝消费消息
// long deliveryTag 消息投递序号 自增的数字 在整个队列中唯一 拿到这个序号就相当于拿到这条消息
// boolean requeue 是否回到原来的队列
try {
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
// channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
} catch (IOException ex) {
log.error("xxx消费者接收到消息消息内容{},回退消息异常,异常信息:{}", message, ex);
}
}finally {
try {
channel.close();
} catch (Exception e) {
log.error("xxx消费者关闭Channel异常消息内容{},异常信息:{}", message, e);
}
}
}
/**
*
* @param data
* @param message
* @param channel
*/
public void carUpConsumer(String data,Message message , Channel channel) {
log.info("当前时间:{} RabbitMQConsumerUtil : {}", new Date(), message);
try {
// 获取到消息 开始消费
log.info("消息消费者接收到消息,消息内容:{}", JSONObject.toJSONString(data));
Long add = redisService.redisTemplate.opsForSet().add(data, message.getMessageProperties().getMessageId());
if (add != 1) {
return;
}
/**
* ---------------------------------------------------------------
*/
log.info("[ 根据vin拿到缓存 ] vin为 --》 {}",data);
log.info("[ 存入本地缓存 ] 数据为 --》 {}",data);
log.info("[ 存入本地缓存 ] 数据为 --》 {}",data);
/**
* ------------------------------------------------------------------------------
*/
// 消费消息成功之后需要确认
// long deliveryTag 消息投递序号 自增的数字 在整个队列中唯一 拿到这个序号就相当于拿到这条消息
// boolean multiple 是否批量确认 true 批量 确认小于等于当前投递序号的消息 false 单个确认
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
log.info("xxx消费者接收到消息消息内容{},消费成功...", message);
} catch (Exception e) {
log.error("xxx消费者接收到消息消息内容{},消费消息异常,异常信息:{}", message, e);
// 消息回退 拒绝消费消息
// long deliveryTag 消息投递序号 自增的数字 在整个队列中唯一 拿到这个序号就相当于拿到这条消息
// boolean requeue 是否回到原来的队列
try {
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
// channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
} catch (IOException ex) {
log.error("xxx消费者接收到消息消息内容{},回退消息异常,异常信息:{}", message, ex);
}
}finally {
try {
channel.close();
} catch (Exception e) {
log.error("xxx消费者关闭Channel异常消息内容{},异常信息:{}", message, e);
}
}
}
}

View File

@ -0,0 +1,174 @@
package com.muyu.common.rabbit.producer;
import com.muyu.common.core.domain.Result;
import com.muyu.common.rabbit.constants.RabbitmqConstants;
import lombok.AllArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import java.util.UUID;
/**
* @ClassName: RabbitMQProducer
* @Description: rabbitmq
*/
@Component
@AllArgsConstructor
@Log4j2
public class RabbitMQProducerUtil {
//redis工具类对象
//rabbit
private final RabbitTemplate rabbitTemplate;
/**
*
*
* @param param
* @return
*
*/
public Result<?> basicSendMessage(String queueName, Object param, String msg) {
log.info("【简单模型mq】 : method: 【 basicSendMessage 】 - ages: 【 String : {}, Object : {}, String : {} 】 ---> 【 消息发送中。。。 】", RabbitmqConstants.BASIC_QUEUE_NAME, param, msg);
// 发送简单模型消息
// 第一个参数: 绑定规则 相当于 队列名称
// 第二个参数:消息内容
rabbitTemplate.convertAndSend(queueName, param, message -> {
message.getMessageProperties().setMessageId(UUID.randomUUID().toString());
return message;
} );
log.info("【简单模型mq】 : method: 【 basicSendMessage 】- queue: 【 {} 】 ---> 【 消息发送成功 】", RabbitmqConstants.BASIC_QUEUE_NAME);
return Result.success(msg!=null?msg:"消息发送成功");
}
/**
* Work queue
*
* @param obj
* @return
*
*/
public Result<?> workSendMessage(String queueName, Object obj, String msg) {
log.info("【工作模型mq】 : method: 【 workSendMessage 】 - ages: 【 String : {}, Object : {}, String : {} 】 ---> 【 消息发送中。。。 】", queueName, obj, msg);
// 发送简单模型消息
// 第一个参数: 绑定规则 相当于 队列名称
// 第二个参数:消息内容
rabbitTemplate.convertAndSend(queueName, obj, message -> {
message.getMessageProperties().setMessageId(UUID.randomUUID().toString());
return message;
} );
log.info("【工作模型mq】 : method: 【 workSendMessage 】- queue: 【 {} 】 ---> 【 消息发送成功 】", queueName);
return Result.success("消息发送成功");
}
/**
* Publish/Subscribe
* fanout
*
* @param exchange
* @param obj Object
* @param msg
* @return
*/
public Result<?> publishSubscribeSendMessage(String exchange, Object obj, String msg) {
log.info("【订阅模型mq】 : method: 【 workSendMessage 】 - ages: 【 String : {}, Object : {}, String : {} 】 ---> 【 消息发送中。。。 】", exchange, obj, msg);
// 发送简单模型消息
// 第一个参数: exchange 交换机的名称
// 第二个参数: 绑定规则 发布订阅者模型 不写 默认 "" 只要绑定就行 不需要规则
// 第三个参数:消息内容
rabbitTemplate.convertAndSend(exchange, "", obj, message -> {
message.getMessageProperties().setMessageId(UUID.randomUUID().toString());
return message;
} );
log.info("【订阅模型mq】 : method: 【 workSendMessage 】- exchange: 【 {} 】 ---> 【 消息发送成功 】", exchange);
return Result.success("消息发送成功");
}
/**
* Routing
* 使 Direct Queue()
*
* @param exchange
* @param rule
* @param obj Object
* @param msg
* @return
*/
public Result<?> routingSendMessage(String exchange, String rule, Object obj, String msg) {
log.info("【路由模型mq】 : method: 【 workSendMessage 】 - ages: 【 String : {}, Object : {}, String : {} 】 ---> 【 消息发送中。。。 】", exchange, obj, msg);
// 发送简单模型消息
// 第一个参数: 绑定规则 相当于 队列名称
// 第二个参数:消息内容
rabbitTemplate.convertAndSend(exchange, rule, obj, message -> {
message.getMessageProperties().setMessageId(UUID.randomUUID().toString());
return message;
} );
log.info("【路由模型mq】 : method: 【 workSendMessage 】- exchange: 【 {} 】 ---> 【 消息发送成功 】", exchange);
return Result.success("消息发送成功");
}
/**
* Topic
* 使 topic
*
* @param exchange
* @param rule . 使 #( ) *( ) name.msg, *.msg, age.#
* @param obj Object
* @param msg
* @return
*/
public Result<?> topicSendMessage(String exchange, String rule, Object obj) {
log.info("【主题模型mq】 : method: 【 workSendMessage 】 - ages: 【 String : {}, Object : {} 】 ---> 【 消息发送中。。。 】", exchange, obj);
// 发送简单模型消息
// 第一个参数: 绑定规则 相当于 队列名称
// 第二个参数:消息内容
rabbitTemplate.convertAndSend(exchange, rule, obj, message -> {
message.getMessageProperties().setMessageId(UUID.randomUUID().toString());
return message;
} );
log.info("【主题模型mq】 : method: 【 workSendMessage 】- exchange: 【 {} 】 ---> 【 消息发送成功 】", exchange);
return Result.success(obj,"消息发送成功");
}
/**
*
* @param param
* @param delayTime
* @return
*/
public Result<?> delayedSendMessage(Long delayTime, Object param) {
log.info("【延迟队列模型】 : method: 【 delayedSendMessage 】 消息内容:{}---> 【 消息发送中。。。 】",param);
rabbitTemplate.convertAndSend(RabbitmqConstants.DELAYED_EXCHANGE_NAME, RabbitmqConstants.DELAYED_ROUTING_KEY,param, message -> {
MessageProperties messageProperties = message.getMessageProperties();
messageProperties.setMessageId(UUID.randomUUID().toString());
messageProperties.setDelayLong(delayTime);
return message;
});
log.info("【延迟队列模型】 : method: 【 delayedSendMessage 】 消息内容:{}---> 【 消息发送成功 】",param);
return Result.success(param,"消息发送成功");
}
}

View File

@ -1 +1,7 @@
com.muyu.common.rabbit.RabbitListenerConfigurer com.muyu.rabbitmq.producer.RabbitMQProducerUtil
com.muyu.rabbitmq.consumer.RabbitMQConsumerUtil
com.muyu.rabbitmq.config.RabbitmqConfig
com.muyu.rabbitmq.config.MyConfirmCallback
com.muyu.rabbitmq.config.DelayedQueueConfig
com.muyu.rabbitmq.config.RabbitAdminConfig
com.muyu.rabbitmq.config.ReturnCallbackConfig

View File

@ -54,13 +54,6 @@
<artifactId>spring-boot-starter-actuator</artifactId> <artifactId>spring-boot-starter-actuator</artifactId>
</dependency> </dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<!-- Mysql Connector --> <!-- Mysql Connector -->
<dependency> <dependency>
<groupId>com.mysql</groupId> <groupId>com.mysql</groupId>
@ -112,15 +105,14 @@
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.apache.kafka</groupId> <groupId>com.muyu</groupId>
<artifactId>kafka-clients</artifactId> <artifactId>cloud-common-kafka</artifactId>
<version>2.8.0</version> <version>3.6.3</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>com.muyu</groupId> <groupId>com.muyu</groupId>
<artifactId>cloud-common-kafka</artifactId> <artifactId>cloud-common-rabbit</artifactId>
<version>3.6.3</version>
</dependency> </dependency>
</dependencies> </dependencies>

View File

@ -4,6 +4,7 @@ package com.muyu.event.consumer;
import com.alibaba.fastjson2.JSONObject; import com.alibaba.fastjson2.JSONObject;
import com.muyu.event.basic.EventPublisher; import com.muyu.event.basic.EventPublisher;
import lombok.extern.log4j.Log4j2;
import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
@ -25,6 +26,7 @@ import static org.bouncycastle.asn1.x500.style.RFC4519Style.l;
* @date2024/9/28 23:34 * @date2024/9/28 23:34
*/ */
@Component @Component
@Log4j2
public class MessageConsumer implements ApplicationRunner { public class MessageConsumer implements ApplicationRunner {
@Autowired @Autowired
@ -32,7 +34,7 @@ public class MessageConsumer implements ApplicationRunner {
@Autowired @Autowired
private EventPublisher eventPublisher; private EventPublisher eventPublisher;
private final String topic="vehicle"; private final String topic="four_car";
@Override @Override
@ -44,13 +46,9 @@ public class MessageConsumer implements ApplicationRunner {
ConsumerRecords<String,String> consumerRecords = consumer.poll(Duration.ofMillis(100)); ConsumerRecords<String,String> consumerRecords = consumer.poll(Duration.ofMillis(100));
consumerRecords.forEach(record -> { consumerRecords.forEach(record -> {
String value = record.value(); String value = record.value();
System.out.println(value); JSONObject jsonObject = JSONObject.parseObject(value);
log.info("value:{}",value);
JSONObject jsonObject = new JSONObject(); // eventPublisher.publishEvent(jsonObject);
jsonObject.put("123","123");
//事件处理
eventPublisher.publishEvent(jsonObject);
}); });

View File

@ -0,0 +1,21 @@
package com.muyu.event.consumer;
import lombok.extern.log4j.Log4j2;
import org.springframework.stereotype.Component;
/**
* 线
* @author
* @packagecom.muyu.event.consumer
* @nameOnlineConsumer
* @date2024/9/30 11:40
*/
@Component
@Log4j2
public class OnlineConsumer {
}

View File

@ -5,12 +5,15 @@ import com.alibaba.fastjson2.JSONObject;
import com.muyu.event.service.TestService; import com.muyu.event.service.TestService;
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController; import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.text.SimpleDateFormat;
import java.util.Date;
/** /**
* @author * @author
@ -25,13 +28,18 @@ public class TestController {
private KafkaProducer kafkaProducer; private KafkaProducer kafkaProducer;
private static final String topic="vehicle";
private static final String topic="four_car";
@GetMapping("send") @GetMapping("send")
public String sendKafka(){ public String sendKafka(){
String message="发送一条信息"; String message="发送一条信息";
ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(topic,message); JSONObject jsonObject = new JSONObject();
jsonObject.put("cj","sb");
ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(topic,jsonObject.toJSONString());
kafkaProducer.send(producerRecord); kafkaProducer.send(producerRecord);
return "success"; return "success";
@ -50,4 +58,6 @@ public class TestController {
} }

View File

@ -0,0 +1,37 @@
package com.muyu.event.util;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import org.springframework.stereotype.Component;
/**
*
*
* @program: cloud-server
* @author:
* @create: 2024-09-30 10:08
**/
@Component
public class CacheUtil<T> {
private final Cache<String, T> cache;
public CacheUtil() {
this.cache = Caffeine.newBuilder()
.maximumSize(500L)
.build();
}
public T get(String key) {
return cache.getIfPresent(key);
}
public void put(String key, T value) {
cache.put(key, value);
}
public void remove(String key) {
cache.invalidate(key);
}
}

View File

@ -1,6 +1,7 @@
# Tomcat # Tomcat
server: server:
port: 10009 port: 10009
# nacos线上地址 # nacos线上地址
nacos: nacos:
addr: 47.101.53.251:8848 addr: 47.101.53.251:8848
@ -9,6 +10,7 @@ nacos:
namespace: four namespace: four
# SPRING_AMQP_DESERIALIZATION_TRUST_ALL=true spring.amqp.deserialization.trust.all # SPRING_AMQP_DESERIALIZATION_TRUST_ALL=true spring.amqp.deserialization.trust.all
# Spring # Spring
spring:
amqp: amqp:
deserialization: deserialization:
trust: trust:
@ -51,9 +53,8 @@ nacos:
- application-config-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension} - application-config-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension}
# xxl-job 配置文件 # xxl-job 配置文件
- application-xxl-config-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension} - application-xxl-config-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension}
# rabbit 配置文件
- application-rabbit-config-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension}
logging: logging:
level: level:
com.muyu.fence.mapper: DEBUG com.muyu.system.mapper: DEBUG