feat()规则引擎初始化(RabbitMq)

master_fei
Yunfei Du 2024-05-17 21:26:24 +08:00
parent fda9486485
commit 61e745a009
9 changed files with 232 additions and 1 deletions

View File

@ -16,7 +16,11 @@
</description>
<dependencies>
<!-- rabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- SpringCloud Openfeign -->
<dependency>
<groupId>org.springframework.cloud</groupId>

View File

@ -20,6 +20,8 @@ import java.util.List;
@Log4j2
public class DataAccessClientRunner implements ApplicationRunner {
@Autowired
private RemoteDataSourceService remoteDataSourceService;

View File

@ -98,6 +98,12 @@
<artifactId>postgresql</artifactId>
<version>42.5.0</version> <!-- 使用当前最新稳定版本 -->
</dependency>
<dependency>
<groupId>com.etl</groupId>
<artifactId>etl-data-source-client</artifactId>
<version>3.6.3</version>
<scope>compile</scope>
</dependency>
</dependencies>

View File

@ -0,0 +1,51 @@
package com.etl.data.config;
import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
/**
*
*
*/
@Component
@Log4j2
public class ConfirmCallbackConfig implements RabbitTemplate.ConfirmCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* @PostContructspringspring
*/
@PostConstruct
public void init() {
rabbitTemplate.setConfirmCallback(this);
}
/**
*
*
* @param correlationData
* @param ack
* @param cause
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
// 消息投递到broker 的状态true表示成功
log.info("消息已经成功发送到交换机");
} else {
// String exchange = correlationData.getReturned().getExchange();
// String message = correlationData.getReturned().getMessage().getBody().toString();
// 发送异常
log.error("消息发送到交换机:失败,原因是:{}", cause);
// TODO 可以把异常信息 以及 消息的内容直接添加到 MYSQL
}
}
}

View File

@ -0,0 +1,62 @@
package com.etl.data.config;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* rabbitAdmin RabbitAdmin Spring
*/
@Configuration
public class RabbitAdminConfig {
public static final String DATA_SOURCE_EXCHANGE = "data_source_exchange";
public static final String RULE_ENGINE_EXCHANGE = "rule_engine_exchange";
@Value("${spring.rabbitmq.host}")
private String host;
@Value("${spring.rabbitmq.username}")
private String username;
@Value("${spring.rabbitmq.password}")
private String password;
@Value("${spring.rabbitmq.virtualhost}")
private String virtualhost;
/**
*
* @return
*/
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setAddresses(host);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost(virtualhost);
// 配置发送确认回调时次配置必须配置否则即使在RabbitTemplate配置了ConfirmCallback也不会生效
connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
connectionFactory.setPublisherReturns(true);
return connectionFactory;
}
/**
* RabbitAdmin RabbitMQ
* @param connectionFactory
* @return
*/
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
rabbitAdmin.setAutoStartup(true);
//创建交换机
FanoutExchange dataFanoutExchange = new FanoutExchange(DATA_SOURCE_EXCHANGE, true, true);
//声明交换机
rabbitAdmin.declareExchange(dataFanoutExchange);
return rabbitAdmin;
}
}

View File

@ -0,0 +1,15 @@
package com.etl.data.config;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitmqConfig {
// 消息转换配置
@Bean
public MessageConverter jsonMessageConverter(){
return new Jackson2JsonMessageConverter();
}
}

View File

@ -0,0 +1,41 @@
package com.etl.data.config;
import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
/**
*
*/
@Component
@Log4j2
public class ReturnCallbackConfig implements RabbitTemplate.ReturnsCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* @PostContructspringspring
*/
@PostConstruct
public void init() {
rabbitTemplate.setReturnsCallback(this);
}
/**
*
*
* @param returnedMessage the returned message and metadata.
*/
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
log.error("消息:{},被交换机:{} 回退!退回原因为:{}",
returnedMessage.getMessage().toString(), returnedMessage.getExchange(), returnedMessage.getReplyText());
// TODO 回退了所有的信息,可做补偿机制
}
}

View File

@ -0,0 +1,12 @@
package com.etl.data.service;
import com.etl.data.domain.DataSource;
/**
*
* @author YunFei.Du
* @date 9:30 2024/5/17
*/
public interface ProducerService {
void dataSourceProduct(DataSource dataSource);
}

View File

@ -0,0 +1,38 @@
package com.etl.data.service.impl;
import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.fastjson.JSON;
import com.etl.data.client.connPool.service.ConnPoolManagementService;
import com.etl.data.config.RabbitAdminConfig;
import com.etl.data.domain.DataSource;
import com.etl.data.domain.dataSource.DataSourceConfig;
import com.etl.data.service.ProducerService;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.bind.annotation.PathVariable;
import java.sql.SQLException;
/**
* @ClassName ProducerServiceImpl
* @Description
* @Author YunFei.Du
* @Date 2024/5/17 9:31
*/
@Service
public class ProducerServiceImpl implements ProducerService {
@Autowired
private RabbitTemplate rabbitTemplate;
@Override
public void dataSourceProduct(DataSource dataSource) {
String msg = JSON.toJSONString ( dataSource );
rabbitTemplate.convertAndSend ( RabbitAdminConfig.DATA_SOURCE_EXCHANGE, "data_source_key", msg );
//初始化连接池
ConnPoolManagementService.createPool ( dataSource );
}
}