Merge remote-tracking branch 'origin/dev.event' into dev

# Conflicts:
#	cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/producer/RabbitMQProducerUtil.java
#	cloud-common/cloud-common-rabbit/src/main/java/com/muyu/rabbitmq/consumer/RabbitMQConsumerUtil.java
#	cloud-modules/saas/saas-server/src/main/java/com/muyu/server/controller/TemplateController.java
dev.vehiclegateway
刘武 2024-10-07 21:31:48 +08:00
commit c90845d508
26 changed files with 351 additions and 397 deletions

View File

@ -1,7 +1,5 @@
package com.muyu.common.kafka.config; package com.muyu.common.kafka.config;
import com.muyu.common.core.constant.KafkaConstant; import com.muyu.common.core.constant.KafkaConstant;
import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringDeserializer;

View File

@ -49,8 +49,6 @@ public class KafkaProducerConfig {
private String acks; private String acks;
@Bean @Bean
public KafkaProducer kafkaProducer() { public KafkaProducer kafkaProducer() {
Map<String, Object> configs = new HashMap<>(); Map<String, Object> configs = new HashMap<>();

View File

@ -34,6 +34,8 @@
<groupId>com.muyu</groupId> <groupId>com.muyu</groupId>
<artifactId>cloud-common-redis</artifactId> <artifactId>cloud-common-redis</artifactId>
</dependency> </dependency>
</dependencies> </dependencies>
</project> </project>

View File

@ -1,83 +0,0 @@
package com.muyu.common.rabbit.config;
import com.muyu.common.rabbit.constants.RabbitmqConstants;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.annotation.Resource;
import java.util.HashMap;
/**
* @ClassName: DelayedQueueConfig
* @Description:
*/
@Configuration
public class DelayedQueueConfig {
@Resource
private RabbitAdmin rabbitAdmin;
/**
*
* @return
*/
@Bean
public Queue delayedQueue() {
Queue queue = new Queue(RabbitmqConstants.DELAYED_QUEUE_NAME);
rabbitAdmin.declareQueue(queue);
return queue;
}
/**
*
* @return
*/
@Bean
public Exchange delayedExchange() {
HashMap<String, Object> arguments = new HashMap<>(3);
arguments.put("x-delayed-type", "direct");
/**
*
*
*
*
*
*
*/
CustomExchange customExchange = new CustomExchange(
RabbitmqConstants.DELAYED_EXCHANGE_NAME,
"x-delayed-message",
true,
false,
arguments);
rabbitAdmin.declareExchange(customExchange);
return customExchange;
}
/**
*
* @param delayedQueue
* @param delayedExchange
*/
@Bean
public Binding delayedQueueBindingDelayedExchange(
@Qualifier("delayedQueue") Queue delayedQueue,
@Qualifier("delayedExchange") Exchange delayedExchange) {
Binding noargs = BindingBuilder.bind(delayedQueue)
.to(delayedExchange)
.with(RabbitmqConstants.DELAYED_ROUTING_KEY)
.noargs();
rabbitAdmin.declareBinding(noargs);
return noargs;
}
}

View File

@ -1,75 +0,0 @@
package com.muyu.common.rabbit.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @Author: WangXin
* @Time: 2024/4/22 11:55
* @Description:
*/
@Configuration
public class TopicConfig {
/**
*
* @return exchange
*/
@Bean(name = "topicExchange")
public Exchange getTopicExchange(){
return ExchangeBuilder
.topicExchange("exchange_topic")
.build();
}
/**
* 01
* @return queue
*/
@Bean(name = "topicQueue01")
public Queue getTopicQueue01(){
return QueueBuilder
.durable("queue_topic_01")
.build();
}
/**
* 02
* @return queue
*/
@Bean(name = "topicQueue02")
public Queue getTopicQueue02(){
return QueueBuilder
.durable("queue_topic_02")
.build();
}
/**
* 01
* @return binding
*/
@Bean
public Binding getTopicBinding01(){
return BindingBuilder
.bind(getTopicQueue01())
.to(getTopicExchange())
//路由键 队列1接收debug级别的消息
.with("front.#")
.noargs();
}
/**
* 02
* @return binding
*/
@Bean
public Binding getTopicBinding02(){
return BindingBuilder
.bind(getTopicQueue02())
.to(getTopicExchange())
// 路由键 队列2接收info级别的消息
.with("back.order.*")
.noargs();
}
}

View File

@ -1,172 +0,0 @@
package com.muyu.common.rabbit.producer;
import com.muyu.common.core.domain.Result;
import com.muyu.common.rabbit.constants.RabbitmqConstants;
import lombok.AllArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import java.util.UUID;
/**
* @ClassName: RabbitMQProducer
* @Description: rabbitmq
*/
@Component
@AllArgsConstructor
@Log4j2
public class RabbitMQProducerUtil {
//redis工具类对象
//rabbit
private final RabbitTemplate rabbitTemplate;
/**
*
*
* @param param
* @return
*
*/
public Result<?> basicSendMessage(String queueName, Object param, String msg) {
log.info("【简单模型mq】 : method: 【 basicSendMessage 】 - ages: 【 String : {}, Object : {}, String : {} 】 ---> 【 消息发送中。。。 】", RabbitmqConstants.BASIC_QUEUE_NAME, param, msg);
// 发送简单模型消息
// 第一个参数: 绑定规则 相当于 队列名称
// 第二个参数:消息内容
rabbitTemplate.convertAndSend(queueName, param, message -> {
message.getMessageProperties().setMessageId(UUID.randomUUID().toString());
return message;
} );
log.info("【简单模型mq】 : method: 【 basicSendMessage 】- queue: 【 {} 】 ---> 【 消息发送成功 】", RabbitmqConstants.BASIC_QUEUE_NAME);
return Result.success(msg!=null?msg:"消息发送成功");
}
/**
* Work queue
*
* @param obj
* @return
*
*/
public Result<?> workSendMessage(String queueName, Object obj, String msg) {
log.info("【工作模型mq】 : method: 【 workSendMessage 】 - ages: 【 String : {}, Object : {}, String : {} 】 ---> 【 消息发送中。。。 】", queueName, obj, msg);
// 发送简单模型消息
// 第一个参数: 绑定规则 相当于 队列名称
// 第二个参数:消息内容
rabbitTemplate.convertAndSend(queueName, obj, message -> {
message.getMessageProperties().setMessageId(UUID.randomUUID().toString());
return message;
} );
log.info("【工作模型mq】 : method: 【 workSendMessage 】- queue: 【 {} 】 ---> 【 消息发送成功 】", queueName);
return Result.success("消息发送成功");
}
/**
* Publish/Subscribe
* fanout
* @param exchange
* @param obj Object
* @param msg
* @return
*/
public Result<?> publishSubscribeSendMessage(String exchange, Object obj, String msg) {
log.info("【订阅模型mq】 : method: 【 workSendMessage 】 - ages: 【 String : {}, Object : {}, String : {} 】 ---> 【 消息发送中。。。 】", exchange, obj, msg);
// 发送简单模型消息
// 第一个参数: exchange 交换机的名称
// 第二个参数: 绑定规则 发布订阅者模型 不写 默认 "" 只要绑定就行 不需要规则
// 第三个参数:消息内容
rabbitTemplate.convertAndSend(exchange, "", obj, message -> {
message.getMessageProperties().setMessageId(UUID.randomUUID().toString());
return message;
} );
log.info("【订阅模型mq】 : method: 【 workSendMessage 】- exchange: 【 {} 】 ---> 【 消息发送成功 】", exchange);
return Result.success("消息发送成功");
}
/**
* Routing
* 使 Direct Queue()
*
* @param exchange
* @param rule
* @param obj Object
* @param msg
* @return
*/
public Result<?> routingSendMessage(String exchange, String rule, Object obj, String msg) {
log.info("【路由模型mq】 : method: 【 workSendMessage 】 - ages: 【 String : {}, Object : {}, String : {} 】 ---> 【 消息发送中。。。 】", exchange, obj, msg);
// 发送简单模型消息
// 第一个参数: 绑定规则 相当于 队列名称
// 第二个参数:消息内容
rabbitTemplate.convertAndSend(exchange, rule, obj, message -> {
message.getMessageProperties().setMessageId(UUID.randomUUID().toString());
return message;
} );
log.info("【路由模型mq】 : method: 【 workSendMessage 】- exchange: 【 {} 】 ---> 【 消息发送成功 】", exchange);
return Result.success("消息发送成功");
}
/**
* Topic
* 使 topic
*
* @param exchange
* @param rule . 使 #( ) *( ) name.msg, *.msg, age.#
* @param obj Object
* @param msg
* @return
*/
public Result<?> topicSendMessage(String exchange, String rule, Object obj) {
log.info("【主题模型mq】 : method: 【 workSendMessage 】 - ages: 【 String : {}, Object : {} 】 ---> 【 消息发送中。。。 】", exchange, obj);
// 发送简单模型消息
// 第一个参数: 绑定规则 相当于 队列名称
// 第二个参数:消息内容
rabbitTemplate.convertAndSend(exchange, rule, obj, message -> {
message.getMessageProperties().setMessageId(UUID.randomUUID().toString());
return message;
} );
log.info("【主题模型mq】 : method: 【 workSendMessage 】- exchange: 【 {} 】 ---> 【 消息发送成功 】", exchange);
return Result.success(obj,"消息发送成功");
}
/**
*
* @param param
* @param delayTime
* @return
*/
public Result<?> delayedSendMessage(Long delayTime, Object param) {
log.info("【延迟队列模型】 : method: 【 delayedSendMessage 】 消息内容:{}---> 【 消息发送中。。。 】",param);
rabbitTemplate.convertAndSend(RabbitmqConstants.DELAYED_EXCHANGE_NAME, RabbitmqConstants.DELAYED_ROUTING_KEY,param, message -> {
MessageProperties messageProperties = message.getMessageProperties();
messageProperties.setMessageId(UUID.randomUUID().toString());
messageProperties.setDelayLong(delayTime);
return message;
});
log.info("【延迟队列模型】 : method: 【 delayedSendMessage 】 消息内容:{}---> 【 消息发送成功 】",param);
return Result.success(param,"消息发送成功");
}
}

View File

@ -1,4 +1,4 @@
package com.muyu.common.rabbit.config; package com.muyu.rabbitmq.config;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.connection.CorrelationData;

View File

@ -1,4 +1,4 @@
package com.muyu.common.rabbit.config; package com.muyu.rabbitmq.config;
@ -21,16 +21,16 @@ public class RabbitAdminConfig {
private String username; private String username;
@Value("${spring.rabbitmq.password}") @Value("${spring.rabbitmq.password}")
private String password; private String password;
@Value("${spring.rabbitmq.virtualhost}") @Value("${spring.rabbitmq.port}")
private String virtualHost; private Integer port;
@Bean @Bean
public ConnectionFactory connectionFactory() { public ConnectionFactory connectionFactory() {
CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(); CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
cachingConnectionFactory.setHost(host); cachingConnectionFactory.setHost(host);
cachingConnectionFactory.setPort(port);
cachingConnectionFactory.setUsername(username); cachingConnectionFactory.setUsername(username);
cachingConnectionFactory.setPassword(password); cachingConnectionFactory.setPassword(password);
cachingConnectionFactory.setVirtualHost(virtualHost);
return cachingConnectionFactory; return cachingConnectionFactory;
} }

View File

@ -1,4 +1,4 @@
package com.muyu.common.rabbit.config; package com.muyu.rabbitmq.config;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.amqp.support.converter.MessageConverter;

View File

@ -1,4 +1,4 @@
package com.muyu.common.rabbit.config; package com.muyu.rabbitmq.config;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import org.springframework.amqp.core.ReturnedMessage; import org.springframework.amqp.core.ReturnedMessage;

View File

@ -1,11 +1,15 @@
package com.muyu.common.rabbit.consumer; package com.muyu.rabbitmq.consumer;
import com.alibaba.fastjson2.JSONObject; import com.alibaba.fastjson2.JSONObject;
import com.muyu.common.redis.service.RedisService; import com.muyu.common.redis.service.RedisService;
//import com.muyu.rabbitmq.util.CacheUtil;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Channel;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.extern.log4j.Log4j2; import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.core.Message; import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.io.IOException; import java.io.IOException;
@ -22,6 +26,9 @@ public class RabbitMQConsumerUtil {
private final RedisService redisService; private final RedisService redisService;
// @Autowired
// private CacheUtil cacheUtil;
/** /**
* *
@ -29,17 +36,30 @@ public class RabbitMQConsumerUtil {
* @param message * @param message
* @param channel * @param channel
*/ */
public void rabbitMQBasicConsumer(Object data ,Message message , Channel channel) { @RabbitListener(queuesToDeclare = @Queue(name = "basic"))
public void rabbitMQBasicConsumer(String data ,Message message , Channel channel) {
log.info("当前时间:{} RabbitMQConsumerUtil : {}", new Date(), message); log.info("当前时间:{} RabbitMQConsumerUtil : {}", new Date(), message);
try { try {
// 获取到消息 开始消费 // 获取到消息 开始消费
log.info("消息消费者接收到消息,消息内容:{}", JSONObject.toJSONString(data)); log.info("消息消费者接收到消息,消息内容:{}", JSONObject.toJSONString(data));
Long add = redisService.redisTemplate.opsForSet().add(data, message.getMessageProperties().getMessageId()); Long add = redisService.redisTemplate.opsForSet().add(data, message.getMessageProperties().getMessageId());
if (add != 1) { if (add != 1) {
return; return;
} }
/**
* ---------------------------------------------------------------
*/
String carList = (String) redisService.redisTemplate.opsForValue().get("carList");
/**
* ------------------------------------------------------------------------------
*/
// 消费消息成功之后需要确认 // 消费消息成功之后需要确认
// long deliveryTag 消息投递序号 自增的数字 在整个队列中唯一 拿到这个序号就相当于拿到这条消息 // long deliveryTag 消息投递序号 自增的数字 在整个队列中唯一 拿到这个序号就相当于拿到这条消息
// boolean multiple 是否批量确认 true 批量 确认小于等于当前投递序号的消息 false 单个确认 // boolean multiple 是否批量确认 true 批量 确认小于等于当前投递序号的消息 false 单个确认
@ -66,6 +86,7 @@ public class RabbitMQConsumerUtil {
} }
} }
/** /**
* *
* @param data * @param data
@ -85,6 +106,7 @@ public class RabbitMQConsumerUtil {
return; return;
} }
/** /**
* --------------------------------------------------------------- * ---------------------------------------------------------------
*/ */

View File

@ -0,0 +1,174 @@
package com.muyu.rabbitmq.producer;
import com.muyu.common.core.domain.Result;
import com.muyu.rabbitmq.constants.RabbitmqConstants;
import lombok.AllArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import java.util.UUID;
/**
* @ClassName: RabbitMQProducer
* @Description: rabbitmq
*/
@Component
@AllArgsConstructor
@Log4j2
public class RabbitMQProducerUtil {
//redis工具类对象
//rabbit
private final RabbitTemplate rabbitTemplate;
/**
*
*
* @param param
* @return
*
*/
public void basicSendMessage(String queueName, String param) {
log.info("【简单模型mq】 : method: 【 basicSendMessage 】 - ages: 【 String : {}, Object : {}】 ---> 【 消息发送中。。。 】", RabbitmqConstants.BASIC_QUEUE_NAME, param);
// 发送简单模型消息
// 第一个参数: 绑定规则 相当于 队列名称
// 第二个参数:消息内容
rabbitTemplate.convertAndSend(queueName, param, message -> {
message.getMessageProperties().setMessageId(UUID.randomUUID().toString());
return message;
} );
log.info("【简单模型mq】 : method: 【 basicSendMessage 】- queue: 【 {} 】 ---> 【 消息发送成功 】", RabbitmqConstants.BASIC_QUEUE_NAME);
}
// /**
// * Work queue 工作模型
// *
// * @param obj 传递的消息 (如果是对象需要序列化)
// * @return 结果集
// * 多个消费者,你一个我一个分配消费消息,有预取机制,默认公平消费,可配置 能者多劳模式(),谁完成的快,谁多做一点
// */
// public Result<?> workSendMessage(String queueName, Object obj, String msg) {
//
// log.info("【工作模型mq】 : method: 【 workSendMessage 】 - ages: 【 String : {}, Object : {}, String : {} 】 ---> 【 消息发送中。。。 】", queueName, obj, msg);
// // 发送简单模型消息
// // 第一个参数: 绑定规则 相当于 队列名称
// // 第二个参数:消息内容
// rabbitTemplate.convertAndSend(queueName, obj, message -> {
// message.getMessageProperties().setMessageId(UUID.randomUUID().toString());
// return message;
// } );
//
// log.info("【工作模型mq】 : method: 【 workSendMessage 】- queue: 【 {} 】 ---> 【 消息发送成功 】", queueName);
//
// return Result.success("消息发送成功");
// }
//
// /**
// * Publish/Subscribe 发布订阅者模型
// * 多个消费者,多个消费者可以同时接收到消息 有交换机 类型 fanout
// *
// * @param exchange 交换机名称
// * @param obj 发送的消息Object
// * @param msg 响应的内容
// * @return 结果集
// */
// public Result<?> publishSubscribeSendMessage(String exchange, Object obj, String msg) {
//
// log.info("【订阅模型mq】 : method: 【 workSendMessage 】 - ages: 【 String : {}, Object : {}, String : {} 】 ---> 【 消息发送中。。。 】", exchange, obj, msg);
// // 发送简单模型消息
// // 第一个参数: exchange 交换机的名称
// // 第二个参数: 绑定规则 发布订阅者模型 不写 默认 "" 只要绑定就行 不需要规则
// // 第三个参数:消息内容
// rabbitTemplate.convertAndSend(exchange, "", obj, message -> {
// message.getMessageProperties().setMessageId(UUID.randomUUID().toString());
// return message;
// } );
//
// log.info("【订阅模型mq】 : method: 【 workSendMessage 】- exchange: 【 {} 】 ---> 【 消息发送成功 】", exchange);
//
// return Result.success("消息发送成功");
// }
//
// /**
// * Routing路由模型
// * 使用的是 Direct 类型的交换机,会将接收到的消息根据 规则 路由到指定的Queue(队列),因此称为路由模式
// *
// * @param exchange 交换机名称
// * @param rule 绑定规则 一个字符串即可
// * @param obj 发送的消息Object
// * @param msg 响应的内容
// * @return 结果集
// */
// public Result<?> routingSendMessage(String exchange, String rule, Object obj, String msg) {
//
// log.info("【路由模型mq】 : method: 【 workSendMessage 】 - ages: 【 String : {}, Object : {}, String : {} 】 ---> 【 消息发送中。。。 】", exchange, obj, msg);
// // 发送简单模型消息
// // 第一个参数: 绑定规则 相当于 队列名称
// // 第二个参数:消息内容
// rabbitTemplate.convertAndSend(exchange, rule, obj, message -> {
// message.getMessageProperties().setMessageId(UUID.randomUUID().toString());
// return message;
// } );
//
// log.info("【路由模型mq】 : method: 【 workSendMessage 】- exchange: 【 {} 】 ---> 【 消息发送成功 】", exchange);
//
// return Result.success("消息发送成功");
// }
//
//
// /**
// * Topic主题模型模型
// * 使用的是 topic 类型的交换机
// *
// * @param exchange 交换机名称
// * @param rule 绑定规则 可以绑定多个单词以 . 拼接 也可以使用 #(匹配 零个 一个 或 多个 单词) 或 *(匹配 一个 单词) 通配符例如name.msg, *.msg, age.#
// * @param obj 发送的消息Object
// * @param msg 响应的内容
// * @return 结果集
// */
// public Result<?> topicSendMessage(String exchange, String rule, Object obj) {
//
// log.info("【主题模型mq】 : method: 【 workSendMessage 】 - ages: 【 String : {}, Object : {} 】 ---> 【 消息发送中。。。 】", exchange, obj);
// // 发送简单模型消息
// // 第一个参数: 绑定规则 相当于 队列名称
// // 第二个参数:消息内容
// rabbitTemplate.convertAndSend(exchange, rule, obj, message -> {
// message.getMessageProperties().setMessageId(UUID.randomUUID().toString());
// return message;
// } );
//
// log.info("【主题模型mq】 : method: 【 workSendMessage 】- exchange: 【 {} 】 ---> 【 消息发送成功 】", exchange);
//
// return Result.success(obj,"消息发送成功");
// }
/**
*
* @param param
* @param delayTime
* @return
*/
// public Result<?> delayedSendMessage(Long delayTime, Object param) {
// log.info("【延迟队列模型】 : method: 【 delayedSendMessage 】 消息内容:{}---> 【 消息发送中。。。 】",param);
//
// rabbitTemplate.convertAndSend(RabbitmqConstants.DELAYED_EXCHANGE_NAME, RabbitmqConstants.DELAYED_ROUTING_KEY,param, message -> {
// MessageProperties messageProperties = message.getMessageProperties();
// messageProperties.setMessageId(UUID.randomUUID().toString());
// messageProperties.setDelayLong(delayTime);
// return message;
// });
// log.info("【延迟队列模型】 : method: 【 delayedSendMessage 】 消息内容:{}---> 【 消息发送成功 】",param);
//
// return Result.success(param,"消息发送成功");
//
// }
}

View File

@ -2,6 +2,5 @@ com.muyu.rabbitmq.producer.RabbitMQProducerUtil
com.muyu.rabbitmq.consumer.RabbitMQConsumerUtil com.muyu.rabbitmq.consumer.RabbitMQConsumerUtil
com.muyu.rabbitmq.config.RabbitmqConfig com.muyu.rabbitmq.config.RabbitmqConfig
com.muyu.rabbitmq.config.MyConfirmCallback com.muyu.rabbitmq.config.MyConfirmCallback
com.muyu.rabbitmq.config.DelayedQueueConfig
com.muyu.rabbitmq.config.RabbitAdminConfig com.muyu.rabbitmq.config.RabbitAdminConfig
com.muyu.rabbitmq.config.ReturnCallbackConfig com.muyu.rabbitmq.config.ReturnCallbackConfig

View File

@ -115,6 +115,12 @@
<artifactId>cloud-common-rabbit</artifactId> <artifactId>cloud-common-rabbit</artifactId>
</dependency> </dependency>
<dependency>
<groupId>com.muyu</groupId>
<artifactId>saas-cache</artifactId>
<version>3.6.3</version>
</dependency>
</dependencies> </dependencies>
<build> <build>

View File

@ -28,6 +28,4 @@ public class EventPublisher implements ApplicationEventPublisherAware {
publisher.publishEvent(event); publisher.publishEvent(event);
} }
} }

View File

@ -66,9 +66,7 @@ public class IoTDBConfig {
measurements.add("car_vin"); measurements.add("car_vin");
measurements.add("information"); measurements.add("information");
session.insertRecord(TABLENAME,System.currentTimeMillis(),measurements,list); session.insertRecord(TABLENAME,System.currentTimeMillis(),measurements,list);
//关闭连接 //关闭连接
session.close(); session.close();
} catch (IoTDBConnectionException e) { } catch (IoTDBConnectionException e) {

View File

@ -1,7 +1,5 @@
package com.muyu.event.consumer; package com.muyu.event.consumer;
import com.alibaba.fastjson2.JSONObject; import com.alibaba.fastjson2.JSONObject;
import com.muyu.event.basic.EventPublisher; import com.muyu.event.basic.EventPublisher;
import lombok.extern.log4j.Log4j2; import lombok.extern.log4j.Log4j2;
@ -36,7 +34,6 @@ public class MessageConsumer implements ApplicationRunner {
private final String topic="four_car"; private final String topic="four_car";
@Override @Override
public void run(ApplicationArguments args) throws Exception { public void run(ApplicationArguments args) throws Exception {
List<String> list = Collections.singletonList(topic); List<String> list = Collections.singletonList(topic);
@ -48,10 +45,8 @@ public class MessageConsumer implements ApplicationRunner {
String value = record.value(); String value = record.value();
JSONObject jsonObject = JSONObject.parseObject(value); JSONObject jsonObject = JSONObject.parseObject(value);
log.info("value:{}",value); log.info("value:{}",value);
// eventPublisher.publishEvent(jsonObject); eventPublisher.publishEvent(jsonObject);
}); });
} }
} }
} }

View File

@ -0,0 +1,92 @@
package com.muyu.event.consumer;
import com.alibaba.fastjson2.JSONObject;
import com.muyu.cache.ElectronicFenceGroupCacheService;
import com.muyu.cache.SysCarCacheService;
import com.muyu.common.domain.database.ElectronicFenceGroup;
import com.muyu.common.domain.resp.SysCarVo;
import com.muyu.common.redis.service.RedisService;
import com.muyu.rabbitmq.consumer.RabbitMQConsumerUtil;
import com.rabbitmq.client.Channel;
import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.Date;
import java.util.List;
/**
* rabbitmq
* @author
* @packagecom.muyu.event.consumer
* @nameMqConsumer
* @date2024/10/2 14:17
*/
@Component
@Log4j2
public class MqConsumer {
@Autowired
private RedisService redisService;
@Autowired
private SysCarCacheService sysCarCacheService;
@Autowired
private ElectronicFenceGroupCacheService electronicFenceGroupCacheService;
@RabbitListener(queuesToDeclare = @Queue(name = "basic"))
public void rabbitMQBasicConsumer(String data , Message message , Channel channel) {
log.info("当前时间:{} RabbitMQConsumerUtil : {}", new Date(), message);
try {
// 获取到消息 开始消费
log.info("消息消费者接收到消息,消息内容:{}", JSONObject.toJSONString(data));
Long add = redisService.redisTemplate.opsForSet().add(data, message.getMessageProperties().getMessageId());
if (add != 1) {
return;
}
/**
* ---------------------------------------------------------------
*/
List<SysCarVo> carList = sysCarCacheService.get("carList");
ElectronicFenceGroup fenceGroupList = electronicFenceGroupCacheService.get("electronicFenceGroupList");
/**
* ------------------------------------------------------------------------------
*/
// 消费消息成功之后需要确认
// long deliveryTag 消息投递序号 自增的数字 在整个队列中唯一 拿到这个序号就相当于拿到这条消息
// boolean multiple 是否批量确认 true 批量 确认小于等于当前投递序号的消息 false 单个确认
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
log.info("xxx消费者接收到消息消息内容{},消费成功...", message);
} catch (Exception e) {
log.error("xxx消费者接收到消息消息内容{},消费消息异常,异常信息:{}", message, e);
// 消息回退 拒绝消费消息
// long deliveryTag 消息投递序号 自增的数字 在整个队列中唯一 拿到这个序号就相当于拿到这条消息
// boolean requeue 是否回到原来的队列
try {
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
// channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
} catch (IOException ex) {
log.error("xxx消费者接收到消息消息内容{},回退消息异常,异常信息:{}", message, ex);
}
}finally {
try {
channel.close();
} catch (Exception e) {
log.error("xxx消费者关闭Channel异常消息内容{},异常信息:{}", message, e);
}
}
}
}

View File

@ -18,4 +18,6 @@ public class OnlineConsumer {
} }

View File

@ -4,11 +4,13 @@ import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController; import org.springframework.web.bind.annotation.RestController;
/** /**
*
* @author * @author
* @packagecom.muyu.event.controller * @packagecom.muyu.event.controller
* @nameDataController * @nameDataController
* @date2024/9/29 20:16 * @date2024/9/29 20:16
*/ */
@RestController @RestController
@RequestMapping("data") @RequestMapping("data")
public class DataController { public class DataController {

View File

@ -19,13 +19,12 @@ import java.util.List;
* @nameItodbController * @nameItodbController
* @date2024/9/28 19:17 * @date2024/9/28 19:17
*/ */
@RestController() @RestController
public class IoTDBController { public class IoTDBController {
@Autowired @Autowired
private IoTDBService tdbService; private IoTDBService tdbService;
/** /**
* *
* @return list * @return list
@ -47,7 +46,6 @@ public class IoTDBController {
return Result.success(carInformation); return Result.success(carInformation);
}; };
/** /**
* *
* @param addCarInformation * @param addCarInformation
@ -59,16 +57,4 @@ public class IoTDBController {
return Result.success("添加成功"); return Result.success("添加成功");
}; };
} }

View File

@ -2,20 +2,17 @@ package com.muyu.event.controller;
import com.alibaba.fastjson2.JSONObject; import com.alibaba.fastjson2.JSONObject;
import com.muyu.event.service.TestService; import com.muyu.rabbitmq.producer.RabbitMQProducerUtil;
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController; import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.text.SimpleDateFormat;
import java.util.Date;
/** /**
*
* @author * @author
* @packagecom.muyu.event.controller * @packagecom.muyu.event.controller
* @nameTestController * @nameTestController
@ -26,25 +23,33 @@ public class TestController {
@Resource @Resource
private KafkaProducer kafkaProducer; private KafkaProducer kafkaProducer;
@Resource
private RabbitMQProducerUtil rabbitMQProducerUtil;
private static final String topic="four_car"; private static final String topic="four_car";
@GetMapping("send") @GetMapping("sendKafka")
public String sendKafka(){ public String sendKafka(){
String message="发送一条信息"; String message="发送一条信息";
JSONObject jsonObject = new JSONObject(); JSONObject jsonObject = new JSONObject();
jsonObject.put("cj","sb"); jsonObject.put("cj","hh");
ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(topic,jsonObject.toString());
ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(topic,jsonObject.toJSONString());
kafkaProducer.send(producerRecord); kafkaProducer.send(producerRecord);
return "success"; return "success";
} }
@GetMapping("sendMq")
public String sendMq(){
String message="发送一条信息-mq";
rabbitMQProducerUtil.basicSendMessage("basic",message);
return "success-mq";
};

View File

@ -13,7 +13,13 @@ import org.springframework.format.annotation.DateTimeFormat;
import java.util.Date; import java.util.Date;
/**
*
* @author
* @packagecom.muyu.event.domain
* @nameEvent
* @date2024/9/28 23:10
*/
@Data @Data
@AllArgsConstructor @AllArgsConstructor
@NoArgsConstructor @NoArgsConstructor

View File

@ -28,6 +28,7 @@ public class AddDatabaseListener implements EventListener {
keys.add(key); keys.add(key);
values.add((String) value); values.add((String) value);
}); });
} }
@Override @Override

View File

@ -1,6 +1,5 @@
package com.muyu.server.controller; package com.muyu.server.controller;
import com.muyu.cache.TemplateCacheService;
import com.muyu.common.core.domain.Result; import com.muyu.common.core.domain.Result;
import com.muyu.common.domain.Template; import com.muyu.common.domain.Template;
import com.muyu.server.service.TemplateService; import com.muyu.server.service.TemplateService;
@ -18,8 +17,7 @@ import java.util.List;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
/** /**
* * @Authorliuxinyue
* @author liuxinyue
* @Packagecom.template.controller * @Packagecom.template.controller
* @Projectcloud-server-c * @Projectcloud-server-c
* @nameTemplateController * @nameTemplateController
@ -35,9 +33,6 @@ public class TemplateController {
@Autowired @Autowired
private TemplateService templateService; private TemplateService templateService;
@Autowired
private TemplateCacheService templateCacheService;
/** /**
* *
* @return * @return
@ -45,19 +40,24 @@ public class TemplateController {
@PostMapping("/templateList") @PostMapping("/templateList")
@Operation(summary = "报文模版列表",description = "报文模版列表") @Operation(summary = "报文模版列表",description = "报文模版列表")
public Result<List<Template>> templateList() { public Result<List<Template>> templateList() {
return Result.success(templateService.list());
List<Template> list = templateService.list();
templateCacheService.put("List",list);
return Result.success(list);
} }
/**
*
* @param templateMessage
* @return
*/
@PostMapping("/messageParsing")
@Operation(summary = "报文解析",description = "报文解析")
public Result messageParsing(@RequestParam("templateMessage") String templateMessage) throws SQLException, IoTDBConnectionException, ClassNotFoundException, StatementExecutionException, ExecutionException, InterruptedException {
templateService.messageParsing(templateMessage);
return Result.success();
}
/** /**
* * 0002222220
* @param template * @param template
* @return * @return
*/ */