diff --git a/ruoyi-modules/ruoyi-data_transform/ruoyi-data_transform-server/src/main/java/com/ruoyi/dataTransform/consumer/DataSourceConsumer.java b/ruoyi-modules/ruoyi-data_transform/ruoyi-data_transform-server/src/main/java/com/ruoyi/dataTransform/consumer/DataSourceConsumer.java index 4a40869..2f3e0db 100644 --- a/ruoyi-modules/ruoyi-data_transform/ruoyi-data_transform-server/src/main/java/com/ruoyi/dataTransform/consumer/DataSourceConsumer.java +++ b/ruoyi-modules/ruoyi-data_transform/ruoyi-data_transform-server/src/main/java/com/ruoyi/dataTransform/consumer/DataSourceConsumer.java @@ -1,12 +1,10 @@ package com.ruoyi.dataTransform.consumer; -import com.alibaba.druid.pool.DruidDataSource; import com.alibaba.fastjson2.JSON; import com.rabbitmq.client.Channel; -import com.ruoyi.dataAsset.config.DataAssetClientConfig; import com.ruoyi.dataAsset.constant.QueueNameConstants; import com.ruoyi.dataAsset.domain.DataSource; -import com.ruoyi.dataAsset.util.DataSourceUtil; +import com.ruoyi.dataAsset.service.ConnectionPoolService; import lombok.extern.log4j.Log4j2; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.Queue; @@ -15,7 +13,6 @@ import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; - import java.io.IOException; /** @@ -32,7 +29,7 @@ public class DataSourceConsumer { private RedisTemplate redisTemplate; @Autowired - private DataAssetClientConfig druidDataSourceFactory; + private ConnectionPoolService connectionPoolService; @Autowired private MessageConverter messageConverter; @@ -48,22 +45,16 @@ public class DataSourceConsumer { String messageId = message.getMessageProperties().getMessageId(); try { Long count = redisTemplate.opsForSet().add(QueueNameConstants.DATASOURCE_MESSAGE, messageId); - if (count > 0) { + if (count != null&&count > 0) { // 正常消费消息 - DruidDataSource druidDataSource = DataSourceUtil.createDataSource(dataSource); - // 新增或替换数据源 Long id = dataSource.getId(); - if(druidDataSourceFactory.getMap().containsKey(id)){ - // 获取旧的 - DruidDataSource source = druidDataSourceFactory.get(id); - // 替换 - druidDataSourceFactory.put(id,druidDataSource); + // 如果已经存在 + if(connectionPoolService.hasKey(id)){ // 关闭旧的 - source.close(); - }else { - //新增 - druidDataSourceFactory.put(id, druidDataSource); + connectionPoolService.closeDataSource(id); } + // 新增或替换数据源 + connectionPoolService.put(dataSource); // 消费成功 手动确认 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); log.info("数据源消费者接收到消息,消息内容:{},消费消息成功!", JSON.toJSONString(dataSource));