feat:数据资产客户端新增数据库连接池服务类

dev
gtl 2024-05-15 19:58:04 +08:00
parent 5c13d4ba74
commit 3f8d1fd13b
1 changed files with 8 additions and 17 deletions

View File

@ -1,12 +1,10 @@
package com.ruoyi.dataTransform.consumer;
import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.fastjson2.JSON;
import com.rabbitmq.client.Channel;
import com.ruoyi.dataAsset.config.DataAssetClientConfig;
import com.ruoyi.dataAsset.constant.QueueNameConstants;
import com.ruoyi.dataAsset.domain.DataSource;
import com.ruoyi.dataAsset.util.DataSourceUtil;
import com.ruoyi.dataAsset.service.ConnectionPoolService;
import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Queue;
@ -15,7 +13,6 @@ import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
@ -32,7 +29,7 @@ public class DataSourceConsumer {
private RedisTemplate<String, String> redisTemplate;
@Autowired
private DataAssetClientConfig druidDataSourceFactory;
private ConnectionPoolService connectionPoolService;
@Autowired
private MessageConverter messageConverter;
@ -48,22 +45,16 @@ public class DataSourceConsumer {
String messageId = message.getMessageProperties().getMessageId();
try {
Long count = redisTemplate.opsForSet().add(QueueNameConstants.DATASOURCE_MESSAGE, messageId);
if (count > 0) {
if (count != null&&count > 0) {
// 正常消费消息
DruidDataSource druidDataSource = DataSourceUtil.createDataSource(dataSource);
// 新增或替换数据源
Long id = dataSource.getId();
if(druidDataSourceFactory.getMap().containsKey(id)){
// 获取旧的
DruidDataSource source = druidDataSourceFactory.get(id);
// 替换
druidDataSourceFactory.put(id,druidDataSource);
// 如果已经存在
if(connectionPoolService.hasKey(id)){
// 关闭旧的
source.close();
}else {
//新增
druidDataSourceFactory.put(id, druidDataSource);
connectionPoolService.closeDataSource(id);
}
// 新增或替换数据源
connectionPoolService.put(dataSource);
// 消费成功 手动确认
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
log.info("数据源消费者接收到消息,消息内容:{},消费消息成功!", JSON.toJSONString(dataSource));