diff --git a/cloud-etl-server/src/main/java/com/muyu/cloud/etl/service/impl/ProductServiceImpl.java b/cloud-etl-server/src/main/java/com/muyu/cloud/etl/service/impl/ProductServiceImpl.java index f09496e..de59beb 100644 --- a/cloud-etl-server/src/main/java/com/muyu/cloud/etl/service/impl/ProductServiceImpl.java +++ b/cloud-etl-server/src/main/java/com/muyu/cloud/etl/service/impl/ProductServiceImpl.java @@ -14,8 +14,8 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.sql.Connection; +import java.sql.PreparedStatement; import java.sql.SQLException; -import java.sql.Statement; import java.util.ArrayList; import java.util.Date; import java.util.List; @@ -169,177 +169,331 @@ public class ProductServiceImpl implements ProductService { // } // } - - @Override - public int addProduct(Long basicId, Long tableId, DataValue[][] listList) { - - Source dataSources = sourceService.getById(basicId); - TableInfo tableInfo = tableInfoService.getById(tableId); - String tableName = tableInfo.getTableName(); - - HikariConfig hikariConfig = new HikariConfig(); - hikariConfig.setPoolName("HikariCP 连接池"); - hikariConfig.setDriverClassName(dataSources.getDriverName()); - hikariConfig.setJdbcUrl(dataSources.getUrl()); - hikariConfig.setUsername(dataSources.getUsername()); - hikariConfig.setPassword(dataSources.getPassword()); - hikariConfig.setMinimumIdle(2); - hikariConfig.setMaximumPoolSize(10); - - HikariDataSource hikariDataSource = new HikariDataSource(hikariConfig); + // 第二种 +// @Override +// public int addProduct(Long basicId, Long tableId, DataValue[][] listList) { +// +// Source dataSources = sourceService.getById(basicId); +// TableInfo tableInfo = tableInfoService.getById(tableId); +// String tableName = tableInfo.getTableName(); +// +// HikariConfig hikariConfig = new HikariConfig(); +// hikariConfig.setPoolName("HikariCP 连接池"); +// hikariConfig.setDriverClassName(dataSources.getDriverName()); +// hikariConfig.setJdbcUrl(dataSources.getUrl()); +// hikariConfig.setUsername(dataSources.getUsername()); +// hikariConfig.setPassword(dataSources.getPassword()); +// hikariConfig.setMinimumIdle(2); +// hikariConfig.setMaximumPoolSize(10); +// +// HikariDataSource hikariDataSource = new HikariDataSource(hikariConfig); +// +// +// log.info("数据源ID的basicId{}值"+basicId); +// log.info("表的主键{}值"+tableId); +// for (DataValue[] dataValues : listList) { +// for (DataValue dataValue : dataValues) { +// log.info("里面的所有的值"+dataValue); +// } +// } +// +//// HikariDataSource hikariDataSource = HikariPool.getHikariDataSource(dataSources); +// +// ExecutorService executorService = Executors.newFixedThreadPool(8); +// +// AtomicInteger addCount = new AtomicInteger(); +// +// // 分割数据为3000个批次 +// List batches = splitData(listList, 20); +// +// try (Connection conn = hikariDataSource.getConnection()) { +// conn.setAutoCommit(false); // 开启事务 +// +// for (DataValue[][] batch : batches) { +// log.info("数量batch{}", batch.length); +// +// executorService.submit(() -> { +// try (Statement stmt = conn.createStatement()) { +// +// String sql = buildBatchInsertSQL(tableName, batch); +// +// stmt.executeUpdate(sql); +// +// addCount.addAndGet(batch.length); +// } catch (SQLException e) { +// log.error("SQLException异常发生", e); +// try { +// conn.rollback(); // 回滚事务 +// } catch (SQLException ex) { +// log.error("回滚事务失败", ex); +// throw new RuntimeException(ex); +// } +// throw new RuntimeException(e); +// } +// }); +// } +// +// executorService.shutdown(); +// executorService.awaitTermination(1, TimeUnit.HOURS); +// conn.commit(); // 提交事务 +// } catch (InterruptedException e) { +// Thread.currentThread().interrupt(); +// throw new RuntimeException(e); +// } catch (SQLException e) { +// throw new RuntimeException(e); +// } finally { +// close(hikariDataSource); // 关闭数据源 +// } +// +// return addCount.get(); +// } +// +// private String buildBatchInsertSQL(String tableName, DataValue[][] batch) { +// +// +// for (DataValue[] dataValues : batch) { +// for (DataValue dataValue : dataValues) { +// log.info("我的键{},值{}", dataValues, dataValue); +// } +// } +// +// StringBuilder columns = new StringBuilder("("); +// StringBuilder values = new StringBuilder("VALUES "); +// // 构建字段名 +// for (DataValue dataValue : batch[0]) { +// String key = dataValue.getKey(); +// columns.append(key).append(", "); +// } +// +// // 删除最后一个逗号和空格 +// columns.delete(columns.length() - 2, columns.length()); +// log.info("陈思豪是傻逼"+columns); +// int count=0; +// // 构建值部分 +// for (DataValue[] dataValueList : batch) { +// values.append("("); +// for (DataValue dataValue : dataValueList) { +// Object value = dataValue.getValue(); +// values.append(formatValue(dataValue.getType(), value)).append(", "); +// log.info("值{{{{aaaaaa}}}}}}}}}"+count+"+个数+"+value); +// } +// values.delete(values.length() - 2, values.length()); +// values.append("), "); +// } +// +// +// values.delete(values.length() - 2, values.length()); +// log.info("值++++++++++++++++++++++++++++++++++++++++++++++++++++++++"+values.toString()); +// // 完成 SQL 插入语句 +// String sql = "INSERT INTO " + tableName + " " + columns.toString() + ") " + values.toString(); +// log.info("执行的sql添加的sql语句"+sql); +// return sql; +// } +// +// private String formatValue(DataType type, Object value) { +// +// if (type == DataType.VARCHAR || type == DataType.TEXT) { +// +// return "'" + value.toString().replace("'", "''") + "'"; +// } else if (type == DataType.BIGINT) { +// +// return value.toString(); +// } else if (type == DataType.INT) { +// +// return value.toString(); +// } else if (type == DataType.DECIMAL) { +// +// return value.toString(); +// } else if (type == DataType.DATETIME) { +// +// return "'" + new java.sql.Date(((Date) value).getTime()) + "'"; +// } else if (type == DataType.DOUBLE) { +// +// return value.toString(); +// } else { +// // 其他类型的处理 +// +// return "'" + value.toString() + "'"; +// } +// +// } +// +// private List splitData(DataValue[][] listList, int batchSize) { +// List batches = new ArrayList<>(); +// int totalSize = listList.length; +// int numBatches = (int) Math.ceil((double) totalSize / batchSize); +// +// for (int i = 0; i < numBatches; i++) { +// int start = i * batchSize; +// int end = Math.min(start + batchSize, totalSize); +// DataValue[][] batch = new DataValue[end - start][]; +// System.arraycopy(listList, start, batch, 0, end - start); +// batches.add(batch); +// } +// +// log.info(""); +// return batches; +// } +// +// private static void close(HikariDataSource dataSource) { +// if (dataSource != null) { +// dataSource.close(); +// } +// } - log.info("数据源ID的basicId{}值"+basicId); - log.info("表的主键{}值"+tableId); - for (DataValue[] dataValues : listList) { - for (DataValue dataValue : dataValues) { - log.info("里面的所有的值"+dataValue); + //第三种 + + + public int addProduct(Long basicId, Long tableId, DataValue[][] listList) { + Source dataSources = sourceService.getById(basicId); + TableInfo tableInfo = tableInfoService.getById(tableId); + String tableName = tableInfo.getTableName(); + + HikariConfig hikariConfig = new HikariConfig(); + hikariConfig.setPoolName("HikariCP 连接池"); + hikariConfig.setDriverClassName(dataSources.getDriverName()); + hikariConfig.setJdbcUrl(dataSources.getUrl()); + hikariConfig.setUsername(dataSources.getUsername()); + hikariConfig.setPassword(dataSources.getPassword()); + hikariConfig.setMinimumIdle(2); + hikariConfig.setMaximumPoolSize(10); + + HikariDataSource hikariDataSource = new HikariDataSource(hikariConfig); + + log.info("数据源ID的basicId值: {}", basicId); + log.info("表的主键值: {}", tableId); + for (DataValue[] dataValues : listList) { + for (DataValue dataValue : dataValues) { + log.info("里面的所有的值: {}", dataValue); + } } - } -// HikariDataSource hikariDataSource = HikariPool.getHikariDataSource(dataSources); + ExecutorService executorService = Executors.newFixedThreadPool(8); + AtomicInteger addCount = new AtomicInteger(); - ExecutorService executorService = Executors.newFixedThreadPool(8); + // 分割数据为3000个批次 + List batches = splitData(listList, 20); - AtomicInteger addCount = new AtomicInteger(); + try (Connection conn = hikariDataSource.getConnection()) { + conn.setAutoCommit(false); // 开启事务 - // 分割数据为3000个批次 - List batches = splitData(listList, 20); + for (DataValue[][] batch : batches) { + log.info("数量batch: {}", batch.length); - try (Connection conn = hikariDataSource.getConnection()) { - conn.setAutoCommit(false); // 开启事务 - - for (DataValue[][] batch : batches) { - log.info("数量batch{}", batch.length); - - executorService.submit(() -> { - try (Statement stmt = conn.createStatement()) { - - String sql = buildBatchInsertSQL(tableName, batch); - - stmt.executeUpdate(sql); - - addCount.addAndGet(batch.length); - } catch (SQLException e) { - log.error("SQLException异常发生", e); - try { - conn.rollback(); // 回滚事务 - } catch (SQLException ex) { - log.error("回滚事务失败", ex); - throw new RuntimeException(ex); + executorService.submit(() -> { + try (PreparedStatement stmt = conn.prepareStatement(buildBatchInsertSQL(tableName, batch))) { + int index = 1; + for (DataValue[] dataValues : batch) { + for (DataValue dataValue : dataValues) { + stmt.setObject(index++, dataValue.getValue()); + } + stmt.addBatch(); + } + stmt.executeBatch(); + addCount.addAndGet(batch.length); + } catch (SQLException e) { + log.error("SQLException异常发生", e); + try { + conn.rollback(); // 回滚事务 + } catch (SQLException ex) { + log.error("回滚事务失败", ex); + throw new RuntimeException(ex); + } + throw new RuntimeException(e); } - throw new RuntimeException(e); - } - }); + }); + } + + executorService.shutdown(); + executorService.awaitTermination(1, TimeUnit.HOURS); + conn.commit(); // 提交事务 + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } catch (SQLException e) { + throw new RuntimeException(e); + } finally { + close(hikariDataSource); // 关闭数据源 } - executorService.shutdown(); - executorService.awaitTermination(1, TimeUnit.HOURS); - conn.commit(); // 提交事务 - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException(e); - } catch (SQLException e) { - throw new RuntimeException(e); - } finally { - close(hikariDataSource); // 关闭数据源 + return addCount.get(); } - return addCount.get(); - } + private String buildBatchInsertSQL(String tableName, DataValue[][] batch) { + StringBuilder columns = new StringBuilder("("); + StringBuilder values = new StringBuilder("VALUES "); - private String buildBatchInsertSQL(String tableName, DataValue[][] batch) { - - - for (DataValue[] dataValues : batch) { - for (DataValue dataValue : dataValues) { - log.info("我的键{},值{}", dataValues, dataValue); + // 构建字段名 + for (DataValue dataValue : batch[0]) { + String key = dataValue.getKey(); + columns.append(key).append(", "); } - } - StringBuilder columns = new StringBuilder("("); - StringBuilder values = new StringBuilder("VALUES "); - // 构建字段名 - for (DataValue dataValue : batch[0]) { - String key = dataValue.getKey(); - columns.append(key).append(", "); - } + // 删除最后一个逗号和空格 + columns.delete(columns.length() - 2, columns.length()); - // 删除最后一个逗号和空格 - columns.delete(columns.length() - 2, columns.length()); - log.info("陈思豪是傻逼"+columns); - int count=0; - // 构建值部分 - for (DataValue[] dataValueList : batch) { - values.append("("); - for (DataValue dataValue : dataValueList) { - Object value = dataValue.getValue(); - values.append(formatValue(dataValue.getType(), value)).append(", "); - log.info("值{{{{aaaaaa}}}}}}}}}"+count+"+个数+"+value); + // 构建值部分 + for (DataValue[] dataValueList : batch) { + values.append("("); + for (DataValue dataValue : dataValueList) { + Object value = dataValue.getValue(); + values.append("?, "); // 使用占位符 + } + values.delete(values.length() - 2, values.length()); + values.append("), "); } + values.delete(values.length() - 2, values.length()); - values.append("), "); + // 完成 SQL 插入语句 + String sql = "INSERT INTO " + tableName + " " + columns.toString() + " " + values.toString() + ")"; + log.info("执行的sql添加的sql语句: {}", sql); + return sql; + } + + private String formatValue(DataType type, Object value) { + if (type == DataType.VARCHAR || type == DataType.TEXT) { + return "'" + value.toString().replace("'", "''") + "'"; + } else if (type == DataType.BIGINT) { + return value.toString(); + } else if (type == DataType.INT) { + return value.toString(); + } else if (type == DataType.DECIMAL) { + return value.toString(); + } else if (type == DataType.DATETIME) { + return "'" + new java.sql.Date(((Date) value).getTime()) + "'"; + } else if (type == DataType.DOUBLE) { + return value.toString(); + } else { + // 其他类型的处理 + return "'" + value.toString() + "'"; + } + } + + private List splitData(DataValue[][] listList, int batchSize) { + List batches = new ArrayList<>(); + int totalSize = listList.length; + int numBatches = (int) Math.ceil((double) totalSize / batchSize); + + for (int i = 0; i < numBatches; i++) { + int start = i * batchSize; + int end = Math.min(start + batchSize, totalSize); + DataValue[][] batch = new DataValue[end - start][]; + System.arraycopy(listList, start, batch, 0, end - start); + batches.add(batch); + } + + log.info(""); + return batches; + } + + private static void close(HikariDataSource dataSource) { + if (dataSource != null) { + dataSource.close(); + } } - values.delete(values.length() - 2, values.length()); - log.info("值++++++++++++++++++++++++++++++++++++++++++++++++++++++++"+values.toString()); - // 完成 SQL 插入语句 - String sql = "INSERT INTO " + tableName + " " + columns.toString() + ") " + values.toString(); - log.info("执行的sql添加的sql语句"+sql); - return sql; - } - - private String formatValue(DataType type, Object value) { - - if (type == DataType.VARCHAR || type == DataType.TEXT) { - - return "'" + value.toString().replace("'", "''") + "'"; - } else if (type == DataType.BIGINT) { - - return value.toString(); - } else if (type == DataType.INT) { - - return value.toString(); - } else if (type == DataType.DECIMAL) { - - return value.toString(); - } else if (type == DataType.DATETIME) { - - return "'" + new java.sql.Date(((Date) value).getTime()) + "'"; - } else if (type == DataType.DOUBLE) { - - return value.toString(); - } else { - // 其他类型的处理 - - return "'" + value.toString() + "'"; - } - - } - - private List splitData(DataValue[][] listList, int batchSize) { - List batches = new ArrayList<>(); - int totalSize = listList.length; - int numBatches = (int) Math.ceil((double) totalSize / batchSize); - - for (int i = 0; i < numBatches; i++) { - int start = i * batchSize; - int end = Math.min(start + batchSize, totalSize); - DataValue[][] batch = new DataValue[end - start][]; - System.arraycopy(listList, start, batch, 0, end - start); - batches.add(batch); - } - - log.info(""); - return batches; - } - - private static void close(HikariDataSource dataSource) { - if (dataSource != null) { - dataSource.close(); - } - }