修改添加,第一种方法

master
lwj 2024-09-08 23:07:01 +08:00
parent 6ab729feb8
commit b671ffbd69
1 changed files with 265 additions and 299 deletions

View File

@ -1,13 +1,12 @@
package com.muyu.cloud.etl.service.impl; package com.muyu.cloud.etl.service.impl;
import com.muyu.Hikari.HikariPool;
import com.muyu.cloud.etl.service.ProductService; import com.muyu.cloud.etl.service.ProductService;
import com.muyu.cloud.etl.service.SourceService; import com.muyu.cloud.etl.service.SourceService;
import com.muyu.cloud.etl.service.TableInfoService; import com.muyu.cloud.etl.service.TableInfoService;
import com.muyu.domain.DataValue; import com.muyu.domain.DataValue;
import com.muyu.domain.Source; import com.muyu.domain.Source;
import com.muyu.domain.TableInfo; import com.muyu.domain.TableInfo;
import com.muyu.domain.enums.DataType;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource; import com.zaxxer.hikari.HikariDataSource;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
@ -16,9 +15,7 @@ import org.springframework.stereotype.Service;
import java.sql.Connection; import java.sql.Connection;
import java.sql.PreparedStatement; import java.sql.PreparedStatement;
import java.sql.SQLException; import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Date; import java.util.Date;
import java.util.List;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -33,141 +30,141 @@ public class ProductServiceImpl implements ProductService {
private TableInfoService tableInfoService; private TableInfoService tableInfoService;
// public int addProduct(Long basicId, Long tableId, DataValue[][] listList) { public int addProduct(Long basicId, Long tableId, DataValue[][] listList) {
//// TableInfo tableInfoDataSources = tableInfoService.getById(basicId); // TableInfo tableInfoDataSources = tableInfoService.getById(basicId);
//// Long basicId1 = tableInfoDataSources.getBasicId(); // Long basicId1 = tableInfoDataSources.getBasicId();
// Source dataSources = sourceService.getById(basicId); Source dataSources = sourceService.getById(basicId);
// TableInfo tableInfo = tableInfoService.getById(tableId); TableInfo tableInfo = tableInfoService.getById(tableId);
// String tableName = tableInfo.getTableName(); String tableName = tableInfo.getTableName();
//
//// HikariDataSource hikariDataSource = getHikariDataSource(dataSources); // HikariDataSource hikariDataSource = getHikariDataSource(dataSources);
//
// HikariDataSource hikariDataSource = HikariPool.getHikariDataSource(dataSources); HikariDataSource hikariDataSource = HikariPool.getHikariDataSource(dataSources);
//
// ExecutorService executorService = Executors.newFixedThreadPool(4); ExecutorService executorService = Executors.newFixedThreadPool(4);
//
// AtomicInteger addCount = new AtomicInteger(); AtomicInteger addCount = new AtomicInteger();
//
// Connection connection = null; Connection connection = null;
//
// try { try {
// connection = hikariDataSource.getConnection(); connection = hikariDataSource.getConnection();
// // 遍历外部列表 // 遍历外部列表
// for (DataValue[] dataValueList : listList) { for (DataValue[] dataValueList : listList) {
// Connection finalConnection = connection; Connection finalConnection = connection;
// executorService.submit(() -> { executorService.submit(() -> {
// try { try {
// addCount.addAndGet(insertRow(finalConnection, tableName, dataValueList)); addCount.addAndGet(insertRow(finalConnection, tableName, dataValueList));
// } catch (SQLException e) { } catch (SQLException e) {
// // 记录异常 // 记录异常
// e.printStackTrace(); e.printStackTrace();
// } }
// }); });
// } }
//
// executorService.shutdown(); executorService.shutdown();
// executorService.awaitTermination(1, TimeUnit.HOURS); executorService.awaitTermination(1, TimeUnit.HOURS);
// } catch (InterruptedException e) { } catch (InterruptedException e) {
// Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
// throw new RuntimeException("Thread interrupted", e); throw new RuntimeException("Thread interrupted", e);
// } catch (SQLException e) { } catch (SQLException e) {
// throw new RuntimeException(e); throw new RuntimeException(e);
// } finally { } finally {
// closeConnection(connection); closeConnection(connection);
// } }
//
// return addCount.get(); return addCount.get();
// } }
//
//
// public int insertRow(Connection conn, String tableName, DataValue[] dataValueList) throws SQLException { public int insertRow(Connection conn, String tableName, DataValue[] dataValueList) throws SQLException {
//
// // 获取当前行的所有字段名 // 获取当前行的所有字段名
// StringBuilder columns = new StringBuilder("("); StringBuilder columns = new StringBuilder("(");
// StringBuilder values = new StringBuilder("VALUES ("); StringBuilder values = new StringBuilder("VALUES (");
//
// // 构建字段名和占位符 // 构建字段名和占位符
// for (DataValue dataValue : dataValueList) { for (DataValue dataValue : dataValueList) {
// String key = dataValue.getKey(); String key = dataValue.getKey();
// columns.append(key).append(", "); columns.append(key).append(", ");
// values.append("?, "); values.append("?, ");
// } }
// // 删除最后一个逗号和空格 // 删除最后一个逗号和空格
// columns.delete(columns.length() - 2, columns.length()); columns.delete(columns.length() - 2, columns.length());
// values.delete(values.length() - 2, values.length()); values.delete(values.length() - 2, values.length());
//
// // 完成 SQL 插入语句 // 完成 SQL 插入语句
// String sql = "INSERT INTO " + tableName + " " + columns.toString() + ") " + values.toString() + ")"; String sql = "INSERT INTO " + tableName + " " + columns.toString() + ") " + values.toString() + ")";
// log.info("添加语句{}",sql); log.info("添加语句{}",sql);
//
// int addCount = 0; int addCount = 0;
//
// try { try {
// conn.setAutoCommit(false); conn.setAutoCommit(false);
// try ( try (
// PreparedStatement ps = conn.prepareStatement(sql)) { PreparedStatement ps = conn.prepareStatement(sql)) {
// // 循环设置参数并执行插入 // 循环设置参数并执行插入
//// for (DataValue dataValue : dataValueList) { // for (DataValue dataValue : dataValueList) {
// int index = 1; int index = 1;
// for (DataValue value : dataValueList) { for (DataValue value : dataValueList) {
// Object obj = value.getValue(); Object obj = value.getValue();
// if (obj instanceof String) { if (obj instanceof String) {
// ps.setString(index++, (String) obj); ps.setString(index++, (String) obj);
// log.info("类型为String,值{}",obj); log.info("类型为String,值{}",obj);
// } else if (obj instanceof Integer) { } else if (obj instanceof Integer) {
// ps.setInt(index++, (Integer) obj); ps.setInt(index++, (Integer) obj);
// log.info("类型为Integer,值{}",obj); log.info("类型为Integer,值{}",obj);
// } else if (obj instanceof Double) { } else if (obj instanceof Double) {
// ps.setDouble(index++, (Double) obj); ps.setDouble(index++, (Double) obj);
// log.info("类型为Double,值{}",obj); log.info("类型为Double,值{}",obj);
// } else if (obj instanceof Date) { } else if (obj instanceof Date) {
// ps.setDate(index++, new java.sql.Date(((Date) obj).getTime())); ps.setDate(index++, new java.sql.Date(((Date) obj).getTime()));
// log.info("类型为Date,值{}",obj); log.info("类型为Date,值{}",obj);
// } else if (obj instanceof Boolean) { } else if (obj instanceof Boolean) {
// ps.setBoolean(index++, (Boolean) obj); ps.setBoolean(index++, (Boolean) obj);
// log.info("类型为Boolean,值{}",obj); log.info("类型为Boolean,值{}",obj);
// } else if (obj instanceof Float) { } else if (obj instanceof Float) {
// ps.setFloat(index++, (Float) obj); ps.setFloat(index++, (Float) obj);
// log.info("类型为Float,值{}",obj); log.info("类型为Float,值{}",obj);
// } else if (obj instanceof Long) { } else if (obj instanceof Long) {
// ps.setLong(index++, (Long) obj); ps.setLong(index++, (Long) obj);
// log.info("类型为Long,值{}",obj); log.info("类型为Long,值{}",obj);
// } else { } else {
// ps.setObject(index++, obj); ps.setObject(index++, obj);
// log.info("类型为OOO,值{}",obj); log.info("类型为OOO,值{}",obj);
// } }
// } }
// ps.addBatch(); ps.addBatch();
//// }
//
// // 执行批量插入操作
// int[] ints = ps.executeBatch();
//
// for (int anInt : ints) {
// log.info("插入成功的数据有"+anInt);
// } // }
//
// conn.commit(); // 执行批量插入操作
// } catch (SQLException e) { int[] ints = ps.executeBatch();
// e.printStackTrace();
// } for (int anInt : ints) {
// } catch (SQLException e) { log.info("插入成功的数据有"+anInt);
// throw new RuntimeException(e); }
// }
// conn.commit();
// return addCount; } catch (SQLException e) {
// } e.printStackTrace();
// }
// } catch (SQLException e) {
// private void closeConnection(Connection conn) { throw new RuntimeException(e);
// if (conn != null) { }
// try {
// conn.close(); return addCount;
// } catch (SQLException e) { }
// e.printStackTrace();
// }
// } private void closeConnection(Connection conn) {
// } if (conn != null) {
try {
conn.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
}
// 第二种 // 第二种
// @Override // @Override
@ -344,86 +341,86 @@ public class ProductServiceImpl implements ProductService {
//第三种 //第三种
//
public int addProduct(Long basicId, Long tableId, DataValue[][] listList) { // public int addProduct(Long basicId, Long tableId, DataValue[][] listList) {
//
log.info("数据库主键ID"+basicId); // log.info("数据库主键ID"+basicId);
log.info("表的主键ID"+tableId); // log.info("表的主键ID"+tableId);
Source dataSources = sourceService.getById(basicId); // Source dataSources = sourceService.getById(basicId);
TableInfo tableInfo = tableInfoService.getById(tableId); // TableInfo tableInfo = tableInfoService.getById(tableId);
String tableName = tableInfo.getTableName(); // String tableName = tableInfo.getTableName();
//
HikariConfig hikariConfig = new HikariConfig(); // HikariConfig hikariConfig = new HikariConfig();
hikariConfig.setPoolName("HikariCP 连接池"); // hikariConfig.setPoolName("HikariCP 连接池");
hikariConfig.setDriverClassName(dataSources.getDriverName()); // hikariConfig.setDriverClassName(dataSources.getDriverName());
hikariConfig.setJdbcUrl(dataSources.getUrl()); // hikariConfig.setJdbcUrl(dataSources.getUrl());
hikariConfig.setUsername(dataSources.getUsername()); // hikariConfig.setUsername(dataSources.getUsername());
hikariConfig.setPassword(dataSources.getPassword()); // hikariConfig.setPassword(dataSources.getPassword());
hikariConfig.setMinimumIdle(2); // hikariConfig.setMinimumIdle(2);
hikariConfig.setMaximumPoolSize(10); // hikariConfig.setMaximumPoolSize(10);
//
HikariDataSource hikariDataSource = new HikariDataSource(hikariConfig); // HikariDataSource hikariDataSource = new HikariDataSource(hikariConfig);
//
log.info("数据源ID的basicId值: {}", basicId); // log.info("数据源ID的basicId值: {}", basicId);
log.info("表的主键值: {}", tableId); // log.info("表的主键值: {}", tableId);
for (DataValue[] dataValues : listList) { // for (DataValue[] dataValues : listList) {
for (DataValue dataValue : dataValues) { // for (DataValue dataValue : dataValues) {
log.info("里面的所有的值: {}", dataValue); // log.info("里面的所有的值: {}", dataValue);
} // }
} // }
//
ExecutorService executorService = Executors.newFixedThreadPool(8); // ExecutorService executorService = Executors.newFixedThreadPool(8);
AtomicInteger addCount = new AtomicInteger(); // AtomicInteger addCount = new AtomicInteger();
//
// 分割数据为3000个批次 // // 分割数据为3000个批次
List<DataValue[][]> batches = splitData(listList, 20); // List<DataValue[][]> batches = splitData(listList, 20);
//
try (Connection conn = hikariDataSource.getConnection()) { // try (Connection conn = hikariDataSource.getConnection()) {
conn.setAutoCommit(false); // 开启事务 // conn.setAutoCommit(false); // 开启事务
//
for (DataValue[][] batch : batches) { // for (DataValue[][] batch : batches) {
log.info("数量batch: {}", batch.length); // log.info("数量batch: {}", batch.length);
//
executorService.submit(() -> { // executorService.submit(() -> {
try (PreparedStatement stmt = conn.prepareStatement(buildBatchInsertSQL(tableName, batch))) { // try (PreparedStatement stmt = conn.prepareStatement(buildBatchInsertSQL(tableName, batch))) {
int index = 1; // int index = 1;
for (DataValue[] dataValues : batch) { // for (DataValue[] dataValues : batch) {
for (DataValue dataValue : dataValues) { // for (DataValue dataValue : dataValues) {
stmt.setObject(index++, dataValue.getValue()); // stmt.setObject(index++, dataValue.getValue());
} // }
stmt.addBatch(); // stmt.addBatch();
} // }
stmt.executeBatch(); // stmt.executeBatch();
addCount.addAndGet(batch.length); // addCount.addAndGet(batch.length);
} catch (SQLException e) { // } catch (SQLException e) {
log.error("SQLException异常发生", e); // log.error("SQLException异常发生", e);
try { // try {
conn.rollback(); // 回滚事务 // conn.rollback(); // 回滚事务
} catch (SQLException ex) { // } catch (SQLException ex) {
log.error("回滚事务失败", ex); // log.error("回滚事务失败", ex);
throw new RuntimeException(ex); // throw new RuntimeException(ex);
} // }
throw new RuntimeException(e); // throw new RuntimeException(e);
} // }
}); // });
} // }
//
executorService.shutdown(); // executorService.shutdown();
executorService.awaitTermination(1, TimeUnit.HOURS); // executorService.awaitTermination(1, TimeUnit.HOURS);
conn.commit(); // 提交事务 // conn.commit(); // 提交事务
} catch (InterruptedException e) { // } catch (InterruptedException e) {
Thread.currentThread().interrupt(); // Thread.currentThread().interrupt();
throw new RuntimeException(e); // throw new RuntimeException(e);
} catch (SQLException e) { // } catch (SQLException e) {
throw new RuntimeException(e); // throw new RuntimeException(e);
} finally { // } finally {
close(hikariDataSource); // 关闭数据源 // close(hikariDataSource); // 关闭数据源
} // }
//
return addCount.get(); // return addCount.get();
} // }
//
private String buildBatchInsertSQL(String tableName, DataValue[][] batch) { // private String buildBatchInsertSQL(String tableName, DataValue[][] batch) {
// StringBuilder columns = new StringBuilder("("); // StringBuilder columns = new StringBuilder("(");
// StringBuilder values = new StringBuilder("VALUES "); // StringBuilder values = new StringBuilder("VALUES ");
// //
@ -435,98 +432,67 @@ public class ProductServiceImpl implements ProductService {
// //
// // 删除最后一个逗号和空格 // // 删除最后一个逗号和空格
// columns.delete(columns.length() - 2, columns.length()); // columns.delete(columns.length() - 2, columns.length());
//
// // 构建值部分 // // 构建值部分
// for (DataValue[] dataValueList : batch) { // for (DataValue[] dataValueList : batch) {
// values.append("("); // values.append("(");
// for (DataValue dataValue : dataValueList) { // for (int i = 0; i < dataValueList.length; i++) {
// Object value = dataValue.getValue(); // if (i > 0) {
// values.append("?, "); // 使用占位符 // values.append(", ");
// }
// values.append("?"); // 使用占位符
// } // }
// values.delete(values.length() - 2, values.length());
// values.append("), "); // values.append("), ");
// } // }
// //
// values.delete(values.length() - 2, values.length()); // values.delete(values.length() - 2, values.length());
// // 完成 SQL 插入语句 // // 完成 SQL 插入语句
// String sql = "INSERT INTO " + tableName + " " + columns.toString() + " )" + values.toString() + ");"; // String sql = "INSERT INTO " + tableName + " " + columns.toString() + ") " + values.toString() + ";";
//
// log.info("执行的sql添加的sql语句: {}", sql); // log.info("执行的sql添加的sql语句: {}", sql);
// return sql; // return sql;
// }
//
// private String formatValue(DataType type, Object value) {
StringBuilder columns = new StringBuilder("("); // if (type == DataType.VARCHAR || type == DataType.TEXT) {
StringBuilder values = new StringBuilder("VALUES "); // return "'" + value.toString().replace("'", "''") + "'";
// } else if (type == DataType.BIGINT) {
// 构建字段名 // return value.toString();
for (DataValue dataValue : batch[0]) { // } else if (type == DataType.INT) {
String key = dataValue.getKey(); // return value.toString();
columns.append(key).append(", "); // } else if (type == DataType.DECIMAL) {
} // return value.toString();
// } else if (type == DataType.DATETIME) {
// 删除最后一个逗号和空格 // return "'" + new java.sql.Date(((Date) value).getTime()) + "'";
columns.delete(columns.length() - 2, columns.length()); // } else if (type == DataType.DOUBLE) {
// return value.toString();
// 构建值部分 // } else {
for (DataValue[] dataValueList : batch) { // // 其他类型的处理
values.append("("); // return "'" + value.toString() + "'";
for (int i = 0; i < dataValueList.length; i++) { // }
if (i > 0) { // }
values.append(", "); //
} // private List<DataValue[][]> splitData(DataValue[][] listList, int batchSize) {
values.append("?"); // 使用占位符 // List<DataValue[][]> batches = new ArrayList<>();
} // int totalSize = listList.length;
values.append("), "); // int numBatches = (int) Math.ceil((double) totalSize / batchSize);
} //
// for (int i = 0; i < numBatches; i++) {
values.delete(values.length() - 2, values.length()); // int start = i * batchSize;
// 完成 SQL 插入语句 // int end = Math.min(start + batchSize, totalSize);
String sql = "INSERT INTO " + tableName + " " + columns.toString() + ") " + values.toString() + ""; // DataValue[][] batch = new DataValue[end - start][];
log.info("执行的sql添加的sql语句: {}", sql); // System.arraycopy(listList, start, batch, 0, end - start);
return sql; // batches.add(batch);
} // }
//
private String formatValue(DataType type, Object value) { // log.info("");
if (type == DataType.VARCHAR || type == DataType.TEXT) { // return batches;
return "'" + value.toString().replace("'", "''") + "'"; // }
} else if (type == DataType.BIGINT) { //
return value.toString(); // private static void close(HikariDataSource dataSource) {
} else if (type == DataType.INT) { // if (dataSource != null) {
return value.toString(); // dataSource.close();
} else if (type == DataType.DECIMAL) { // }
return value.toString(); // }
} else if (type == DataType.DATETIME) {
return "'" + new java.sql.Date(((Date) value).getTime()) + "'";
} else if (type == DataType.DOUBLE) {
return value.toString();
} else {
// 其他类型的处理
return "'" + value.toString() + "'";
}
}
private List<DataValue[][]> splitData(DataValue[][] listList, int batchSize) {
List<DataValue[][]> batches = new ArrayList<>();
int totalSize = listList.length;
int numBatches = (int) Math.ceil((double) totalSize / batchSize);
for (int i = 0; i < numBatches; i++) {
int start = i * batchSize;
int end = Math.min(start + batchSize, totalSize);
DataValue[][] batch = new DataValue[end - start][];
System.arraycopy(listList, start, batch, 0, end - start);
batches.add(batch);
}
log.info("");
return batches;
}
private static void close(HikariDataSource dataSource) {
if (dataSource != null) {
dataSource.close();
}
}