fix(): 事件基础修改
parent
fe90a341ff
commit
87d104cf9b
|
@ -1,7 +1,5 @@
|
|||
package com.muyu.common.kafka.config;
|
||||
|
||||
|
||||
|
||||
import com.muyu.common.core.constant.KafkaConstant;
|
||||
import org.apache.kafka.clients.consumer.KafkaConsumer;
|
||||
import org.apache.kafka.common.serialization.StringDeserializer;
|
||||
|
|
|
@ -49,8 +49,6 @@ public class KafkaProducerConfig {
|
|||
private String acks;
|
||||
|
||||
|
||||
|
||||
|
||||
@Bean
|
||||
public KafkaProducer kafkaProducer() {
|
||||
Map<String, Object> configs = new HashMap<>();
|
||||
|
|
|
@ -34,6 +34,7 @@
|
|||
<groupId>com.muyu</groupId>
|
||||
<artifactId>cloud-common-redis</artifactId>
|
||||
</dependency>
|
||||
|
||||
</dependencies>
|
||||
|
||||
</project>
|
||||
|
|
|
@ -1,83 +0,0 @@
|
|||
package com.muyu.common.rabbit.config;
|
||||
|
||||
|
||||
import com.muyu.common.rabbit.constants.RabbitmqConstants;
|
||||
import org.springframework.amqp.core.*;
|
||||
import org.springframework.amqp.rabbit.core.RabbitAdmin;
|
||||
import org.springframework.beans.factory.annotation.Qualifier;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.util.HashMap;
|
||||
|
||||
/**
|
||||
* @ClassName: DelayedQueueConfig
|
||||
* @Description: 延迟队列配置类
|
||||
*/
|
||||
@Configuration
|
||||
public class DelayedQueueConfig {
|
||||
|
||||
|
||||
@Resource
|
||||
private RabbitAdmin rabbitAdmin;
|
||||
|
||||
/**
|
||||
* 声明队列
|
||||
* @return 返回队列
|
||||
*/
|
||||
@Bean
|
||||
public Queue delayedQueue() {
|
||||
Queue queue = new Queue(RabbitmqConstants.DELAYED_QUEUE_NAME);
|
||||
rabbitAdmin.declareQueue(queue);
|
||||
return queue;
|
||||
}
|
||||
|
||||
/**
|
||||
* 声明交换机
|
||||
* @return 返回交换机
|
||||
*/
|
||||
@Bean
|
||||
public Exchange delayedExchange() {
|
||||
HashMap<String, Object> arguments = new HashMap<>(3);
|
||||
|
||||
arguments.put("x-delayed-type", "direct");
|
||||
|
||||
/**
|
||||
* 声明自定义交换机
|
||||
* 第一个参数:交换机的名称
|
||||
* 第二个参数:交换机的类型
|
||||
* 第三个参数:是否需要持久化
|
||||
* 第四个参数:是否自动删除
|
||||
* 第五个参数:其他参数
|
||||
*/
|
||||
CustomExchange customExchange = new CustomExchange(
|
||||
RabbitmqConstants.DELAYED_EXCHANGE_NAME,
|
||||
"x-delayed-message",
|
||||
true,
|
||||
false,
|
||||
arguments);
|
||||
rabbitAdmin.declareExchange(customExchange);
|
||||
return customExchange;
|
||||
}
|
||||
|
||||
/**
|
||||
* 绑定交换机
|
||||
* @param delayedQueue 队列对象
|
||||
* @param delayedExchange 交换机对象
|
||||
*/
|
||||
@Bean
|
||||
public Binding delayedQueueBindingDelayedExchange(
|
||||
@Qualifier("delayedQueue") Queue delayedQueue,
|
||||
@Qualifier("delayedExchange") Exchange delayedExchange) {
|
||||
|
||||
Binding noargs = BindingBuilder.bind(delayedQueue)
|
||||
.to(delayedExchange)
|
||||
.with(RabbitmqConstants.DELAYED_ROUTING_KEY)
|
||||
.noargs();
|
||||
rabbitAdmin.declareBinding(noargs);
|
||||
return noargs;
|
||||
}
|
||||
|
||||
|
||||
}
|
|
@ -1,75 +0,0 @@
|
|||
package com.muyu.common.rabbit.config;
|
||||
|
||||
import org.springframework.amqp.core.*;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
/**
|
||||
* @Author: WangXin
|
||||
* @Time: 2024/4/22 11:55
|
||||
* @Description: 主题模式配置
|
||||
*/
|
||||
@Configuration
|
||||
public class TopicConfig {
|
||||
|
||||
/**
|
||||
* 主题模式交换机
|
||||
* @return exchange
|
||||
*/
|
||||
@Bean(name = "topicExchange")
|
||||
public Exchange getTopicExchange(){
|
||||
return ExchangeBuilder
|
||||
.topicExchange("exchange_topic")
|
||||
.build();
|
||||
}
|
||||
|
||||
/**
|
||||
* 主题队列 01
|
||||
* @return queue
|
||||
*/
|
||||
@Bean(name = "topicQueue01")
|
||||
public Queue getTopicQueue01(){
|
||||
return QueueBuilder
|
||||
.durable("queue_topic_01")
|
||||
.build();
|
||||
}
|
||||
|
||||
/**
|
||||
* 主题队列 02
|
||||
* @return queue
|
||||
*/
|
||||
@Bean(name = "topicQueue02")
|
||||
public Queue getTopicQueue02(){
|
||||
return QueueBuilder
|
||||
.durable("queue_topic_02")
|
||||
.build();
|
||||
}
|
||||
|
||||
/**
|
||||
* 绑定队列 01
|
||||
* @return binding
|
||||
*/
|
||||
@Bean
|
||||
public Binding getTopicBinding01(){
|
||||
return BindingBuilder
|
||||
.bind(getTopicQueue01())
|
||||
.to(getTopicExchange())
|
||||
//路由键 队列1接收debug级别的消息
|
||||
.with("front.#")
|
||||
.noargs();
|
||||
}
|
||||
|
||||
/**
|
||||
* 绑定队列 02
|
||||
* @return binding
|
||||
*/
|
||||
@Bean
|
||||
public Binding getTopicBinding02(){
|
||||
return BindingBuilder
|
||||
.bind(getTopicQueue02())
|
||||
.to(getTopicExchange())
|
||||
// 路由键 队列2接收info级别的消息
|
||||
.with("back.order.*")
|
||||
.noargs();
|
||||
}
|
||||
}
|
|
@ -1,4 +1,4 @@
|
|||
package com.muyu.common.rabbit.config;
|
||||
package com.muyu.rabbitmq.config;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import org.springframework.amqp.rabbit.connection.CorrelationData;
|
|
@ -1,4 +1,4 @@
|
|||
package com.muyu.common.rabbit.config;
|
||||
package com.muyu.rabbitmq.config;
|
||||
|
||||
|
||||
|
||||
|
@ -21,16 +21,16 @@ public class RabbitAdminConfig {
|
|||
private String username;
|
||||
@Value("${spring.rabbitmq.password}")
|
||||
private String password;
|
||||
@Value("${spring.rabbitmq.virtualhost}")
|
||||
private String virtualHost;
|
||||
@Value("${spring.rabbitmq.port}")
|
||||
private Integer port;
|
||||
|
||||
@Bean
|
||||
public ConnectionFactory connectionFactory() {
|
||||
CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
|
||||
cachingConnectionFactory.setHost(host);
|
||||
cachingConnectionFactory.setPort(port);
|
||||
cachingConnectionFactory.setUsername(username);
|
||||
cachingConnectionFactory.setPassword(password);
|
||||
cachingConnectionFactory.setVirtualHost(virtualHost);
|
||||
return cachingConnectionFactory;
|
||||
|
||||
}
|
|
@ -1,4 +1,4 @@
|
|||
package com.muyu.common.rabbit.config;
|
||||
package com.muyu.rabbitmq.config;
|
||||
|
||||
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
|
||||
import org.springframework.amqp.support.converter.MessageConverter;
|
|
@ -1,4 +1,4 @@
|
|||
package com.muyu.common.rabbit.config;
|
||||
package com.muyu.rabbitmq.config;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import org.springframework.amqp.core.ReturnedMessage;
|
|
@ -1,4 +1,4 @@
|
|||
package com.muyu.common.rabbit.constants;
|
||||
package com.muyu.rabbitmq.constants;
|
||||
|
||||
/**
|
||||
*
|
|
@ -1,11 +1,15 @@
|
|||
package com.muyu.common.rabbit.consumer;
|
||||
package com.muyu.rabbitmq.consumer;
|
||||
|
||||
import com.alibaba.fastjson2.JSONObject;
|
||||
import com.muyu.common.redis.service.RedisService;
|
||||
import com.muyu.rabbitmq.util.CacheUtil;
|
||||
import com.rabbitmq.client.Channel;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.extern.log4j.Log4j2;
|
||||
import org.springframework.amqp.core.Message;
|
||||
import org.springframework.amqp.rabbit.annotation.Queue;
|
||||
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.io.IOException;
|
||||
|
@ -22,6 +26,9 @@ public class RabbitMQConsumerUtil {
|
|||
|
||||
private final RedisService redisService;
|
||||
|
||||
@Autowired
|
||||
private CacheUtil cacheUtil;
|
||||
|
||||
|
||||
/**
|
||||
* 普通消费者
|
||||
|
@ -29,7 +36,8 @@ public class RabbitMQConsumerUtil {
|
|||
* @param message
|
||||
* @param channel
|
||||
*/
|
||||
public void rabbitMQBasicConsumer(Object data ,Message message , Channel channel) {
|
||||
@RabbitListener(queuesToDeclare = @Queue(name = "basic"))
|
||||
public void rabbitMQBasicConsumer(String data ,Message message , Channel channel) {
|
||||
log.info("当前时间:{} :RabbitMQConsumerUtil : {}", new Date(), message);
|
||||
try {
|
||||
// 获取到消息 开始消费
|
||||
|
@ -42,10 +50,12 @@ public class RabbitMQConsumerUtil {
|
|||
return;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* -----------------------------------以下为异步业务操作----------------------------
|
||||
*/
|
||||
String carList = (String) redisService.redisTemplate.opsForValue().get("carList");
|
||||
cacheUtil.put("carList",carList);
|
||||
|
||||
|
||||
/**
|
||||
* ------------------------------------------------------------------------------
|
|
@ -1,7 +1,7 @@
|
|||
package com.muyu.common.rabbit.producer;
|
||||
package com.muyu.rabbitmq.producer;
|
||||
|
||||
import com.muyu.common.core.domain.Result;
|
||||
import com.muyu.common.rabbit.constants.RabbitmqConstants;
|
||||
import com.muyu.rabbitmq.constants.RabbitmqConstants;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.extern.log4j.Log4j2;
|
||||
import org.springframework.amqp.core.MessageProperties;
|
||||
|
@ -31,9 +31,9 @@ public class RabbitMQProducerUtil {
|
|||
* @return 结果集
|
||||
* 一对一消费,只有一个消费者能接收到
|
||||
*/
|
||||
public Result<?> basicSendMessage(String queueName, Object param, String msg) {
|
||||
public void basicSendMessage(String queueName, String param) {
|
||||
|
||||
log.info("【简单模型mq】 : method: 【 basicSendMessage 】 - ages: 【 String : {}, Object : {}, String : {} 】 ---> 【 消息发送中。。。 】", RabbitmqConstants.BASIC_QUEUE_NAME, param, msg);
|
||||
log.info("【简单模型mq】 : method: 【 basicSendMessage 】 - ages: 【 String : {}, Object : {}】 ---> 【 消息发送中。。。 】", RabbitmqConstants.BASIC_QUEUE_NAME, param);
|
||||
// 发送简单模型消息
|
||||
// 第一个参数: 绑定规则 相当于 队列名称
|
||||
// 第二个参数:消息内容
|
||||
|
@ -44,7 +44,7 @@ public class RabbitMQProducerUtil {
|
|||
|
||||
log.info("【简单模型mq】 : method: 【 basicSendMessage 】- queue: 【 {} 】 ---> 【 消息发送成功 】", RabbitmqConstants.BASIC_QUEUE_NAME);
|
||||
|
||||
return Result.success(msg!=null?msg:"消息发送成功");
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -156,19 +156,19 @@ public class RabbitMQProducerUtil {
|
|||
* @param delayTime 延迟时间
|
||||
* @return 结果集
|
||||
*/
|
||||
public Result<?> delayedSendMessage(Long delayTime, Object param) {
|
||||
log.info("【延迟队列模型】 : method: 【 delayedSendMessage 】 消息内容:{}---> 【 消息发送中。。。 】",param);
|
||||
|
||||
rabbitTemplate.convertAndSend(RabbitmqConstants.DELAYED_EXCHANGE_NAME, RabbitmqConstants.DELAYED_ROUTING_KEY,param, message -> {
|
||||
MessageProperties messageProperties = message.getMessageProperties();
|
||||
messageProperties.setMessageId(UUID.randomUUID().toString());
|
||||
messageProperties.setDelayLong(delayTime);
|
||||
return message;
|
||||
});
|
||||
log.info("【延迟队列模型】 : method: 【 delayedSendMessage 】 消息内容:{}---> 【 消息发送成功 】",param);
|
||||
|
||||
return Result.success(param,"消息发送成功");
|
||||
|
||||
}
|
||||
// public Result<?> delayedSendMessage(Long delayTime, Object param) {
|
||||
// log.info("【延迟队列模型】 : method: 【 delayedSendMessage 】 消息内容:{}---> 【 消息发送中。。。 】",param);
|
||||
//
|
||||
// rabbitTemplate.convertAndSend(RabbitmqConstants.DELAYED_EXCHANGE_NAME, RabbitmqConstants.DELAYED_ROUTING_KEY,param, message -> {
|
||||
// MessageProperties messageProperties = message.getMessageProperties();
|
||||
// messageProperties.setMessageId(UUID.randomUUID().toString());
|
||||
// messageProperties.setDelayLong(delayTime);
|
||||
// return message;
|
||||
// });
|
||||
// log.info("【延迟队列模型】 : method: 【 delayedSendMessage 】 消息内容:{}---> 【 消息发送成功 】",param);
|
||||
//
|
||||
// return Result.success(param,"消息发送成功");
|
||||
//
|
||||
// }
|
||||
|
||||
}
|
|
@ -0,0 +1,37 @@
|
|||
package com.muyu.rabbitmq.util;
|
||||
|
||||
import com.github.benmanes.caffeine.cache.Cache;
|
||||
import com.github.benmanes.caffeine.cache.Caffeine;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* 缓存工具类
|
||||
*
|
||||
* @program: cloud-server
|
||||
* @author: 刘武
|
||||
* @create: 2024-09-30 10:08
|
||||
**/
|
||||
@Component
|
||||
public class CacheUtil<T> {
|
||||
|
||||
private final Cache<String, T> cache;
|
||||
|
||||
public CacheUtil() {
|
||||
this.cache = Caffeine.newBuilder()
|
||||
.maximumSize(500L)
|
||||
.build();
|
||||
}
|
||||
|
||||
public T get(String key) {
|
||||
return cache.getIfPresent(key);
|
||||
}
|
||||
|
||||
public void put(String key, T value) {
|
||||
cache.put(key, value);
|
||||
}
|
||||
|
||||
public void remove(String key) {
|
||||
cache.invalidate(key);
|
||||
}
|
||||
|
||||
}
|
|
@ -2,6 +2,5 @@ com.muyu.rabbitmq.producer.RabbitMQProducerUtil
|
|||
com.muyu.rabbitmq.consumer.RabbitMQConsumerUtil
|
||||
com.muyu.rabbitmq.config.RabbitmqConfig
|
||||
com.muyu.rabbitmq.config.MyConfirmCallback
|
||||
com.muyu.rabbitmq.config.DelayedQueueConfig
|
||||
com.muyu.rabbitmq.config.RabbitAdminConfig
|
||||
com.muyu.rabbitmq.config.ReturnCallbackConfig
|
||||
|
|
|
@ -23,6 +23,7 @@
|
|||
<module>cloud-common-saas</module>
|
||||
<module>cloud-common-swagger</module>
|
||||
<module>cloud-common-cache</module>
|
||||
<module>cloud-common-kafka</module>
|
||||
</modules>
|
||||
|
||||
<artifactId>cloud-common</artifactId>
|
||||
|
|
|
@ -66,9 +66,7 @@ public class IoTDBConfig {
|
|||
measurements.add("car_vin");
|
||||
measurements.add("information");
|
||||
|
||||
|
||||
session.insertRecord(TABLENAME,System.currentTimeMillis(),measurements,list);
|
||||
|
||||
//关闭连接
|
||||
session.close();
|
||||
} catch (IoTDBConnectionException e) {
|
||||
|
|
|
@ -1,7 +1,5 @@
|
|||
package com.muyu.event.consumer;
|
||||
|
||||
|
||||
|
||||
import com.alibaba.fastjson2.JSONObject;
|
||||
import com.muyu.event.basic.EventPublisher;
|
||||
import lombok.extern.log4j.Log4j2;
|
||||
|
@ -36,7 +34,6 @@ public class MessageConsumer implements ApplicationRunner {
|
|||
|
||||
private final String topic="four_car";
|
||||
|
||||
|
||||
@Override
|
||||
public void run(ApplicationArguments args) throws Exception {
|
||||
List<String> list = Collections.singletonList(topic);
|
||||
|
@ -48,10 +45,8 @@ public class MessageConsumer implements ApplicationRunner {
|
|||
String value = record.value();
|
||||
JSONObject jsonObject = JSONObject.parseObject(value);
|
||||
log.info("value:{}",value);
|
||||
// eventPublisher.publishEvent(jsonObject);
|
||||
eventPublisher.publishEvent(jsonObject);
|
||||
});
|
||||
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,21 @@
|
|||
package com.muyu.event.consumer;
|
||||
|
||||
import com.muyu.rabbitmq.consumer.RabbitMQConsumerUtil;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* rabbitmq 监听器
|
||||
* @author 刘武
|
||||
* @package:com.muyu.event.consumer
|
||||
* @name:MqConsumer
|
||||
* @date:2024/10/2 14:17
|
||||
*/
|
||||
|
||||
@Component
|
||||
public class MqConsumer {
|
||||
|
||||
|
||||
|
||||
|
||||
}
|
|
@ -18,4 +18,6 @@ public class OnlineConsumer {
|
|||
|
||||
|
||||
|
||||
|
||||
|
||||
}
|
||||
|
|
|
@ -4,6 +4,7 @@ import org.springframework.web.bind.annotation.RequestMapping;
|
|||
import org.springframework.web.bind.annotation.RestController;
|
||||
|
||||
/**
|
||||
* 数据处理
|
||||
* @author 刘武
|
||||
* @package:com.muyu.event.controller
|
||||
* @name:DataController
|
||||
|
|
|
@ -19,13 +19,12 @@ import java.util.List;
|
|||
* @name:ItodbController
|
||||
* @date:2024/9/28 19:17
|
||||
*/
|
||||
@RestController()
|
||||
@RestController
|
||||
public class IoTDBController {
|
||||
|
||||
@Autowired
|
||||
private IoTDBService tdbService;
|
||||
|
||||
|
||||
/**
|
||||
* 查询实时车辆信息列表
|
||||
* @return list
|
||||
|
@ -47,7 +46,6 @@ public class IoTDBController {
|
|||
return Result.success(carInformation);
|
||||
};
|
||||
|
||||
|
||||
/**
|
||||
* 车辆添加
|
||||
* @param addCarInformation
|
||||
|
@ -71,4 +69,6 @@ public class IoTDBController {
|
|||
|
||||
|
||||
|
||||
|
||||
|
||||
}
|
||||
|
|
|
@ -2,20 +2,17 @@ package com.muyu.event.controller;
|
|||
|
||||
|
||||
import com.alibaba.fastjson2.JSONObject;
|
||||
import com.muyu.event.service.TestService;
|
||||
import com.muyu.rabbitmq.producer.RabbitMQProducerUtil;
|
||||
import org.apache.kafka.clients.producer.KafkaProducer;
|
||||
import org.apache.kafka.clients.producer.ProducerRecord;
|
||||
import org.springframework.amqp.core.AmqpTemplate;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
|
||||
import org.springframework.web.bind.annotation.GetMapping;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.text.SimpleDateFormat;
|
||||
import java.util.Date;
|
||||
|
||||
|
||||
/**
|
||||
* 测试
|
||||
* @author 刘武
|
||||
* @package:com.muyu.event.controller
|
||||
* @name:TestController
|
||||
|
@ -26,25 +23,33 @@ public class TestController {
|
|||
|
||||
@Resource
|
||||
private KafkaProducer kafkaProducer;
|
||||
|
||||
@Resource
|
||||
private RabbitMQProducerUtil rabbitMQProducerUtil;
|
||||
|
||||
|
||||
private static final String topic="four_car";
|
||||
|
||||
@GetMapping("send")
|
||||
@GetMapping("sendKafka")
|
||||
public String sendKafka(){
|
||||
|
||||
String message="发送一条信息";
|
||||
JSONObject jsonObject = new JSONObject();
|
||||
jsonObject.put("cj","sb");
|
||||
|
||||
|
||||
ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(topic,jsonObject.toJSONString());
|
||||
jsonObject.put("cj","hh");
|
||||
ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(topic,jsonObject.toString());
|
||||
kafkaProducer.send(producerRecord);
|
||||
|
||||
return "success";
|
||||
}
|
||||
|
||||
@GetMapping("sendMq")
|
||||
public String sendMq(){
|
||||
String message="发送一条信息-mq";
|
||||
rabbitMQProducerUtil.basicSendMessage("basic",message);
|
||||
return "success-mq";
|
||||
};
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
|
|
@ -13,7 +13,13 @@ import org.springframework.format.annotation.DateTimeFormat;
|
|||
|
||||
import java.util.Date;
|
||||
|
||||
|
||||
/**
|
||||
* 事件实体类
|
||||
* @author 刘武
|
||||
* @package:com.muyu.event.domain
|
||||
* @name:Event
|
||||
* @date:2024/9/28 23:10
|
||||
*/
|
||||
@Data
|
||||
@AllArgsConstructor
|
||||
@NoArgsConstructor
|
||||
|
|
Loading…
Reference in New Issue