bug:无法扫描redisService

master
Saisai Liu 2024-05-16 21:36:16 +08:00
parent d50fb93778
commit 3469875469
14 changed files with 117 additions and 87 deletions

View File

@ -16,10 +16,7 @@
<maven.compiler.target>17</maven.compiler.target> <maven.compiler.target>17</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties> </properties>
<dependencies> <dependencies>
<!-- MuYu Common Security -->
<dependency> <dependency>
<groupId>com.muyu</groupId> <groupId>com.muyu</groupId>
<artifactId>muyu-common-redis</artifactId> <artifactId>muyu-common-redis</artifactId>

View File

@ -2,6 +2,9 @@ package com.muyu.cache.abs;
import com.muyu.cache.redis.Cache; import com.muyu.cache.redis.Cache;
import com.muyu.common.redis.service.RedisService; import com.muyu.common.redis.service.RedisService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;
import java.util.List; import java.util.List;
@ -12,16 +15,20 @@ import java.util.List;
* @Date 2024/5/14 17:05 * @Date 2024/5/14 17:05
*/ */
@Component
public abstract class CacheAbs<K, V extends List<?>> implements Cache<K> { public abstract class CacheAbs<K, V extends List<?>> implements Cache<K> {
@Autowired
private RedisService redisService; private RedisService redisService;
public RedisService getRedisService() { // public void setRedisService(RedisService redisService) {
return redisService; // if (redisService==null)new RedisService();
} // this.redisService = redisService;
// }
@Autowired
public CacheAbs(RedisService redisService) { public CacheAbs(RedisService redisService) {
// if (redisService==null) redisService = new RedisService();
this.redisService = redisService; this.redisService = redisService;
} }

View File

@ -2,6 +2,7 @@ package com.muyu.common.redis.service;
import org.apache.poi.ss.formula.functions.T; import org.apache.poi.ss.formula.functions.T;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.data.redis.core.BoundSetOperations; import org.springframework.data.redis.core.BoundSetOperations;
import org.springframework.data.redis.core.HashOperations; import org.springframework.data.redis.core.HashOperations;
import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.RedisTemplate;
@ -18,6 +19,7 @@ import java.util.concurrent.TimeUnit;
**/ **/
@SuppressWarnings(value = {"unchecked", "rawtypes"}) @SuppressWarnings(value = {"unchecked", "rawtypes"})
@Component @Component
@Lazy
public class RedisService { public class RedisService {
@Autowired @Autowired
public RedisTemplate redisTemplate; public RedisTemplate redisTemplate;

View File

@ -17,14 +17,21 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties> </properties>
<dependencies> <dependencies>
<dependency> <dependency>
<groupId>com.muyu</groupId> <groupId>com.muyu</groupId>
<artifactId>muyu-etl-common</artifactId> <artifactId>muyu-etl-common</artifactId>
</dependency> </dependency>
<dependency> <dependency>
<groupId>com.muyu</groupId> <groupId>com.muyu</groupId>
<artifactId>muyu-common-cache</artifactId> <artifactId>muyu-common-cache</artifactId>
</dependency> </dependency>
<dependency>
<groupId>com.muyu</groupId>
<artifactId>muyu-common-redis</artifactId>
</dependency>
</dependencies> </dependencies>
</project> </project>

View File

@ -1,5 +1,6 @@
package com.muyu.etl; package com.muyu.etl;
import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.druid.pool.DruidPooledConnection; import com.alibaba.druid.pool.DruidPooledConnection;
import com.muyu.cache.AssetCache; import com.muyu.cache.AssetCache;
import com.muyu.common.core.constant.SecurityConstants; import com.muyu.common.core.constant.SecurityConstants;
@ -38,29 +39,14 @@ public class AssetClientRunner implements ApplicationRunner {
@Override @Override
public void run(ApplicationArguments args) throws ServletException { public void run(ApplicationArguments args) throws ServletException {
BasicConfigInfo basicConfigInfoSel = new BasicConfigInfo() {{
setIsTest("1");
}};
Result<TableDataInfo<BasicConfigInfo>> result = remoteAssetService.list(basicConfigInfoSel, SecurityConstants.INNER);
AssetCache assetCache = new AssetCache(redisService); AssetCache assetCache = new AssetCache(redisService);
List<List<BasicConfigInfo>> basicConfigInfos = assetCache.get(assetCache.preKey()+"basic"); List<List<BasicConfigInfo>> basicConfigInfos = assetCache.get(assetCache.preKey()+"basic");
System.out.println(basicConfigInfos.get(0)); System.out.println(basicConfigInfos.get(0));
log.info("basicInfoList::{}",basicConfigInfos.get(0));
log.info("初始话内容为{}", result);
// List<BasicConfigInfo> rows = result.getData().getRows();
if (basicConfigInfos.get(0).isEmpty()) throw new ServletException("初始化调用失败,无数据"); if (basicConfigInfos.get(0).isEmpty()) throw new ServletException("初始化调用失败,无数据");
// if (rows.isEmpty()) throw new ServletException("初始化调用失败,无数据");
// log.info(rows);
int a = 0;
for (BasicConfigInfo basicConfigInfo : basicConfigInfos.get(0)) { for (BasicConfigInfo basicConfigInfo : basicConfigInfos.get(0)) {
a++;
log.info("a:{}", a);
log.warn("接入信息:{}", basicConfigInfo);
DruidPooledConnection init = null;
try { try {
log.info("初始化开始"); log.info("初始化开始");
init = connectionPoolFactory.init(basicConfigInfo); connectionPoolFactory.init(basicConfigInfo);
log.info("初始化结果:{}", init);
} catch (Exception e) { } catch (Exception e) {
log.error("初始化异常:{}", e.getMessage()); log.error("初始化异常:{}", e.getMessage());
//跳过 //跳过
@ -68,7 +54,7 @@ public class AssetClientRunner implements ApplicationRunner {
} }
} }
log.info("初始化完成"); log.info("初始化完成");
log.info("连接池::{}", DruidUtilsFactory.map);
} }
} }

View File

@ -1,26 +0,0 @@
//package com.muyu.etl;
//
//import com.muyu.common.security.annotation.EnableCustomConfig;
//import com.muyu.common.security.annotation.EnableMyFeignClients;
//import com.muyu.common.swagger.annotation.EnableCustomSwagger2;
//import com.sun.tools.javac.Main;
//import org.springframework.boot.SpringApplication;
//import org.springframework.boot.autoconfigure.SpringBootApplication;
//import org.springframework.cloud.openfeign.EnableFeignClients;
//
///**
// * @ClassName Main
// * @Description 描述
// * @Author SaiSai.Liu
// * @Date 2024/5/11 16:20
// */
//@EnableCustomConfig
//@EnableCustomSwagger2
//@EnableMyFeignClients
//@EnableFeignClients
//@SpringBootApplication
//public class MainApplication {
// public static void main(String[] args) {
// SpringApplication.run(MainApplication.class);
// }
//}

View File

@ -1,17 +1,24 @@
package com.muyu.etl.util.factory; package com.muyu.etl.util.factory;
import com.alibaba.druid.pool.DruidDataSource; import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.druid.pool.DruidPooledConnection; import com.alibaba.druid.pool.DruidPooledConnection;
import com.muyu.etl.domain.BasicConfigInfo; import com.muyu.etl.domain.BasicConfigInfo;
import com.muyu.etl.util.service.ConnectionPoolFactory; import com.muyu.etl.util.service.ConnectionPoolFactory;
import lombok.extern.log4j.Log4j2;
import java.sql.Connection; import java.sql.Connection;
import java.sql.SQLException; import java.sql.SQLException;
import java.util.HashMap; import java.util.HashMap;
@Log4j2
public class DruidUtilsFactory implements ConnectionPoolFactory<BasicConfigInfo> { public class DruidUtilsFactory implements ConnectionPoolFactory<BasicConfigInfo> {
public static final ThreadLocal<Connection> threadConn = new ThreadLocal<>();
//测试用
public static final ThreadLocal<DruidDataSource> threadSource = new ThreadLocal<>();
// 连接池容器 // 连接池容器
public static HashMap<String, DruidPooledConnection> map = new HashMap<>(); public static HashMap<String, DruidDataSource> map = new HashMap<>();
/** /**
* *
@ -21,7 +28,7 @@ public class DruidUtilsFactory implements ConnectionPoolFactory<BasicConfigInfo>
* @throws SQLException * @throws SQLException
*/ */
@Override @Override
public DruidPooledConnection init(BasicConfigInfo basicConfigInfo) throws SQLException { public DruidDataSource init(BasicConfigInfo basicConfigInfo) throws SQLException {
// 连接池对象 // 连接池对象
DruidDataSource source = new DruidDataSource(); DruidDataSource source = new DruidDataSource();
// 定义下面需要的对象 // 定义下面需要的对象
@ -45,10 +52,9 @@ public class DruidUtilsFactory implements ConnectionPoolFactory<BasicConfigInfo>
source.setTimeBetweenEvictionRunsMillis(30000); source.setTimeBetweenEvictionRunsMillis(30000);
//进程最大等待时间 //进程最大等待时间
source.setMaxWait(basicConfigInfo.getMaxWaitTime()); source.setMaxWait(basicConfigInfo.getMaxWaitTime());
DruidPooledConnection pool = source.getConnection(); map.put(host + ":" + port + "/" + databaseName, source);
map.put(host + ":" + port + "/" + databaseName, pool);
// 获取并返回连接池对象 // 获取并返回连接池对象
return pool; return source;
} }
/** /**
@ -58,28 +64,34 @@ public class DruidUtilsFactory implements ConnectionPoolFactory<BasicConfigInfo>
* @return * @return
*/ */
@Override @Override
public Connection getConnection(BasicConfigInfo basicConfigInfo) { public Connection getConnection(BasicConfigInfo basicConfigInfo) {
String host = basicConfigInfo.getHost(); String host = basicConfigInfo.getHost();
String port = basicConfigInfo.getPort(); String port = basicConfigInfo.getPort();
String databaseName = basicConfigInfo.getDatabaseName(); String databaseName = basicConfigInfo.getDatabaseName();
DruidPooledConnection druidPooledConnection = map.get(host + ":" + port + "/" + databaseName); DruidDataSource druidDataSource = map.get(host + ":" + port + "/" + databaseName);
Connection connection = druidPooledConnection.getConnection(); threadSource.set(druidDataSource);
return connection; try {
return druidDataSource.getConnection();
} catch (SQLException e) {
throw new RuntimeException(e);
}
} }
/** /**
* *
* * @throws SQLException
* @param basicConfigInfo
* @param connection
*/ */
@Override @Override
public void giveBack(BasicConfigInfo basicConfigInfo, Connection connection) throws SQLException { public void giveBack() throws SQLException {
String host = basicConfigInfo.getHost(); DruidDataSource source = new DruidDataSource();
String port = basicConfigInfo.getPort(); //当前活跃连接数
String databaseName = basicConfigInfo.getDatabaseName(); int activeCount = source.getActiveCount();
DruidPooledConnection druidPooledConnection = map.get(host + ":" + port + "/" + databaseName); log.info("当前活跃连接数:{}",activeCount);
druidPooledConnection.close(); Connection connection = threadConn.get();
connection.close();
log.info("连接已归还");
threadConn.remove();
} }

View File

@ -1,5 +1,6 @@
package com.muyu.etl.util.service; package com.muyu.etl.util.service;
import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.druid.pool.DruidPooledConnection; import com.alibaba.druid.pool.DruidPooledConnection;
import java.sql.Connection; import java.sql.Connection;
@ -18,10 +19,10 @@ public interface ConnectionPoolFactory<T> {
* @return * @return
* @throws SQLException * @throws SQLException
*/ */
public DruidPooledConnection init(T t) throws SQLException; public DruidDataSource init(T t) throws SQLException;
public Connection getConnection(T t); public Connection getConnection(T t) throws SQLException;
public void giveBack(T t, Connection connection) throws SQLException; public void giveBack() throws SQLException;
} }

View File

@ -113,11 +113,10 @@
<groupId>com.muyu</groupId> <groupId>com.muyu</groupId>
<artifactId>muyu-etl-cache</artifactId> <artifactId>muyu-etl-cache</artifactId>
</dependency> </dependency>
<dependency> <dependency>
<groupId>com.muyu</groupId> <groupId>com.muyu</groupId>
<artifactId>muyu-common-etl-scope</artifactId> <artifactId>muyu-common-etl-scope</artifactId>
<version>3.6.3</version>
<scope>compile</scope>
</dependency> </dependency>

View File

@ -7,9 +7,12 @@ import com.muyu.etl.service.BasicConfigInfoService;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner; import org.springframework.boot.ApplicationRunner;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.servlet.ServletException;
import java.util.List; import java.util.List;
import java.util.concurrent.TimeUnit;
/** /**
* @ClassName EtlApplicationRunner * @ClassName EtlApplicationRunner
@ -26,13 +29,22 @@ public class EtlApplicationRunner implements ApplicationRunner {
private RedisService redisService; private RedisService redisService;
AssetCache assetCache = new AssetCache(redisService);
@Override @Override
public void run(ApplicationArguments args) throws Exception { public void run(ApplicationArguments args) throws Exception {
List<BasicConfigInfo> basicConfigInfos = basicConfigInfoService.selectBasicConfigInfoList(new BasicConfigInfo() {{ List<BasicConfigInfo> basicConfigInfos = basicConfigInfoService.selectBasicConfigInfoList(new BasicConfigInfo() {{
setIsTest("1"); setIsTest("1");
}}); }});
// AssetCache assetCache = new AssetCache();
AssetCache assetCache = new AssetCache(redisService);
assetCache.put("basic", basicConfigInfos); assetCache.put("basic", basicConfigInfos);
} }
@Scheduled(zone = "0 0/10 * * * ? *")
public void delay(){
if (!redisService.hasKey(assetCache.preKey()+"basic")) try {
throw new ServletException("过期了");
} catch (ServletException e) {
throw new RuntimeException(e);
}
redisService.expire(assetCache.preKey()+"basic",10, TimeUnit.MINUTES);
}
} }

View File

@ -6,12 +6,14 @@ import com.muyu.common.swagger.annotation.EnableCustomSwagger2;
import org.springframework.boot.SpringApplication; import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.openfeign.EnableFeignClients; import org.springframework.cloud.openfeign.EnableFeignClients;
import org.springframework.scheduling.annotation.EnableScheduling;
@EnableCustomConfig @EnableCustomConfig
@EnableCustomSwagger2 @EnableCustomSwagger2
@EnableMyFeignClients @EnableMyFeignClients
@EnableFeignClients @EnableFeignClients
@SpringBootApplication @SpringBootApplication
@EnableScheduling
public class UnitApplication { public class UnitApplication {
public static void main(String[] args) { public static void main(String[] args) {
SpringApplication.run(UnitApplication.class); SpringApplication.run(UnitApplication.class);

View File

@ -1,5 +1,6 @@
package com.muyu.custom; package com.muyu.custom;
import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.druid.pool.DruidPooledConnection; import com.alibaba.druid.pool.DruidPooledConnection;
import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSON;
import com.muyu.etl.RemoteAssetService; import com.muyu.etl.RemoteAssetService;
@ -41,7 +42,7 @@ public class AddInitConn {
BasicConfigInfo basicConfigInfo = JSON.parseObject(basicInfo, BasicConfigInfo.class); BasicConfigInfo basicConfigInfo = JSON.parseObject(basicInfo, BasicConfigInfo.class);
log.info("队列信息{}",basicInfo); log.info("队列信息{}",basicInfo);
try { try {
DruidPooledConnection init = connectionPoolFactory.init(basicConfigInfo); DruidDataSource init = connectionPoolFactory.init(basicConfigInfo);
if (init == null){ if (init == null){
// 消息拒绝,放回队列中重新消费 // 消息拒绝,放回队列中重新消费
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);

View File

@ -1,18 +1,18 @@
package com.muyu.service.impl; package com.muyu.service.impl;
import com.alibaba.druid.pool.DruidDataSource;
import com.muyu.etl.domain.BasicConfigInfo; import com.muyu.etl.domain.BasicConfigInfo;
import com.muyu.etl.domain.TableInfo; import com.muyu.etl.domain.TableInfo;
import com.muyu.etl.util.factory.DruidUtilsFactory; import com.muyu.etl.util.factory.DruidUtilsFactory;
import com.muyu.etl.util.service.ConnectionPoolFactory; import com.muyu.etl.util.service.ConnectionPoolFactory;
import com.muyu.service.UnitService; import com.muyu.service.UnitService;
import com.mysql.cj.xdevapi.Table;
import lombok.extern.log4j.Log4j2;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import javax.servlet.ServletException; import javax.servlet.ServletException;
import java.sql.Connection; import java.sql.*;
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
@ -23,6 +23,7 @@ import java.util.List;
* @Author SaiSai.Liu * @Author SaiSai.Liu
* @Date 2024/5/15 22:10 * @Date 2024/5/15 22:10
*/ */
@Log4j2
@Service @Service
public class UnitServiceImpl implements UnitService { public class UnitServiceImpl implements UnitService {
@ -31,7 +32,9 @@ public class UnitServiceImpl implements UnitService {
private HashMap<String, BasicConfigInfo> basicConfigInfoHashMap = new HashMap<>(); private HashMap<String, BasicConfigInfo> basicConfigInfoHashMap = new HashMap<>();
@Override @Override
public List<TableInfo> getTableInfoList(BasicConfigInfo basicConfigInfo) { public List<TableInfo> getTableInfoList(BasicConfigInfo basicConfigInfo) {
List<TableInfo> tableInfos = new ArrayList<>();
Connection connection = druidUtilsFactory.getConnection(basicConfigInfo); Connection connection = druidUtilsFactory.getConnection(basicConfigInfo);
druidUtilsFactory.threadConn.set(connection);
if (connection == null) { if (connection == null) {
try { try {
throw new ServletException("获取连接失败"); throw new ServletException("获取连接失败");
@ -41,16 +44,37 @@ public class UnitServiceImpl implements UnitService {
} }
try { try {
DatabaseMetaData metaData = connection.getMetaData(); DatabaseMetaData metaData = connection.getMetaData();
ResultSet tables = metaData.getTables(basicConfigInfo.getDatabaseName(), null, ResultSet rs = metaData.getTables(basicConfigInfo.getDatabaseName(), null,
"%", new String[]{"TABLE", "VIEW"}); "%", new String[]{"TABLE", "VIEW"});
ArrayList<TableInfo> tableInfos = new ArrayList<>(); while (rs.next()){
while (tables.next()){ DruidDataSource source = druidUtilsFactory.threadSource.get();
int activeCount = source.getActiveCount();
log.info("当前连接数:{}",activeCount);
String tableName = rs.getString("TABLE_NAME");
String remark = rs.getString("REMARKS");
PreparedStatement ps = connection.prepareStatement("Select COUNT(*) FROM " + tableName);
ResultSet rset = ps.executeQuery();
Long rowCount = 0L;
while (rset.next()){
rowCount = rset.getLong(1);
}
TableInfo tableInfo = TableInfo.builder()
.tableRemark(remark)
.tableName(tableName)
.basicId(basicConfigInfo.getId())
.dataNum(rowCount)
.build();
tableInfos.add(tableInfo);
} }
} catch (SQLException e) { } catch (SQLException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
}finally {
try {
druidUtilsFactory.giveBack();
} catch (SQLException e) {
throw new RuntimeException("连接池归还失败");
}
} }
return null; return tableInfos;
} }
} }

View File

@ -248,6 +248,12 @@
<version>${muyu.version}</version> <version>${muyu.version}</version>
</dependency> </dependency>
<dependency>
<groupId>com.muyu</groupId>
<artifactId>muyu-common-etl-scope</artifactId>
<version>${muyu.version}</version>
</dependency>
</dependencies> </dependencies>
</dependencyManagement> </dependencyManagement>