From 3f8d1fd13bd5a734693dfcaf83789b9a3a7294d1 Mon Sep 17 00:00:00 2001 From: gtl <2949451835@qq.com> Date: Wed, 15 May 2024 19:58:04 +0800 Subject: [PATCH] =?UTF-8?q?feat:=E6=95=B0=E6=8D=AE=E8=B5=84=E4=BA=A7?= =?UTF-8?q?=E5=AE=A2=E6=88=B7=E7=AB=AF=E6=96=B0=E5=A2=9E=E6=95=B0=E6=8D=AE?= =?UTF-8?q?=E5=BA=93=E8=BF=9E=E6=8E=A5=E6=B1=A0=E6=9C=8D=E5=8A=A1=E7=B1=BB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../consumer/DataSourceConsumer.java | 25 ++++++------------- 1 file changed, 8 insertions(+), 17 deletions(-) diff --git a/ruoyi-modules/ruoyi-data_transform/ruoyi-data_transform-server/src/main/java/com/ruoyi/dataTransform/consumer/DataSourceConsumer.java b/ruoyi-modules/ruoyi-data_transform/ruoyi-data_transform-server/src/main/java/com/ruoyi/dataTransform/consumer/DataSourceConsumer.java index 4a40869..2f3e0db 100644 --- a/ruoyi-modules/ruoyi-data_transform/ruoyi-data_transform-server/src/main/java/com/ruoyi/dataTransform/consumer/DataSourceConsumer.java +++ b/ruoyi-modules/ruoyi-data_transform/ruoyi-data_transform-server/src/main/java/com/ruoyi/dataTransform/consumer/DataSourceConsumer.java @@ -1,12 +1,10 @@ package com.ruoyi.dataTransform.consumer; -import com.alibaba.druid.pool.DruidDataSource; import com.alibaba.fastjson2.JSON; import com.rabbitmq.client.Channel; -import com.ruoyi.dataAsset.config.DataAssetClientConfig; import com.ruoyi.dataAsset.constant.QueueNameConstants; import com.ruoyi.dataAsset.domain.DataSource; -import com.ruoyi.dataAsset.util.DataSourceUtil; +import com.ruoyi.dataAsset.service.ConnectionPoolService; import lombok.extern.log4j.Log4j2; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.Queue; @@ -15,7 +13,6 @@ import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; - import java.io.IOException; /** @@ -32,7 +29,7 @@ public class DataSourceConsumer { private RedisTemplate redisTemplate; @Autowired - private DataAssetClientConfig druidDataSourceFactory; + private ConnectionPoolService connectionPoolService; @Autowired private MessageConverter messageConverter; @@ -48,22 +45,16 @@ public class DataSourceConsumer { String messageId = message.getMessageProperties().getMessageId(); try { Long count = redisTemplate.opsForSet().add(QueueNameConstants.DATASOURCE_MESSAGE, messageId); - if (count > 0) { + if (count != null&&count > 0) { // 正常消费消息 - DruidDataSource druidDataSource = DataSourceUtil.createDataSource(dataSource); - // 新增或替换数据源 Long id = dataSource.getId(); - if(druidDataSourceFactory.getMap().containsKey(id)){ - // 获取旧的 - DruidDataSource source = druidDataSourceFactory.get(id); - // 替换 - druidDataSourceFactory.put(id,druidDataSource); + // 如果已经存在 + if(connectionPoolService.hasKey(id)){ // 关闭旧的 - source.close(); - }else { - //新增 - druidDataSourceFactory.put(id, druidDataSource); + connectionPoolService.closeDataSource(id); } + // 新增或替换数据源 + connectionPoolService.put(dataSource); // 消费成功 手动确认 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); log.info("数据源消费者接收到消息,消息内容:{},消费消息成功!", JSON.toJSONString(dataSource));