text:(连接池,重构)

master
031026 2024-05-09 21:56:07 +08:00
commit f16334da34
19 changed files with 572 additions and 139 deletions

View File

@ -0,0 +1,27 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.muyu</groupId>
<artifactId>muyu-common</artifactId>
<version>3.6.3</version>
</parent>
<artifactId>muyu-common-cache</artifactId>
<properties>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<!-- common 缓存 -->
<dependency>
<groupId>com.muyu</groupId>
<artifactId>muyu-common-redis</artifactId>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,41 @@
//package com.muyu.common.cache;
//
///**
// * 提供了一个连接池的准则
// *
// * @author AnNan.Wang
// * @Date 2024/4/29 029 14:41
// */
//public interface BasePool<T> {
// /**
// * 初始化
// */
// public void init();
//
// /**
// * 获取连接
// */
//
// public T getConn();
//
// /**
// * 归还连接
// * @param conn
// */
//
// public void reaplase(T conn);
//
// /**
// * 创建连接
// * @return
// */
//
// public T creatConnection();
//
//
// /**
// * 关闭连接
// */
// public void closeConn();
//
//}

View File

@ -0,0 +1,13 @@
//package com.muyu.common.cache.data;
//
///**
// * Mysql超时异常
// *
// * @author WangLei
// * @Date 2024/4/29 029 15:17
// */
//public class MysqlConnException extends RuntimeException{
// public MysqlConnException(String message) {
// super(message);
// }
//}

View File

@ -0,0 +1,24 @@
//package com.muyu.common.cache.data;
//
///**
// * 基础定值
// *
// * @author WangLei
// * @Date 2024/4/29 029 14:30
// */
//
//public class MysqlPrefix {
// /**
// * mysql连接前缀
// */
//
// public static final String MYSQLDUMP = "jdbc:mysql://";
// public static final String POSTGRESQL ="jdbc:postgresql://";
// public static void driver(String driverName){
// try {
// Class.forName(driverName);
// } catch (ClassNotFoundException e) {
// throw new RuntimeException(e);
// }
// }
//}

View File

@ -18,6 +18,7 @@
<module>muyu-common-datascope</module>
<module>muyu-common-datasource</module>
<module>muyu-common-system</module>
<module>muyu-common-cache</module>
</modules>
<artifactId>muyu-common</artifactId>

View File

@ -34,5 +34,10 @@
<artifactId>mysql-connector-java</artifactId>
<version>8.0.25</version>
</dependency>
<dependency>
<groupId>com.muyu</groupId>
<artifactId>muyu-common-cache</artifactId>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,41 @@
package com.muyu.source.clinet;
/**
*
*
* @author AnNan.Wang
* @Date 2024/4/29 029 14:41
*/
public interface BasePool<T> {
/**
*
*/
public void init();
/**
*
*/
public T getConn();
/**
*
* @param conn
*/
public void reaplase(T conn);
/**
*
* @return
*/
public T creatConnection();
/**
*
*/
public void closeConn();
}

View File

@ -0,0 +1,13 @@
package com.muyu.source.clinet;
/**
* Mysql
*
* @author WangLei
* @Date 2024/4/29 029 15:17
*/
public class MysqlConnException extends RuntimeException{
public MysqlConnException(String message) {
super(message);
}
}

View File

@ -0,0 +1,24 @@
package com.muyu.source.clinet;
/**
*
*
* @author WangLei
* @Date 2024/4/29 029 14:30
*/
public class MysqlPrefix {
/**
* mysql
*/
public static final String MYSQLDUMP = "jdbc:mysql://";
public static final String POSTGRESQL ="jdbc:postgresql://";
public static void driver(String driverName){
try {
Class.forName(driverName);
} catch (ClassNotFoundException e) {
throw new RuntimeException(e);
}
}
}

View File

@ -1,24 +1,19 @@
package com.muyu.source.clinet.config;
import com.alibaba.druid.pool.DruidDataSource;
import com.muyu.source.clinet.pool.MysqlPool;
import com.muyu.common.core.domain.Result;
import com.muyu.common.core.web.page.TableDataInfo;
import com.muyu.data.source.domain.DataSource;
import com.muyu.data.source.domain.req.DataSourceQueryReq;
import com.muyu.data.source.remote.RemoteDataManagerService;
import com.muyu.source.clinet.factory.Singleton;
import com.muyu.source.clinet.pool.config.MysqlPoolConfig;
import lombok.extern.log4j.Log4j2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
*
@ -33,43 +28,55 @@ public class DataSourceClinetRunner implements ApplicationRunner {
@Autowired
private RemoteDataManagerService remoteDataManagerService;
@Override
public void run(ApplicationArguments args) throws Exception {
Result<TableDataInfo<DataSource>> list = remoteDataManagerService.list(new DataSourceQueryReq());
TableDataInfo<DataSource> data = list.getData();
List<DataSource> rows = data.getRows();
HashMap<String,DataSource> map = new HashMap<>();
DruidDataSource druidDataSource = new DruidDataSource();
for (DataSource row : rows) {
if (row.getDataAccessTypeId()==3) {
map.put(row.getId()+row.getDatabaseName(),row);
druidDataSource.setUrl("jdbc:mysql://" + row.getHostAddress() + ":" + row.getHostPort() + "/" + row.getDatabaseName());
druidDataSource.setUsername(row.getDatabaseUserName());
druidDataSource.setPassword(row.getDatabaseUserPassword());
MysqlPool mysqlPool = new MysqlPool(MysqlPoolConfig.buildConfig(row));
mysqlPool.init();
Connection connection = mysqlPool.creatConnection();
log.info(connection);
mysqlPool.reaplase(connection);
// 配置连接池属性(可选)
druidDataSource.setInitialSize(row.getInitialQuantity()); // 初始化连接数
druidDataSource.setMaxActive(row.getMaximumQuantity()); // 最大连接数
druidDataSource.setMinIdle(row.getMaximumFrequency()); // 最小空闲连接数
druidDataSource.setMaxWait(row.getMaximumTime());
}else if (row.getDataAccessTypeId()==5){
druidDataSource.setUrl("jdbc:postgresql://" + row.getHostAddress() + ":" + row.getHostPort() + "/" + row.getDatabaseName());
druidDataSource.setUsername(row.getDatabaseUserName());
druidDataSource.setPassword(row.getDatabaseUserPassword());
// 配置连接池属性(可选)
druidDataSource.setInitialSize(row.getInitialQuantity()); // 初始化连接数
druidDataSource.setMaxActive(row.getMaximumQuantity()); // 最大连接数
druidDataSource.setMinIdle(row.getMaximumFrequency()); // 最小空闲连接数
druidDataSource.setMaxWait(row.getMaximumTime());
map.put(row.getId()+row.getDatabaseName(),row);
}
}
log.info(map);
log.info(rows.size());
}
}
//druid(德鲁伊) 连接池
// DataSourceSingleton dataSourceSingleton = DataSourceSingleton.getInstance();
// DruidDataSource druidDataSource = new DruidDataSource();
// try {
// String dataSourceKey = row.getId() + row.getDatabaseName();
//
// if (row.getDataAccessTypeId() == 3) {
// configureDruidDataSource(druidDataSource, "jdbc:mysql://" + row.getHostAddress() + ":" + row.getHostPort() + "/" + row.getDatabaseName(),
// row.getDatabaseUserName(), row.getDatabaseUserPassword(), row.getInitialQuantity(), row.getMaximumQuantity(), row.getMaximumFrequency(), row.getMaximumTime());
// } else if (row.getDataAccessTypeId() == 5) {
// configureDruidDataSource(druidDataSource, "jdbc:postgresql://" + row.getHostAddress() + ":" + row.getHostPort() + "/" + row.getDatabaseName(),
// row.getDatabaseUserName(), row.getDatabaseUserPassword(), row.getInitialQuantity(), row.getMaximumQuantity(), row.getMaximumFrequency(), row.getMaximumTime());
// }
//
//
//
// // 将数据源添加到 DataSourceSingleton 中
// } catch (Exception e) {
// log.error("Failed to configure data source for {}", row.getDatabaseName(), e);
// }
// private void configureDruidDataSource(DruidDataSource dataSource, String url, String username, String password,
// int initialSize, int maxActive, int minIdle, int maxWait) {
// dataSource.setUrl(url);
// dataSource.setUsername(username);
// dataSource.setPassword(password);
// dataSource.setInitialSize(initialSize);
// dataSource.setMaxActive(maxActive);
// dataSource.setMinIdle(minIdle);
// dataSource.setMaxWait(maxWait);
// }
}

View File

@ -0,0 +1,37 @@
package com.muyu.source.clinet.factory;
import com.muyu.data.source.domain.DataSource;
import lombok.Data;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
@Data
public class DataSourceSingleton implements Serializable {
public static Map<String, DataSource> dataSourceMap = new HashMap<>();
public static volatile DataSourceSingleton instance;
// 使用双重检查锁确保线程安全
public static DataSourceSingleton getInstance() {
if (instance == null) {
synchronized (DataSourceSingleton.class) {
if (instance == null) {
instance = new DataSourceSingleton();
}
}
}
return instance;
}
// 添加数据源
public static void addDataSource(String key, DataSource dataSource) {
dataSourceMap.put(key, dataSource);
}
// 获取数据源
public static DataSource getDataSource(String key) {
return dataSourceMap.get(key);
}
}

View File

@ -1,24 +0,0 @@
package com.muyu.source.clinet.factory;
import com.alibaba.druid.pool.DruidDataSource;
import com.muyu.data.source.domain.DataSource;
import lombok.AllArgsConstructor;
import lombok.Data;
import java.util.HashMap;
@Data
@AllArgsConstructor
public class Singleton {
private HashMap<String, DataSource> map ;
private static Singleton singleton;
//加入了同步代码,解决线程不安全问题
public static synchronized Singleton getInstance(HashMap<String, DataSource> map) {
if (singleton == null) {
singleton = new Singleton(map);
}
return singleton;
}
}

View File

@ -0,0 +1,196 @@
package com.muyu.source.clinet.pool;
import com.muyu.source.clinet.BasePool;
import com.muyu.source.clinet.MysqlPrefix;
import com.muyu.source.clinet.pool.config.MysqlPoolConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Mysql
*
* @author AnNan.Wang
* @Date 2024/4/29 029 14:36
*/
@Component
public class MysqlPool implements BasePool<Connection> {
/**
*
*/
private static final Logger log = LoggerFactory.getLogger(MysqlPool.class);
/**
*
*/
private Queue<Connection> mysqlConnQueue = null;
/**
*
*/
private Queue<Connection> activeMysqlQueue = null;
/**
*
*/
private AtomicInteger count = new AtomicInteger();
/**
*
*/
public MysqlPoolConfig mysqlPoolConfig;
/**
*
* @param mysqlPoolConfig
*/
public MysqlPool(MysqlPoolConfig mysqlPoolConfig) {
log.info("Mysql连接池实例化完成");
this.mysqlPoolConfig = mysqlPoolConfig;
MysqlPrefix.driver(this.mysqlPoolConfig.getDriverName());
}
@Override
public void init() {
//初始话连接数
int initTotal = this.mysqlPoolConfig.getInitTotal();
//最大连接数
int maxTotal = this.mysqlPoolConfig.getMaxTotal();
this.mysqlConnQueue = new LinkedBlockingQueue<>();
this.activeMysqlQueue = new LinkedBlockingQueue<>();
for (int i = 0; i < initTotal; i++) {
this.mysqlConnQueue.offer(creatConnection());
count.incrementAndGet();
}
log.info("数据库连接池初始化完成");
}
@Override
public Connection getConn() {
long startTime = System.currentTimeMillis();
Connection conn = this.mysqlConnQueue.poll();
if (conn == null) {
this.activeMysqlQueue.offer(conn);
return conn;
}
if (count.get()<this.mysqlPoolConfig.getMaxTotal()) {
Connection connection = creatConnection();
this.activeMysqlQueue.offer(connection);
count.incrementAndGet();
return connection;
}
if (System.currentTimeMillis() - startTime > this.mysqlPoolConfig.getMaxWaitTimes()) {
throw new IllegalStateException("连接超时");
}
return null;
}
/**
*
* @param conn
*/
@Override
public void reaplase(Connection conn) {
log.info("删除活动队列当中的连接");
if (this.activeMysqlQueue.remove(conn)) {
log.info("把这个队列放到空闲队列当中");
this.mysqlConnQueue.offer(conn);
}
}
/**
*
* @return
*/
@Override
public Connection creatConnection() {
String url = null;
if (this.mysqlPoolConfig.getDataAccessTypeId() == 3) {
url = this.mysqlPoolConfig.getUrl();
}else if (this.mysqlPoolConfig.getDataAccessTypeId() == 5){
url = this.mysqlPoolConfig.getPostgreSqlUrl();
}
String userName = this.mysqlPoolConfig.getUserName();
String password = this.mysqlPoolConfig.getPassword();
Connection connection = null;
try {
connection = DriverManager.getConnection(url, userName, password);
} catch (SQLException e) {
throw new RuntimeException(e);
}
log.info("初始化了一个数据库连接:{ip: " + this.mysqlPoolConfig.getIp() + " port: " + this.mysqlPoolConfig.getPort() + " databaseName: " + this.mysqlPoolConfig.getDatabaseName() + "}");
return connection;
}
/**
*
*/
@Override
public void closeConn() {
closeBaseConn();
closeActiveConn();
}
private void closeActiveConn() {
Connection poll = this.mysqlConnQueue.poll();
if (poll != null) {
try {
poll.close();
} catch (SQLException e) {
e.printStackTrace();
try {
if(!poll.isClosed()) {
this.mysqlConnQueue.offer(poll);
}
} catch (Exception ex) {
ex.printStackTrace();
}
} finally {
closeBaseConn();
}
}
}
private void closeBaseConn() {
Connection poll = this.mysqlConnQueue.poll();
if (poll != null) {
try {
poll.close();
} catch (SQLException e) {
e.printStackTrace();
try {
if(!poll.isClosed()) {
this.mysqlConnQueue.offer(poll);
}
} catch (Exception ex) {
ex.printStackTrace();
}
} finally {
closeBaseConn();
}
}
}
}

View File

@ -0,0 +1,97 @@
package com.muyu.source.clinet.pool.config;
import com.muyu.data.source.domain.DataSource;
import com.muyu.source.clinet.MysqlPrefix;
import lombok.Builder;
import lombok.Data;
@Data
@Builder
public class MysqlPoolConfig extends MysqlPrefix {
/**
* id
*/
private String id;
/**
*
*/
private Long dataAccessTypeId;
/**
*
*/
private Integer initTotal;
/**
*
*/
private Integer maxTotal;
/**
*
*/
private Integer maxWaitTimes;
/**
*
*/
private String driverName;
/**
* url ip
*/
private String ip;
private String port;
private String databaseName;
private String param;
/**
*
*/
private String userName;
/**
*
*/
private String password;
public static MysqlPoolConfig buildConfig(DataSource sysDataSource) {
return MysqlPoolConfig.builder()
.id(String.valueOf(sysDataSource.getId()))
.dataAccessTypeId(sysDataSource.getDataAccessTypeId())
.initTotal(sysDataSource.getInitialQuantity())
.maxTotal(sysDataSource.getMaximumQuantity())
.maxWaitTimes(sysDataSource.getMaximumTime())
.driverName("com.mysql.cj.jdbc.Driver")
.ip(sysDataSource.getHostAddress())
.port(sysDataSource.getHostPort())
.databaseName(sysDataSource.getDatabaseName())
.param(sysDataSource.getDataConnectionParameter())
.userName(sysDataSource.getDatabaseUserName())
.password(sysDataSource.getDatabaseUserPassword())
.build();
}
/**
*
*/
public String getUrl() {
StringBuilder urlSb = new StringBuilder(MysqlPrefix.MYSQLDUMP);
urlSb.append(this.ip);
urlSb.append(":");
urlSb.append(this.port);
urlSb.append("/");
urlSb.append(this.databaseName);
urlSb.append("?");
urlSb.append(this.param);
return urlSb.toString();
};
public String getPostgreSqlUrl() {
StringBuilder urlSb = new StringBuilder(MysqlPrefix.POSTGRESQL);
urlSb.append(this.ip);
urlSb.append(":");
urlSb.append(this.port);
urlSb.append("/");
urlSb.append(this.databaseName);
return urlSb.toString();
};
}

View File

@ -1 +1,2 @@
com.muyu.source.clinet.config.DataSourceClinetConfig
com.muyu.source.clinet.config.DataSourceClinetRunner

View File

@ -3,6 +3,7 @@ package com.muyu.data.unlt;
import com.muyu.common.security.annotation.EnableCustomConfig;
import com.muyu.common.security.annotation.EnableMyFeignClients;
import com.muyu.common.swagger.annotation.EnableCustomSwagger2;
import lombok.extern.log4j.Log4j2;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
@ -21,6 +22,7 @@ import org.springframework.scheduling.annotation.EnableAsync;
@EnableMyFeignClients
@SpringBootApplication(exclude = DataSourceAutoConfiguration.class)
@EnableAsync
@Log4j2
public class DataUnltApplication {
public static void main (String[] args) {
SpringApplication.run(DataUnltApplication.class, args);

View File

@ -1,78 +0,0 @@
package com.muyu.data.unlt.test;
import com.alibaba.druid.pool.DruidDataSource;
import com.muyu.data.source.domain.DataSource;
import com.muyu.source.clinet.factory.Singleton;
import lombok.extern.log4j.Log4j2;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.HashMap;
/**
*
*
* @ClassName Test
* @Author AnNan.Wang
* @Date 2024/5/9 21:37
*/
@Log4j2
public class Test {
public static void main(String[] args) {
Integer id=10;
String name = "source";
HashMap<String, DataSource> map = new HashMap<String,DataSource>();
Singleton instance = Singleton.getInstance(map);
DataSource druidDataSource = map.get(id + name);
log.info(instance);
// // 创建 Druid 数据源对象
// DruidDataSource dataSource = new DruidDataSource();
// dataSource.setUrl("jdbc:mysql://localhost:3306/demo");
// dataSource.setUsername("root");
// dataSource.setPassword("root");
//
// Connection connection = null;
// Statement statement = null;
// ResultSet resultSet = null;
//
// try {
// // 从数据源中获取数据库连接
// connection = dataSource.getConnection();
//
// // 创建 Statement 对象
// statement = connection.createStatement();
//
// // 执行查询语句
// resultSet = statement.executeQuery("SELECT * FROM t_user");
//
// // 处理结果集
// while (resultSet.next()) {
// // 通过列名获取数据
// String columnName = resultSet.getString("user_id");
//
// // 处理数据
// System.out.println(columnName);
// }
// } catch (SQLException e) {
// e.printStackTrace();
// } finally {
// // 关闭资源
// try {
// if (resultSet != null) {
// resultSet.close();
// }
// if (statement != null) {
// statement.close();
// }
// if (connection != null) {
// connection.close();
// }
// } catch (SQLException e) {
// e.printStackTrace();
// }
// }
}
}

View File

@ -212,6 +212,12 @@
<version>${muyu.version}</version>
</dependency>
<!-- 缓存基准模块 -->
<dependency>
<groupId>com.muyu</groupId>
<artifactId>muyu-common-cache</artifactId>
<version>${muyu.version}</version>
</dependency>
</dependencies>
</dependencyManagement>