From 34698754690fe0183f631d6785d490c27d1a6a0a Mon Sep 17 00:00:00 2001
From: Saisai Liu <1374434128@qq.com>
Date: Thu, 16 May 2024 21:36:16 +0800
Subject: [PATCH] =?UTF-8?q?bug:=E6=97=A0=E6=B3=95=E6=89=AB=E6=8F=8FredisSe?=
=?UTF-8?q?rvice?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
muyu-common/muyu-common-cache/pom.xml | 3 --
.../java/com/muyu/cache/abs/CacheAbs.java | 15 ++++--
.../common/redis/service/RedisService.java | 2 +
muyu-modules/muyu-etl/muyu-etl-cache/pom.xml | 7 +++
.../java/com/muyu/etl/AssetClientRunner.java | 22 ++-------
.../java/com/muyu/etl/MainApplication.java | 26 ----------
.../etl/util/factory/DruidUtilsFactory.java | 48 ++++++++++++-------
.../util/service/ConnectionPoolFactory.java | 7 +--
.../muyu-etl/muyu-etl-service/pom.xml | 3 +-
.../java/com/muyu/EtlApplicationRunner.java | 16 ++++++-
.../main/java/com/muyu/UnitApplication.java | 2 +
.../java/com/muyu/custom/AddInitConn.java | 3 +-
.../muyu/service/impl/UnitServiceImpl.java | 44 +++++++++++++----
pom.xml | 6 +++
14 files changed, 117 insertions(+), 87 deletions(-)
delete mode 100644 muyu-modules/muyu-etl/muyu-etl-client/src/main/java/com/muyu/etl/MainApplication.java
diff --git a/muyu-common/muyu-common-cache/pom.xml b/muyu-common/muyu-common-cache/pom.xml
index 0f84556..be45d29 100644
--- a/muyu-common/muyu-common-cache/pom.xml
+++ b/muyu-common/muyu-common-cache/pom.xml
@@ -16,10 +16,7 @@
17
UTF-8
-
-
-
com.muyu
muyu-common-redis
diff --git a/muyu-common/muyu-common-cache/src/main/java/com/muyu/cache/abs/CacheAbs.java b/muyu-common/muyu-common-cache/src/main/java/com/muyu/cache/abs/CacheAbs.java
index bcab592..1e8cd12 100644
--- a/muyu-common/muyu-common-cache/src/main/java/com/muyu/cache/abs/CacheAbs.java
+++ b/muyu-common/muyu-common-cache/src/main/java/com/muyu/cache/abs/CacheAbs.java
@@ -2,6 +2,9 @@ package com.muyu.cache.abs;
import com.muyu.cache.redis.Cache;
import com.muyu.common.redis.service.RedisService;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Lazy;
+import org.springframework.stereotype.Component;
import java.util.List;
@@ -12,16 +15,20 @@ import java.util.List;
* @Date 2024/5/14 17:05
*/
+@Component
public abstract class CacheAbs> implements Cache {
-
+ @Autowired
private RedisService redisService;
- public RedisService getRedisService() {
- return redisService;
- }
+// public void setRedisService(RedisService redisService) {
+// if (redisService==null)new RedisService();
+// this.redisService = redisService;
+// }
+ @Autowired
public CacheAbs(RedisService redisService) {
+// if (redisService==null) redisService = new RedisService();
this.redisService = redisService;
}
diff --git a/muyu-common/muyu-common-redis/src/main/java/com/muyu/common/redis/service/RedisService.java b/muyu-common/muyu-common-redis/src/main/java/com/muyu/common/redis/service/RedisService.java
index 24e66b1..b00fb91 100644
--- a/muyu-common/muyu-common-redis/src/main/java/com/muyu/common/redis/service/RedisService.java
+++ b/muyu-common/muyu-common-redis/src/main/java/com/muyu/common/redis/service/RedisService.java
@@ -2,6 +2,7 @@ package com.muyu.common.redis.service;
import org.apache.poi.ss.formula.functions.T;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Lazy;
import org.springframework.data.redis.core.BoundSetOperations;
import org.springframework.data.redis.core.HashOperations;
import org.springframework.data.redis.core.RedisTemplate;
@@ -18,6 +19,7 @@ import java.util.concurrent.TimeUnit;
**/
@SuppressWarnings(value = {"unchecked", "rawtypes"})
@Component
+@Lazy
public class RedisService {
@Autowired
public RedisTemplate redisTemplate;
diff --git a/muyu-modules/muyu-etl/muyu-etl-cache/pom.xml b/muyu-modules/muyu-etl/muyu-etl-cache/pom.xml
index aad063a..ad6e583 100644
--- a/muyu-modules/muyu-etl/muyu-etl-cache/pom.xml
+++ b/muyu-modules/muyu-etl/muyu-etl-cache/pom.xml
@@ -17,14 +17,21 @@
UTF-8
+
com.muyu
muyu-etl-common
+
com.muyu
muyu-common-cache
+
+
+ com.muyu
+ muyu-common-redis
+
diff --git a/muyu-modules/muyu-etl/muyu-etl-client/src/main/java/com/muyu/etl/AssetClientRunner.java b/muyu-modules/muyu-etl/muyu-etl-client/src/main/java/com/muyu/etl/AssetClientRunner.java
index c801650..ceeefb3 100644
--- a/muyu-modules/muyu-etl/muyu-etl-client/src/main/java/com/muyu/etl/AssetClientRunner.java
+++ b/muyu-modules/muyu-etl/muyu-etl-client/src/main/java/com/muyu/etl/AssetClientRunner.java
@@ -1,5 +1,6 @@
package com.muyu.etl;
+import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.druid.pool.DruidPooledConnection;
import com.muyu.cache.AssetCache;
import com.muyu.common.core.constant.SecurityConstants;
@@ -38,29 +39,14 @@ public class AssetClientRunner implements ApplicationRunner {
@Override
public void run(ApplicationArguments args) throws ServletException {
- BasicConfigInfo basicConfigInfoSel = new BasicConfigInfo() {{
- setIsTest("1");
- }};
- Result> result = remoteAssetService.list(basicConfigInfoSel, SecurityConstants.INNER);
AssetCache assetCache = new AssetCache(redisService);
List> basicConfigInfos = assetCache.get(assetCache.preKey()+"basic");
System.out.println(basicConfigInfos.get(0));
- log.info("basicInfoList::{}",basicConfigInfos.get(0));
- log.info("初始话内容为{}", result);
-// List rows = result.getData().getRows();
if (basicConfigInfos.get(0).isEmpty()) throw new ServletException("初始化调用失败,无数据");
-// if (rows.isEmpty()) throw new ServletException("初始化调用失败,无数据");
-// log.info(rows);
- int a = 0;
for (BasicConfigInfo basicConfigInfo : basicConfigInfos.get(0)) {
- a++;
- log.info("a:{}", a);
- log.warn("接入信息:{}", basicConfigInfo);
- DruidPooledConnection init = null;
try {
log.info("初始化开始");
- init = connectionPoolFactory.init(basicConfigInfo);
- log.info("初始化结果:{}", init);
+ connectionPoolFactory.init(basicConfigInfo);
} catch (Exception e) {
log.error("初始化异常:{}", e.getMessage());
//跳过
@@ -68,7 +54,7 @@ public class AssetClientRunner implements ApplicationRunner {
}
}
log.info("初始化完成");
- log.info("连接池::{}", DruidUtilsFactory.map);
-
}
+
+
}
diff --git a/muyu-modules/muyu-etl/muyu-etl-client/src/main/java/com/muyu/etl/MainApplication.java b/muyu-modules/muyu-etl/muyu-etl-client/src/main/java/com/muyu/etl/MainApplication.java
deleted file mode 100644
index c2e0a37..0000000
--- a/muyu-modules/muyu-etl/muyu-etl-client/src/main/java/com/muyu/etl/MainApplication.java
+++ /dev/null
@@ -1,26 +0,0 @@
-//package com.muyu.etl;
-//
-//import com.muyu.common.security.annotation.EnableCustomConfig;
-//import com.muyu.common.security.annotation.EnableMyFeignClients;
-//import com.muyu.common.swagger.annotation.EnableCustomSwagger2;
-//import com.sun.tools.javac.Main;
-//import org.springframework.boot.SpringApplication;
-//import org.springframework.boot.autoconfigure.SpringBootApplication;
-//import org.springframework.cloud.openfeign.EnableFeignClients;
-//
-///**
-// * @ClassName Main
-// * @Description 描述
-// * @Author SaiSai.Liu
-// * @Date 2024/5/11 16:20
-// */
-//@EnableCustomConfig
-//@EnableCustomSwagger2
-//@EnableMyFeignClients
-//@EnableFeignClients
-//@SpringBootApplication
-//public class MainApplication {
-// public static void main(String[] args) {
-// SpringApplication.run(MainApplication.class);
-// }
-//}
diff --git a/muyu-modules/muyu-etl/muyu-etl-common/src/main/java/com/muyu/etl/util/factory/DruidUtilsFactory.java b/muyu-modules/muyu-etl/muyu-etl-common/src/main/java/com/muyu/etl/util/factory/DruidUtilsFactory.java
index d0a78d4..1633c9d 100644
--- a/muyu-modules/muyu-etl/muyu-etl-common/src/main/java/com/muyu/etl/util/factory/DruidUtilsFactory.java
+++ b/muyu-modules/muyu-etl/muyu-etl-common/src/main/java/com/muyu/etl/util/factory/DruidUtilsFactory.java
@@ -1,17 +1,24 @@
+
package com.muyu.etl.util.factory;
import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.druid.pool.DruidPooledConnection;
import com.muyu.etl.domain.BasicConfigInfo;
import com.muyu.etl.util.service.ConnectionPoolFactory;
+import lombok.extern.log4j.Log4j2;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.HashMap;
+@Log4j2
public class DruidUtilsFactory implements ConnectionPoolFactory {
+
+ public static final ThreadLocal threadConn = new ThreadLocal<>();
+ //测试用
+ public static final ThreadLocal threadSource = new ThreadLocal<>();
// 连接池容器
- public static HashMap map = new HashMap<>();
+ public static HashMap map = new HashMap<>();
/**
* 初始化连接池
@@ -21,7 +28,7 @@ public class DruidUtilsFactory implements ConnectionPoolFactory
* @throws SQLException
*/
@Override
- public DruidPooledConnection init(BasicConfigInfo basicConfigInfo) throws SQLException {
+ public DruidDataSource init(BasicConfigInfo basicConfigInfo) throws SQLException {
// 连接池对象
DruidDataSource source = new DruidDataSource();
// 定义下面需要的对象
@@ -45,10 +52,9 @@ public class DruidUtilsFactory implements ConnectionPoolFactory
source.setTimeBetweenEvictionRunsMillis(30000);
//进程最大等待时间
source.setMaxWait(basicConfigInfo.getMaxWaitTime());
- DruidPooledConnection pool = source.getConnection();
- map.put(host + ":" + port + "/" + databaseName, pool);
+ map.put(host + ":" + port + "/" + databaseName, source);
// 获取并返回连接池对象
- return pool;
+ return source;
}
/**
@@ -58,28 +64,34 @@ public class DruidUtilsFactory implements ConnectionPoolFactory
* @return
*/
@Override
- public Connection getConnection(BasicConfigInfo basicConfigInfo) {
+ public Connection getConnection(BasicConfigInfo basicConfigInfo) {
String host = basicConfigInfo.getHost();
String port = basicConfigInfo.getPort();
String databaseName = basicConfigInfo.getDatabaseName();
- DruidPooledConnection druidPooledConnection = map.get(host + ":" + port + "/" + databaseName);
- Connection connection = druidPooledConnection.getConnection();
- return connection;
+ DruidDataSource druidDataSource = map.get(host + ":" + port + "/" + databaseName);
+ threadSource.set(druidDataSource);
+ try {
+ return druidDataSource.getConnection();
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
}
/**
* 将连接对象放回连接池
- *
- * @param basicConfigInfo
- * @param connection
+ * @throws SQLException
*/
@Override
- public void giveBack(BasicConfigInfo basicConfigInfo, Connection connection) throws SQLException {
- String host = basicConfigInfo.getHost();
- String port = basicConfigInfo.getPort();
- String databaseName = basicConfigInfo.getDatabaseName();
- DruidPooledConnection druidPooledConnection = map.get(host + ":" + port + "/" + databaseName);
- druidPooledConnection.close();
+ public void giveBack() throws SQLException {
+ DruidDataSource source = new DruidDataSource();
+ //当前活跃连接数
+ int activeCount = source.getActiveCount();
+ log.info("当前活跃连接数:{}",activeCount);
+ Connection connection = threadConn.get();
+ connection.close();
+ log.info("连接已归还");
+ threadConn.remove();
+
}
diff --git a/muyu-modules/muyu-etl/muyu-etl-common/src/main/java/com/muyu/etl/util/service/ConnectionPoolFactory.java b/muyu-modules/muyu-etl/muyu-etl-common/src/main/java/com/muyu/etl/util/service/ConnectionPoolFactory.java
index 52b01d3..201c7ed 100644
--- a/muyu-modules/muyu-etl/muyu-etl-common/src/main/java/com/muyu/etl/util/service/ConnectionPoolFactory.java
+++ b/muyu-modules/muyu-etl/muyu-etl-common/src/main/java/com/muyu/etl/util/service/ConnectionPoolFactory.java
@@ -1,5 +1,6 @@
package com.muyu.etl.util.service;
+import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.druid.pool.DruidPooledConnection;
import java.sql.Connection;
@@ -18,10 +19,10 @@ public interface ConnectionPoolFactory {
* @return
* @throws SQLException
*/
- public DruidPooledConnection init(T t) throws SQLException;
+ public DruidDataSource init(T t) throws SQLException;
- public Connection getConnection(T t);
+ public Connection getConnection(T t) throws SQLException;
- public void giveBack(T t, Connection connection) throws SQLException;
+ public void giveBack() throws SQLException;
}
diff --git a/muyu-modules/muyu-etl/muyu-etl-service/pom.xml b/muyu-modules/muyu-etl/muyu-etl-service/pom.xml
index 90ce67b..0b7ca16 100644
--- a/muyu-modules/muyu-etl/muyu-etl-service/pom.xml
+++ b/muyu-modules/muyu-etl/muyu-etl-service/pom.xml
@@ -113,11 +113,10 @@
com.muyu
muyu-etl-cache
+
com.muyu
muyu-common-etl-scope
- 3.6.3
- compile
diff --git a/muyu-modules/muyu-etl/muyu-etl-service/src/main/java/com/muyu/EtlApplicationRunner.java b/muyu-modules/muyu-etl/muyu-etl-service/src/main/java/com/muyu/EtlApplicationRunner.java
index b3a2279..d1dba41 100644
--- a/muyu-modules/muyu-etl/muyu-etl-service/src/main/java/com/muyu/EtlApplicationRunner.java
+++ b/muyu-modules/muyu-etl/muyu-etl-service/src/main/java/com/muyu/EtlApplicationRunner.java
@@ -7,9 +7,12 @@ import com.muyu.etl.service.BasicConfigInfoService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
+import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
+import javax.servlet.ServletException;
import java.util.List;
+import java.util.concurrent.TimeUnit;
/**
* @ClassName EtlApplicationRunner
@@ -26,13 +29,22 @@ public class EtlApplicationRunner implements ApplicationRunner {
private RedisService redisService;
+ AssetCache assetCache = new AssetCache(redisService);
@Override
public void run(ApplicationArguments args) throws Exception {
List basicConfigInfos = basicConfigInfoService.selectBasicConfigInfoList(new BasicConfigInfo() {{
setIsTest("1");
}});
-// AssetCache assetCache = new AssetCache();
- AssetCache assetCache = new AssetCache(redisService);
assetCache.put("basic", basicConfigInfos);
}
+
+ @Scheduled(zone = "0 0/10 * * * ? *")
+ public void delay(){
+ if (!redisService.hasKey(assetCache.preKey()+"basic")) try {
+ throw new ServletException("过期了");
+ } catch (ServletException e) {
+ throw new RuntimeException(e);
+ }
+ redisService.expire(assetCache.preKey()+"basic",10, TimeUnit.MINUTES);
+ }
}
diff --git a/muyu-modules/muyu-unit/muyu-unit-service/src/main/java/com/muyu/UnitApplication.java b/muyu-modules/muyu-unit/muyu-unit-service/src/main/java/com/muyu/UnitApplication.java
index c17325f..196585a 100644
--- a/muyu-modules/muyu-unit/muyu-unit-service/src/main/java/com/muyu/UnitApplication.java
+++ b/muyu-modules/muyu-unit/muyu-unit-service/src/main/java/com/muyu/UnitApplication.java
@@ -6,12 +6,14 @@ import com.muyu.common.swagger.annotation.EnableCustomSwagger2;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.openfeign.EnableFeignClients;
+import org.springframework.scheduling.annotation.EnableScheduling;
@EnableCustomConfig
@EnableCustomSwagger2
@EnableMyFeignClients
@EnableFeignClients
@SpringBootApplication
+@EnableScheduling
public class UnitApplication {
public static void main(String[] args) {
SpringApplication.run(UnitApplication.class);
diff --git a/muyu-modules/muyu-unit/muyu-unit-service/src/main/java/com/muyu/custom/AddInitConn.java b/muyu-modules/muyu-unit/muyu-unit-service/src/main/java/com/muyu/custom/AddInitConn.java
index e511365..1b38d16 100644
--- a/muyu-modules/muyu-unit/muyu-unit-service/src/main/java/com/muyu/custom/AddInitConn.java
+++ b/muyu-modules/muyu-unit/muyu-unit-service/src/main/java/com/muyu/custom/AddInitConn.java
@@ -1,5 +1,6 @@
package com.muyu.custom;
+import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.druid.pool.DruidPooledConnection;
import com.alibaba.fastjson2.JSON;
import com.muyu.etl.RemoteAssetService;
@@ -41,7 +42,7 @@ public class AddInitConn {
BasicConfigInfo basicConfigInfo = JSON.parseObject(basicInfo, BasicConfigInfo.class);
log.info("队列信息{}",basicInfo);
try {
- DruidPooledConnection init = connectionPoolFactory.init(basicConfigInfo);
+ DruidDataSource init = connectionPoolFactory.init(basicConfigInfo);
if (init == null){
// 消息拒绝,放回队列中重新消费
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
diff --git a/muyu-modules/muyu-unit/muyu-unit-service/src/main/java/com/muyu/service/impl/UnitServiceImpl.java b/muyu-modules/muyu-unit/muyu-unit-service/src/main/java/com/muyu/service/impl/UnitServiceImpl.java
index 2b1e86f..4503da7 100644
--- a/muyu-modules/muyu-unit/muyu-unit-service/src/main/java/com/muyu/service/impl/UnitServiceImpl.java
+++ b/muyu-modules/muyu-unit/muyu-unit-service/src/main/java/com/muyu/service/impl/UnitServiceImpl.java
@@ -1,18 +1,18 @@
package com.muyu.service.impl;
+import com.alibaba.druid.pool.DruidDataSource;
import com.muyu.etl.domain.BasicConfigInfo;
import com.muyu.etl.domain.TableInfo;
import com.muyu.etl.util.factory.DruidUtilsFactory;
import com.muyu.etl.util.service.ConnectionPoolFactory;
import com.muyu.service.UnitService;
+import com.mysql.cj.xdevapi.Table;
+import lombok.extern.log4j.Log4j2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.servlet.ServletException;
-import java.sql.Connection;
-import java.sql.DatabaseMetaData;
-import java.sql.ResultSet;
-import java.sql.SQLException;
+import java.sql.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -23,6 +23,7 @@ import java.util.List;
* @Author SaiSai.Liu
* @Date 2024/5/15 22:10
*/
+@Log4j2
@Service
public class UnitServiceImpl implements UnitService {
@@ -31,7 +32,9 @@ public class UnitServiceImpl implements UnitService {
private HashMap basicConfigInfoHashMap = new HashMap<>();
@Override
public List getTableInfoList(BasicConfigInfo basicConfigInfo) {
+ List tableInfos = new ArrayList<>();
Connection connection = druidUtilsFactory.getConnection(basicConfigInfo);
+ druidUtilsFactory.threadConn.set(connection);
if (connection == null) {
try {
throw new ServletException("获取连接失败");
@@ -41,16 +44,37 @@ public class UnitServiceImpl implements UnitService {
}
try {
DatabaseMetaData metaData = connection.getMetaData();
- ResultSet tables = metaData.getTables(basicConfigInfo.getDatabaseName(), null,
+ ResultSet rs = metaData.getTables(basicConfigInfo.getDatabaseName(), null,
"%", new String[]{"TABLE", "VIEW"});
- ArrayList tableInfos = new ArrayList<>();
- while (tables.next()){
-
+ while (rs.next()){
+ DruidDataSource source = druidUtilsFactory.threadSource.get();
+ int activeCount = source.getActiveCount();
+ log.info("当前连接数:{}",activeCount);
+ String tableName = rs.getString("TABLE_NAME");
+ String remark = rs.getString("REMARKS");
+ PreparedStatement ps = connection.prepareStatement("Select COUNT(*) FROM " + tableName);
+ ResultSet rset = ps.executeQuery();
+ Long rowCount = 0L;
+ while (rset.next()){
+ rowCount = rset.getLong(1);
+ }
+ TableInfo tableInfo = TableInfo.builder()
+ .tableRemark(remark)
+ .tableName(tableName)
+ .basicId(basicConfigInfo.getId())
+ .dataNum(rowCount)
+ .build();
+ tableInfos.add(tableInfo);
}
-
} catch (SQLException e) {
throw new RuntimeException(e);
+ }finally {
+ try {
+ druidUtilsFactory.giveBack();
+ } catch (SQLException e) {
+ throw new RuntimeException("连接池归还失败");
+ }
}
- return null;
+ return tableInfos;
}
}
diff --git a/pom.xml b/pom.xml
index f63ec33..9341a95 100644
--- a/pom.xml
+++ b/pom.xml
@@ -248,6 +248,12 @@
${muyu.version}
+
+ com.muyu
+ muyu-common-etl-scope
+ ${muyu.version}
+
+