diff --git a/muyu-common/muyu-common-cache/pom.xml b/muyu-common/muyu-common-cache/pom.xml index 0f84556..be45d29 100644 --- a/muyu-common/muyu-common-cache/pom.xml +++ b/muyu-common/muyu-common-cache/pom.xml @@ -16,10 +16,7 @@ 17 UTF-8 - - - com.muyu muyu-common-redis diff --git a/muyu-common/muyu-common-cache/src/main/java/com/muyu/cache/abs/CacheAbs.java b/muyu-common/muyu-common-cache/src/main/java/com/muyu/cache/abs/CacheAbs.java index bcab592..1e8cd12 100644 --- a/muyu-common/muyu-common-cache/src/main/java/com/muyu/cache/abs/CacheAbs.java +++ b/muyu-common/muyu-common-cache/src/main/java/com/muyu/cache/abs/CacheAbs.java @@ -2,6 +2,9 @@ package com.muyu.cache.abs; import com.muyu.cache.redis.Cache; import com.muyu.common.redis.service.RedisService; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Lazy; +import org.springframework.stereotype.Component; import java.util.List; @@ -12,16 +15,20 @@ import java.util.List; * @Date 2024/5/14 17:05 */ +@Component public abstract class CacheAbs> implements Cache { - + @Autowired private RedisService redisService; - public RedisService getRedisService() { - return redisService; - } +// public void setRedisService(RedisService redisService) { +// if (redisService==null)new RedisService(); +// this.redisService = redisService; +// } + @Autowired public CacheAbs(RedisService redisService) { +// if (redisService==null) redisService = new RedisService(); this.redisService = redisService; } diff --git a/muyu-common/muyu-common-redis/src/main/java/com/muyu/common/redis/service/RedisService.java b/muyu-common/muyu-common-redis/src/main/java/com/muyu/common/redis/service/RedisService.java index 24e66b1..b00fb91 100644 --- a/muyu-common/muyu-common-redis/src/main/java/com/muyu/common/redis/service/RedisService.java +++ b/muyu-common/muyu-common-redis/src/main/java/com/muyu/common/redis/service/RedisService.java @@ -2,6 +2,7 @@ package com.muyu.common.redis.service; import org.apache.poi.ss.formula.functions.T; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Lazy; import org.springframework.data.redis.core.BoundSetOperations; import org.springframework.data.redis.core.HashOperations; import org.springframework.data.redis.core.RedisTemplate; @@ -18,6 +19,7 @@ import java.util.concurrent.TimeUnit; **/ @SuppressWarnings(value = {"unchecked", "rawtypes"}) @Component +@Lazy public class RedisService { @Autowired public RedisTemplate redisTemplate; diff --git a/muyu-modules/muyu-etl/muyu-etl-cache/pom.xml b/muyu-modules/muyu-etl/muyu-etl-cache/pom.xml index aad063a..ad6e583 100644 --- a/muyu-modules/muyu-etl/muyu-etl-cache/pom.xml +++ b/muyu-modules/muyu-etl/muyu-etl-cache/pom.xml @@ -17,14 +17,21 @@ UTF-8 + com.muyu muyu-etl-common + com.muyu muyu-common-cache + + + com.muyu + muyu-common-redis + diff --git a/muyu-modules/muyu-etl/muyu-etl-client/src/main/java/com/muyu/etl/AssetClientRunner.java b/muyu-modules/muyu-etl/muyu-etl-client/src/main/java/com/muyu/etl/AssetClientRunner.java index c801650..ceeefb3 100644 --- a/muyu-modules/muyu-etl/muyu-etl-client/src/main/java/com/muyu/etl/AssetClientRunner.java +++ b/muyu-modules/muyu-etl/muyu-etl-client/src/main/java/com/muyu/etl/AssetClientRunner.java @@ -1,5 +1,6 @@ package com.muyu.etl; +import com.alibaba.druid.pool.DruidDataSource; import com.alibaba.druid.pool.DruidPooledConnection; import com.muyu.cache.AssetCache; import com.muyu.common.core.constant.SecurityConstants; @@ -38,29 +39,14 @@ public class AssetClientRunner implements ApplicationRunner { @Override public void run(ApplicationArguments args) throws ServletException { - BasicConfigInfo basicConfigInfoSel = new BasicConfigInfo() {{ - setIsTest("1"); - }}; - Result> result = remoteAssetService.list(basicConfigInfoSel, SecurityConstants.INNER); AssetCache assetCache = new AssetCache(redisService); List> basicConfigInfos = assetCache.get(assetCache.preKey()+"basic"); System.out.println(basicConfigInfos.get(0)); - log.info("basicInfoList::{}",basicConfigInfos.get(0)); - log.info("初始话内容为{}", result); -// List rows = result.getData().getRows(); if (basicConfigInfos.get(0).isEmpty()) throw new ServletException("初始化调用失败,无数据"); -// if (rows.isEmpty()) throw new ServletException("初始化调用失败,无数据"); -// log.info(rows); - int a = 0; for (BasicConfigInfo basicConfigInfo : basicConfigInfos.get(0)) { - a++; - log.info("a:{}", a); - log.warn("接入信息:{}", basicConfigInfo); - DruidPooledConnection init = null; try { log.info("初始化开始"); - init = connectionPoolFactory.init(basicConfigInfo); - log.info("初始化结果:{}", init); + connectionPoolFactory.init(basicConfigInfo); } catch (Exception e) { log.error("初始化异常:{}", e.getMessage()); //跳过 @@ -68,7 +54,7 @@ public class AssetClientRunner implements ApplicationRunner { } } log.info("初始化完成"); - log.info("连接池::{}", DruidUtilsFactory.map); - } + + } diff --git a/muyu-modules/muyu-etl/muyu-etl-client/src/main/java/com/muyu/etl/MainApplication.java b/muyu-modules/muyu-etl/muyu-etl-client/src/main/java/com/muyu/etl/MainApplication.java deleted file mode 100644 index c2e0a37..0000000 --- a/muyu-modules/muyu-etl/muyu-etl-client/src/main/java/com/muyu/etl/MainApplication.java +++ /dev/null @@ -1,26 +0,0 @@ -//package com.muyu.etl; -// -//import com.muyu.common.security.annotation.EnableCustomConfig; -//import com.muyu.common.security.annotation.EnableMyFeignClients; -//import com.muyu.common.swagger.annotation.EnableCustomSwagger2; -//import com.sun.tools.javac.Main; -//import org.springframework.boot.SpringApplication; -//import org.springframework.boot.autoconfigure.SpringBootApplication; -//import org.springframework.cloud.openfeign.EnableFeignClients; -// -///** -// * @ClassName Main -// * @Description 描述 -// * @Author SaiSai.Liu -// * @Date 2024/5/11 16:20 -// */ -//@EnableCustomConfig -//@EnableCustomSwagger2 -//@EnableMyFeignClients -//@EnableFeignClients -//@SpringBootApplication -//public class MainApplication { -// public static void main(String[] args) { -// SpringApplication.run(MainApplication.class); -// } -//} diff --git a/muyu-modules/muyu-etl/muyu-etl-common/src/main/java/com/muyu/etl/util/factory/DruidUtilsFactory.java b/muyu-modules/muyu-etl/muyu-etl-common/src/main/java/com/muyu/etl/util/factory/DruidUtilsFactory.java index d0a78d4..1633c9d 100644 --- a/muyu-modules/muyu-etl/muyu-etl-common/src/main/java/com/muyu/etl/util/factory/DruidUtilsFactory.java +++ b/muyu-modules/muyu-etl/muyu-etl-common/src/main/java/com/muyu/etl/util/factory/DruidUtilsFactory.java @@ -1,17 +1,24 @@ + package com.muyu.etl.util.factory; import com.alibaba.druid.pool.DruidDataSource; import com.alibaba.druid.pool.DruidPooledConnection; import com.muyu.etl.domain.BasicConfigInfo; import com.muyu.etl.util.service.ConnectionPoolFactory; +import lombok.extern.log4j.Log4j2; import java.sql.Connection; import java.sql.SQLException; import java.util.HashMap; +@Log4j2 public class DruidUtilsFactory implements ConnectionPoolFactory { + + public static final ThreadLocal threadConn = new ThreadLocal<>(); + //测试用 + public static final ThreadLocal threadSource = new ThreadLocal<>(); // 连接池容器 - public static HashMap map = new HashMap<>(); + public static HashMap map = new HashMap<>(); /** * 初始化连接池 @@ -21,7 +28,7 @@ public class DruidUtilsFactory implements ConnectionPoolFactory * @throws SQLException */ @Override - public DruidPooledConnection init(BasicConfigInfo basicConfigInfo) throws SQLException { + public DruidDataSource init(BasicConfigInfo basicConfigInfo) throws SQLException { // 连接池对象 DruidDataSource source = new DruidDataSource(); // 定义下面需要的对象 @@ -45,10 +52,9 @@ public class DruidUtilsFactory implements ConnectionPoolFactory source.setTimeBetweenEvictionRunsMillis(30000); //进程最大等待时间 source.setMaxWait(basicConfigInfo.getMaxWaitTime()); - DruidPooledConnection pool = source.getConnection(); - map.put(host + ":" + port + "/" + databaseName, pool); + map.put(host + ":" + port + "/" + databaseName, source); // 获取并返回连接池对象 - return pool; + return source; } /** @@ -58,28 +64,34 @@ public class DruidUtilsFactory implements ConnectionPoolFactory * @return */ @Override - public Connection getConnection(BasicConfigInfo basicConfigInfo) { + public Connection getConnection(BasicConfigInfo basicConfigInfo) { String host = basicConfigInfo.getHost(); String port = basicConfigInfo.getPort(); String databaseName = basicConfigInfo.getDatabaseName(); - DruidPooledConnection druidPooledConnection = map.get(host + ":" + port + "/" + databaseName); - Connection connection = druidPooledConnection.getConnection(); - return connection; + DruidDataSource druidDataSource = map.get(host + ":" + port + "/" + databaseName); + threadSource.set(druidDataSource); + try { + return druidDataSource.getConnection(); + } catch (SQLException e) { + throw new RuntimeException(e); + } } /** * 将连接对象放回连接池 - * - * @param basicConfigInfo - * @param connection + * @throws SQLException */ @Override - public void giveBack(BasicConfigInfo basicConfigInfo, Connection connection) throws SQLException { - String host = basicConfigInfo.getHost(); - String port = basicConfigInfo.getPort(); - String databaseName = basicConfigInfo.getDatabaseName(); - DruidPooledConnection druidPooledConnection = map.get(host + ":" + port + "/" + databaseName); - druidPooledConnection.close(); + public void giveBack() throws SQLException { + DruidDataSource source = new DruidDataSource(); + //当前活跃连接数 + int activeCount = source.getActiveCount(); + log.info("当前活跃连接数:{}",activeCount); + Connection connection = threadConn.get(); + connection.close(); + log.info("连接已归还"); + threadConn.remove(); + } diff --git a/muyu-modules/muyu-etl/muyu-etl-common/src/main/java/com/muyu/etl/util/service/ConnectionPoolFactory.java b/muyu-modules/muyu-etl/muyu-etl-common/src/main/java/com/muyu/etl/util/service/ConnectionPoolFactory.java index 52b01d3..201c7ed 100644 --- a/muyu-modules/muyu-etl/muyu-etl-common/src/main/java/com/muyu/etl/util/service/ConnectionPoolFactory.java +++ b/muyu-modules/muyu-etl/muyu-etl-common/src/main/java/com/muyu/etl/util/service/ConnectionPoolFactory.java @@ -1,5 +1,6 @@ package com.muyu.etl.util.service; +import com.alibaba.druid.pool.DruidDataSource; import com.alibaba.druid.pool.DruidPooledConnection; import java.sql.Connection; @@ -18,10 +19,10 @@ public interface ConnectionPoolFactory { * @return * @throws SQLException */ - public DruidPooledConnection init(T t) throws SQLException; + public DruidDataSource init(T t) throws SQLException; - public Connection getConnection(T t); + public Connection getConnection(T t) throws SQLException; - public void giveBack(T t, Connection connection) throws SQLException; + public void giveBack() throws SQLException; } diff --git a/muyu-modules/muyu-etl/muyu-etl-service/pom.xml b/muyu-modules/muyu-etl/muyu-etl-service/pom.xml index 90ce67b..0b7ca16 100644 --- a/muyu-modules/muyu-etl/muyu-etl-service/pom.xml +++ b/muyu-modules/muyu-etl/muyu-etl-service/pom.xml @@ -113,11 +113,10 @@ com.muyu muyu-etl-cache + com.muyu muyu-common-etl-scope - 3.6.3 - compile diff --git a/muyu-modules/muyu-etl/muyu-etl-service/src/main/java/com/muyu/EtlApplicationRunner.java b/muyu-modules/muyu-etl/muyu-etl-service/src/main/java/com/muyu/EtlApplicationRunner.java index b3a2279..d1dba41 100644 --- a/muyu-modules/muyu-etl/muyu-etl-service/src/main/java/com/muyu/EtlApplicationRunner.java +++ b/muyu-modules/muyu-etl/muyu-etl-service/src/main/java/com/muyu/EtlApplicationRunner.java @@ -7,9 +7,12 @@ import com.muyu.etl.service.BasicConfigInfoService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; +import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; +import javax.servlet.ServletException; import java.util.List; +import java.util.concurrent.TimeUnit; /** * @ClassName EtlApplicationRunner @@ -26,13 +29,22 @@ public class EtlApplicationRunner implements ApplicationRunner { private RedisService redisService; + AssetCache assetCache = new AssetCache(redisService); @Override public void run(ApplicationArguments args) throws Exception { List basicConfigInfos = basicConfigInfoService.selectBasicConfigInfoList(new BasicConfigInfo() {{ setIsTest("1"); }}); -// AssetCache assetCache = new AssetCache(); - AssetCache assetCache = new AssetCache(redisService); assetCache.put("basic", basicConfigInfos); } + + @Scheduled(zone = "0 0/10 * * * ? *") + public void delay(){ + if (!redisService.hasKey(assetCache.preKey()+"basic")) try { + throw new ServletException("过期了"); + } catch (ServletException e) { + throw new RuntimeException(e); + } + redisService.expire(assetCache.preKey()+"basic",10, TimeUnit.MINUTES); + } } diff --git a/muyu-modules/muyu-unit/muyu-unit-service/src/main/java/com/muyu/UnitApplication.java b/muyu-modules/muyu-unit/muyu-unit-service/src/main/java/com/muyu/UnitApplication.java index c17325f..196585a 100644 --- a/muyu-modules/muyu-unit/muyu-unit-service/src/main/java/com/muyu/UnitApplication.java +++ b/muyu-modules/muyu-unit/muyu-unit-service/src/main/java/com/muyu/UnitApplication.java @@ -6,12 +6,14 @@ import com.muyu.common.swagger.annotation.EnableCustomSwagger2; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.openfeign.EnableFeignClients; +import org.springframework.scheduling.annotation.EnableScheduling; @EnableCustomConfig @EnableCustomSwagger2 @EnableMyFeignClients @EnableFeignClients @SpringBootApplication +@EnableScheduling public class UnitApplication { public static void main(String[] args) { SpringApplication.run(UnitApplication.class); diff --git a/muyu-modules/muyu-unit/muyu-unit-service/src/main/java/com/muyu/custom/AddInitConn.java b/muyu-modules/muyu-unit/muyu-unit-service/src/main/java/com/muyu/custom/AddInitConn.java index e511365..1b38d16 100644 --- a/muyu-modules/muyu-unit/muyu-unit-service/src/main/java/com/muyu/custom/AddInitConn.java +++ b/muyu-modules/muyu-unit/muyu-unit-service/src/main/java/com/muyu/custom/AddInitConn.java @@ -1,5 +1,6 @@ package com.muyu.custom; +import com.alibaba.druid.pool.DruidDataSource; import com.alibaba.druid.pool.DruidPooledConnection; import com.alibaba.fastjson2.JSON; import com.muyu.etl.RemoteAssetService; @@ -41,7 +42,7 @@ public class AddInitConn { BasicConfigInfo basicConfigInfo = JSON.parseObject(basicInfo, BasicConfigInfo.class); log.info("队列信息{}",basicInfo); try { - DruidPooledConnection init = connectionPoolFactory.init(basicConfigInfo); + DruidDataSource init = connectionPoolFactory.init(basicConfigInfo); if (init == null){ // 消息拒绝,放回队列中重新消费 channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); diff --git a/muyu-modules/muyu-unit/muyu-unit-service/src/main/java/com/muyu/service/impl/UnitServiceImpl.java b/muyu-modules/muyu-unit/muyu-unit-service/src/main/java/com/muyu/service/impl/UnitServiceImpl.java index 2b1e86f..4503da7 100644 --- a/muyu-modules/muyu-unit/muyu-unit-service/src/main/java/com/muyu/service/impl/UnitServiceImpl.java +++ b/muyu-modules/muyu-unit/muyu-unit-service/src/main/java/com/muyu/service/impl/UnitServiceImpl.java @@ -1,18 +1,18 @@ package com.muyu.service.impl; +import com.alibaba.druid.pool.DruidDataSource; import com.muyu.etl.domain.BasicConfigInfo; import com.muyu.etl.domain.TableInfo; import com.muyu.etl.util.factory.DruidUtilsFactory; import com.muyu.etl.util.service.ConnectionPoolFactory; import com.muyu.service.UnitService; +import com.mysql.cj.xdevapi.Table; +import lombok.extern.log4j.Log4j2; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import javax.servlet.ServletException; -import java.sql.Connection; -import java.sql.DatabaseMetaData; -import java.sql.ResultSet; -import java.sql.SQLException; +import java.sql.*; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -23,6 +23,7 @@ import java.util.List; * @Author SaiSai.Liu * @Date 2024/5/15 22:10 */ +@Log4j2 @Service public class UnitServiceImpl implements UnitService { @@ -31,7 +32,9 @@ public class UnitServiceImpl implements UnitService { private HashMap basicConfigInfoHashMap = new HashMap<>(); @Override public List getTableInfoList(BasicConfigInfo basicConfigInfo) { + List tableInfos = new ArrayList<>(); Connection connection = druidUtilsFactory.getConnection(basicConfigInfo); + druidUtilsFactory.threadConn.set(connection); if (connection == null) { try { throw new ServletException("获取连接失败"); @@ -41,16 +44,37 @@ public class UnitServiceImpl implements UnitService { } try { DatabaseMetaData metaData = connection.getMetaData(); - ResultSet tables = metaData.getTables(basicConfigInfo.getDatabaseName(), null, + ResultSet rs = metaData.getTables(basicConfigInfo.getDatabaseName(), null, "%", new String[]{"TABLE", "VIEW"}); - ArrayList tableInfos = new ArrayList<>(); - while (tables.next()){ - + while (rs.next()){ + DruidDataSource source = druidUtilsFactory.threadSource.get(); + int activeCount = source.getActiveCount(); + log.info("当前连接数:{}",activeCount); + String tableName = rs.getString("TABLE_NAME"); + String remark = rs.getString("REMARKS"); + PreparedStatement ps = connection.prepareStatement("Select COUNT(*) FROM " + tableName); + ResultSet rset = ps.executeQuery(); + Long rowCount = 0L; + while (rset.next()){ + rowCount = rset.getLong(1); + } + TableInfo tableInfo = TableInfo.builder() + .tableRemark(remark) + .tableName(tableName) + .basicId(basicConfigInfo.getId()) + .dataNum(rowCount) + .build(); + tableInfos.add(tableInfo); } - } catch (SQLException e) { throw new RuntimeException(e); + }finally { + try { + druidUtilsFactory.giveBack(); + } catch (SQLException e) { + throw new RuntimeException("连接池归还失败"); + } } - return null; + return tableInfos; } } diff --git a/pom.xml b/pom.xml index f63ec33..9341a95 100644 --- a/pom.xml +++ b/pom.xml @@ -248,6 +248,12 @@ ${muyu.version} + + com.muyu + muyu-common-etl-scope + ${muyu.version} + +