Compare commits

...

2 Commits

Author SHA1 Message Date
hbr 640b80bfb7 feat:rabbit,消息确认机制,延迟队列
# Conflicts:
#	zhiLian-business/zhiLian-data-service/src/main/resources/bootstrap.yml
#	zhiLian-vehicle/zhiLian-vehicle-service/src/main/java/com/zhiLian/vehicle/datasource/ManyDataSource.java
#	zhiLian-vehicle/zhiLian-vehicle-service/src/main/resources/bootstrap.yml
2024-06-14 20:53:02 +08:00
hbr fa1e93753e feat:测试
# Conflicts:
#	zhiLian-business/zhiLian-data-service/src/main/resources/bootstrap.yml
#	zhiLian-vehicle/zhiLian-vehicle-service/src/main/java/com/zhiLian/vehicle/datasource/ManyDataSource.java
#	zhiLian-vehicle/zhiLian-vehicle-service/src/main/resources/bootstrap.yml
2024-06-12 15:49:38 +08:00
7 changed files with 137 additions and 5 deletions

View File

@ -82,5 +82,4 @@
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1,48 @@
package com.zhiLian.business.config;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
/**
*
*/
@Component
public class ConfirmCallbackConfig implements RabbitTemplate.ConfirmCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* @PostContructspringspring
* @PostConstruct bean
* @PreDestory bean
*/
@PostConstruct
public void init() {
rabbitTemplate.setConfirmCallback(this);
}
/**
*
*
* @param correlationData
* @param ack
* @param cause
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
// 消息投递到 broker 的状态true表示成功
System.out.println("消息发送成功!");
} else {
// 发送异常
System.out.println("发送异常原因 = " + cause);
// TODO 可以将消息 内容 以及 失败的原因 记录到 日志表中
}
}
}

View File

@ -0,0 +1,53 @@
package com.zhiLian.business.config;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* RabbitAdminRabbitMQJavaRabbitMQRabbitMQ
*/
@Configuration
public class RabbitAdminConfig {
@Value("${spring.rabbitmq.host}")
private String host;
@Value("${spring.rabbitmq.username}")
private String username;
@Value("${spring.rabbitmq.password}")
private String password;
@Value("${spring.rabbitmq.virtualhost}")
private String virtualhost;
/**
* RabbitMQ
* @return
*/
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setAddresses(host);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost(virtualhost);
// 配置发送确认回调时次配置必须配置否则即使在RabbitTemplate配置了ConfirmCallback也不会生效
connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
connectionFactory.setPublisherReturns(true);
return connectionFactory;
}
/**
* RabbitAdmin
* @param connectionFactory
* @return
*/
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
rabbitAdmin.setAutoStartup(true);
return rabbitAdmin;
}
}

View File

@ -0,0 +1,34 @@
package com.zhiLian.business.config;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
/**
* returnedMessage
*/
@Component
public class ReturnCallbackConfig implements RabbitTemplate.ReturnsCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct // @PostContruct是spring框架的注解在⽅法上加该注解会在项⽬启动的时候执⾏该⽅法也可以理解为在spring容器初始化的时候执
public void init() {
rabbitTemplate.setReturnsCallback(this);
}
/**
*
* @param returnedMessage the returned message and metadata.
*/
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
System.out.println("消息" + returnedMessage.getMessage().toString() + "被交换机" + returnedMessage.getExchange() + "回退!"
+ "退回原因为:" + returnedMessage.getReplyText());
// 回退了所有的信息,可做补偿机制 记录到 数据库
}
}

View File

@ -203,6 +203,8 @@ public class BusinessServiceImpl extends ServiceImpl<BusinessMapper, Business>
entinfoService.insertEntinfo(build);
rabbitTemplate.convertAndSend("zhiLian-vehicle-exchange",JSON.toJSONString(build),message ->{
message.getMessageProperties().setMessageId(UUID.randomUUID().toString());
//设置消息延迟时间为5秒
message.getMessageProperties().setDelay(5000);
return message;
} );
}

View File

@ -25,7 +25,6 @@ import org.apache.commons.lang3.builder.ToStringStyle;
@TableName("entinfo")
public class Entinfo extends BaseEntity
{
private static final long serialVersionUID = 1L;
/** 数据源key */

View File

@ -39,9 +39,6 @@ import java.util.Map;
public class ManyDataSource {
@Autowired
private RedisTemplate<String,String> redisTemplate;