添加连接数据池和任务sql查询数量

master
lwj 2024-09-05 17:30:10 +08:00
parent cd96dd36bd
commit f850395dc6
13 changed files with 915 additions and 2305 deletions

View File

@ -13,6 +13,14 @@ import org.springframework.boot.autoconfigure.SpringBootApplication;
public class MuYuEtlApplication { public class MuYuEtlApplication {
public static void main (String[] args) { public static void main (String[] args) {
SpringApplication.run(MuYuEtlApplication.class, args); SpringApplication.run(MuYuEtlApplication.class, args);
// Thread t1 = new Thread();
// Thread t2 = new Thread();
//
// t1.start();
// t2.start();
} }
} }

View File

@ -31,6 +31,7 @@ public class DataValueController {
} }
//添加数据库 //添加数据库
//根据值添加的表中 //根据值添加的表中
@PostMapping("/addTable") @PostMapping("/addTable")
@ -39,6 +40,19 @@ public class DataValueController {
return Result.success(i); return Result.success(i);
} }
//根据任务提供的sql 查询数量
@PostMapping("/findCount")
public Integer findCount(@RequestParam("basicId") Long basicId,@RequestParam("sql") String sql) {
return dataValueService.findCount(basicId,sql);
}

View File

@ -12,7 +12,6 @@ import com.muyu.common.system.domain.SysUser;
import com.muyu.domain.Structure; import com.muyu.domain.Structure;
import com.muyu.domain.TableInfo; import com.muyu.domain.TableInfo;
import com.muyu.domain.rep.TableInfoResp; import com.muyu.domain.rep.TableInfoResp;
import jakarta.servlet.http.HttpServletRequest;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.PathVariable;
@ -33,10 +32,8 @@ public class TableInfoController {
//添加授权和修改授权 //添加授权和修改授权
@Autowired @Autowired
private AccreditService accreditService; private AccreditService accreditService;
@Autowired
private HttpServletRequest request;
//查询所有的数据进行
@GetMapping("/findTableInfoList") @GetMapping("/findTableInfoList")
public Result findByTableName() { public Result findByTableName() {
List<TableInfo> list = tableInfoService.list(); List<TableInfo> list = tableInfoService.list();
@ -121,7 +118,6 @@ public class TableInfoController {
} }
List<TableInfoResp> tableInfoResps = tableInfoHashSet.stream() List<TableInfoResp> tableInfoResps = tableInfoHashSet.stream()
.filter(tableInfo -> .filter(tableInfo ->
tableInfo.getParentId()== 0).map(tableInfo -> { tableInfo.getParentId()== 0).map(tableInfo -> {
@ -145,5 +141,4 @@ public class TableInfoController {
} }

View File

@ -0,0 +1,17 @@
package com.muyu.cloud.etl.mysql;
public class BaseConfig {
/**
* mysql
*/
public static final String MYSQLJDBCPRO="jdbc:mysql://";
public static void driver(String driverName){
try {
Class.forName(driverName);
} catch (ClassNotFoundException e) {
throw new RuntimeException(e);
}
}
}

View File

@ -0,0 +1,15 @@
package com.muyu.cloud.etl.mysql;
/**
*
*/
public interface BasePool<T> {
//初始化
public void init();
//获取连接
public T getConn();
//归还连接
public void replease(T conn);
//创建连接
public T createConn();
}

View File

@ -0,0 +1,8 @@
package com.muyu.cloud.etl.mysql;
public class MysqlConnException extends RuntimeException{
public MysqlConnException(String message) {
super(message);
}
}

View File

@ -0,0 +1,119 @@
package com.muyu.cloud.etl.mysql;
import com.muyu.cloud.etl.mysql.config.MysqlPoolConfig;
import lombok.extern.log4j.Log4j2;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
/**
* mysql
*/
@Log4j2
public class MysqlPool implements BasePool<Connection> {
/**
* ()
*/
private Queue<Connection> mysqlConneQueue = null;
/**
*
*/
private Queue<Connection> activeMysqlConnQueue = null;
/**
*
*/
private AtomicInteger count=new AtomicInteger();
//mysql基础配置
public MysqlPoolConfig mysqlPoolConfig;
//进行实例化连接池 加载驱动
public MysqlPool(MysqlPoolConfig mysqlPoolConfig) {
this.mysqlPoolConfig = mysqlPoolConfig;
BaseConfig.driver(mysqlPoolConfig.getDriverName());
log.info("MySQL连接池实例化完成");
}
//进行初始化连接池
@Override
public void init() {
int maxTotal = this.mysqlPoolConfig.getMaxTotal();
int initTotal = this.mysqlPoolConfig.getInitTotal();
this.mysqlConneQueue=new LinkedBlockingQueue<Connection>(maxTotal);
this.activeMysqlConnQueue=new LinkedBlockingQueue<Connection>(maxTotal);
for (int i = 0; i < initTotal; i++) {
this.mysqlConneQueue.offer(createConn());
count.incrementAndGet();
}
log.info("MYSQL连接池初始化完成");
}
/**
* 1.
* 2.
* 3.
* 4.
* 5.
* 6.
* 7. 使
* 8.
* @return
*/
@Override
public Connection getConn() {
long startTime = System.currentTimeMillis();
//从空闲队列当中取出放入活动队列
Connection conn = this.mysqlConneQueue.poll();
if(conn!=null){
this.activeMysqlConnQueue.offer(conn);
return conn;
}
//如果当前的连接数量小于最大的连接数量的时候创建新的连接0000.
if(count.get()>this.mysqlPoolConfig.getMaxTotal()){
Connection mysqlConn = createConn();
this.activeMysqlConnQueue.offer(mysqlConn);
count.incrementAndGet();
return mysqlConn;
}
if((System.currentTimeMillis()-startTime)>this.mysqlPoolConfig.getMaxWaitTime()){
throw new MysqlConnException("连接超时");
}
return null;
}
//归还连接
@Override
public void replease(Connection conn) {
//删除活动队列当中的 连接
if(this.mysqlConneQueue.remove(conn)){
//把连接放到空闲队列当中
this.mysqlConneQueue.offer(conn);
}
}
//获取mysql连接信息
public Connection createConn() {
String url = this.mysqlPoolConfig.getUrl();
String userName = this.mysqlPoolConfig.getUserName();
String password = this.mysqlPoolConfig.getPassword();
Connection mysqlConn = null;
try {
mysqlConn = DriverManager.getConnection(url, userName, password);
} catch (SQLException e) {
throw new RuntimeException(e);
}
return mysqlConn;
}
}

View File

@ -0,0 +1,53 @@
package com.muyu.cloud.etl.mysql.config;
import com.muyu.cloud.etl.mysql.BaseConfig;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class MysqlPoolConfig {
//id
public String id;
//初始化连接数量
private int initTotal;
//最大连接数
private int maxTotal;
//最长等待时间 毫秒
private long maxWaitTime;
//驱动
private String driverName;
//url ip 端口 数据库 编码
private String ip;
private int port;
private String databaseName;
private String param;
//用户名
public String userName;
//密码
private String password;
/**
*
* @return
*/
public String getUrl(){
StringBuilder urlSb = new StringBuilder(BaseConfig.MYSQLJDBCPRO);
urlSb.append(this.ip);
urlSb.append(":");
urlSb.append(this.port);
urlSb.append("/");
urlSb.append(this.databaseName);
urlSb.append("?");
urlSb.append(this.param);
return urlSb.toString();
}
}

View File

@ -11,4 +11,6 @@ public interface DataValueService {
Integer addTableDataValue(Long basicId, Long tableId, List<List<DataValue>> dataValue); Integer addTableDataValue(Long basicId, Long tableId, List<List<DataValue>> dataValue);
Integer findCount(Long basicId, String sql);
} }

View File

@ -156,10 +156,113 @@ public class DataValueServiceImpl implements DataValueService {
} }
@Override @Override
public Integer addTableDataValue(Long basicId, Long tableId, List<List<DataValue>> dataValue) { public Integer addTableDataValue(Long basicId, Long tableId, List<List<DataValue>> dataValueList) {
// 获取数据源信息
Source source = sourceService.getInfo(basicId); Source source = sourceService.getInfo(basicId);
TableInfo tableInfo = tableInfoService.getById(tableId); TableInfo tableInfo = tableInfoService.getById(tableId);
// 提取连接信息
String host = source.getHost();
String port = source.getPort();
String databaseName = source.getDatabaseName();
String databaseType = source.getDatabaseType();
// 构建JDBC URL
String url = "jdbc:" + databaseType + "://" + host + ":" + port + "/" + databaseName + "?" + source.getConnectionParams();
String user = source.getUsername();
String password = source.getPassword();
Connection conn = null;
try {
// 建立数据库连接
conn = DriverManager.getConnection(url, user, password);
// 获取表名
String tableName = tableInfo.getTableName();
// 遍历所有的数据值列表
PreparedStatement pstmt = null; // 声明预编译语句对象
for (List<DataValue> dataValues : dataValueList) {
// 初始化列名和占位符字符串
StringBuilder columns = new StringBuilder("(");
StringBuilder placeholders = new StringBuilder("VALUES (");
ArrayList<String> columnNames = new ArrayList<>();
// 遍历单个数据集中的所有键值对
for (DataValue dataValue : dataValues) {
String key = dataValue.getKey();
Object value = dataValue.getValue();
// 构建列名部分
columns.append(key).append(",");
// 构建占位符部分
placeholders.append("?,");
// 收集列名用于后续处理
columnNames.add(key);
}
// 移除最后一个逗号
columns.deleteCharAt(columns.length() - 1);
placeholders.deleteCharAt(placeholders.length() - 1);
// 关闭括号
columns.append(")");
placeholders.append(")");
// 构造完整的SQL语句
String sql = "INSERT INTO " + tableName + " " + columns.toString() + " " + placeholders.toString();
// 如果有旧的预编译语句,先关闭它
if (pstmt != null) pstmt.close();
// 准备新的预编译语句
pstmt = conn.prepareStatement(sql);
// 设置参数值
int index = 1;
for (DataValue dataValue : dataValues) {
Object value = dataValue.getValue();
if (value instanceof String) {
pstmt.setString(index, (String) value);
} else if (value instanceof Integer) {
pstmt.setInt(index, (Integer) value);
} else if (value instanceof Double) {
pstmt.setDouble(index, (Double) value);
} else if (value instanceof Date) {
pstmt.setDate(index, new java.sql.Date(((Date) value).getTime()));
} else {
pstmt.setObject(index, value);
}
index++;
}
// 将语句添加到批处理中
pstmt.addBatch();
}
// 执行所有批处理语句
if (pstmt != null) {
pstmt.executeBatch();
}
} catch (SQLException e) {
// 抛出运行时异常,将原始的 SQLException 作为原因
throw new RuntimeException(e);
} finally {
// 在finally块中确保连接关闭
if (conn != null) {
try {
conn.close();
} catch (SQLException e) {
// 处理或记录关闭连接时发生的异常
}
}
}
// 返回一个指示操作成功的值
return 1;
}
@Override
public Integer findCount(Long basicId, String sql) {
Source source = sourceService.getInfo(basicId);
String host = source.getHost(); String host = source.getHost();
String port = source.getPort(); String port = source.getPort();
String databaseName = source.getDatabaseName(); String databaseName = source.getDatabaseName();
@ -167,20 +270,22 @@ public class DataValueServiceImpl implements DataValueService {
String url = "jdbc:" + databaseType + "://" + host + ":" + port + "/" + databaseName + "?" + source.getConnectionParams(); String url = "jdbc:" + databaseType + "://" + host + ":" + port + "/" + databaseName + "?" + source.getConnectionParams();
String user = source.getUsername(); String user = source.getUsername();
String password = source.getPassword(); String password = source.getPassword();
String tableName = tableInfo.getTableName();
Connection conn=null; Connection conn=null;
Integer string=0;
try { try {
conn = DriverManager.getConnection(url, user, password); conn = DriverManager.getConnection(url, user, password);
PreparedStatement prepareStatement = conn.prepareStatement(sql);
ResultSet resultSet = prepareStatement.executeQuery();
while (resultSet.next()){
string = resultSet.getInt(1);
}
} catch (SQLException e) { } catch (SQLException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
return string;
return 0;
} }
} }

View File

@ -36,11 +36,13 @@ public class SourceServiceImpl extends ServiceImpl<SourceMapper, Source> impleme
@Autowired @Autowired
private StructureService structureService; private StructureService structureService;
@Override @Override
public List<Source> selectSourceList(SourceReq sourceReq) { public List<Source> selectSourceList(SourceReq sourceReq) {
return sourceMapper.selectSourceList(sourceReq); return sourceMapper.selectSourceList(sourceReq);
} }
@Override @Override
public Source getInfo(Long id) { public Source getInfo(Long id) {
LambdaQueryWrapper<Source> lambdaQueryWrapper = new LambdaQueryWrapper<>(); LambdaQueryWrapper<Source> lambdaQueryWrapper = new LambdaQueryWrapper<>();
@ -271,8 +273,13 @@ public class SourceServiceImpl extends ServiceImpl<SourceMapper, Source> impleme
public void syncData(Connection conn, String databaseName, TableInfo table) throws SQLException { public void syncData(Connection conn, String databaseName, TableInfo table) throws SQLException {
ExecutorService threadPool = Executors.newCachedThreadPool(); ExecutorService threadPool = Executors.newCachedThreadPool();
long rowCount = 0;
int pageSize = 1000; // 每页1000条记录
int pageNumber = 1;
PreparedStatement ps = conn.prepareStatement( PreparedStatement ps = conn.prepareStatement(
" SELECT " + " SELECT " +
" COLUMN_NAME , " + " COLUMN_NAME , " +
@ -295,6 +302,7 @@ public class SourceServiceImpl extends ServiceImpl<SourceMapper, Source> impleme
"FROM INFORMATION_SCHEMA.COLUMNS WHERE \n" + "FROM INFORMATION_SCHEMA.COLUMNS WHERE \n" +
"TABLE_SCHEMA = '" + databaseName + "' \n" + "TABLE_SCHEMA = '" + databaseName + "' \n" +
"AND TABLE_NAME = '" + table.getTableName() + "'"); "AND TABLE_NAME = '" + table.getTableName() + "'");
// "SELECT COLUMN_NAME,DATA_TYPE,IS_NULLABLE,COLUMN_KEY,COLUMN_DEFAULT, // "SELECT COLUMN_NAME,DATA_TYPE,IS_NULLABLE,COLUMN_KEY,COLUMN_DEFAULT,
// COLUMN_COMMENT,CHARACTER_MAXIMUM_LENGTH,NUMERIC_PRECISION, // COLUMN_COMMENT,CHARACTER_MAXIMUM_LENGTH,NUMERIC_PRECISION,
// NUMERIC_SCALE FROM INFORMATION_SCHEMA.COLUMNS // NUMERIC_SCALE FROM INFORMATION_SCHEMA.COLUMNS

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff