From b671ffbd69b62fec2c52aad2332522f12185d978 Mon Sep 17 00:00:00 2001 From: lwj <3529558005@qq.com> Date: Sun, 8 Sep 2024 23:07:01 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E6=B7=BB=E5=8A=A0,=E7=AC=AC?= =?UTF-8?q?=E4=B8=80=E7=A7=8D=E6=96=B9=E6=B3=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../etl/service/impl/ProductServiceImpl.java | 564 ++++++++---------- 1 file changed, 265 insertions(+), 299 deletions(-) diff --git a/cloud-etl-server/src/main/java/com/muyu/cloud/etl/service/impl/ProductServiceImpl.java b/cloud-etl-server/src/main/java/com/muyu/cloud/etl/service/impl/ProductServiceImpl.java index 1c3ed81..9580829 100644 --- a/cloud-etl-server/src/main/java/com/muyu/cloud/etl/service/impl/ProductServiceImpl.java +++ b/cloud-etl-server/src/main/java/com/muyu/cloud/etl/service/impl/ProductServiceImpl.java @@ -1,13 +1,12 @@ package com.muyu.cloud.etl.service.impl; +import com.muyu.Hikari.HikariPool; import com.muyu.cloud.etl.service.ProductService; import com.muyu.cloud.etl.service.SourceService; import com.muyu.cloud.etl.service.TableInfoService; import com.muyu.domain.DataValue; import com.muyu.domain.Source; import com.muyu.domain.TableInfo; -import com.muyu.domain.enums.DataType; -import com.zaxxer.hikari.HikariConfig; import com.zaxxer.hikari.HikariDataSource; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; @@ -16,9 +15,7 @@ import org.springframework.stereotype.Service; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.SQLException; -import java.util.ArrayList; import java.util.Date; -import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -33,141 +30,141 @@ public class ProductServiceImpl implements ProductService { private TableInfoService tableInfoService; -// public int addProduct(Long basicId, Long tableId, DataValue[][] listList) { -//// TableInfo tableInfoDataSources = tableInfoService.getById(basicId); -//// Long basicId1 = tableInfoDataSources.getBasicId(); -// Source dataSources = sourceService.getById(basicId); -// TableInfo tableInfo = tableInfoService.getById(tableId); -// String tableName = tableInfo.getTableName(); -// -//// HikariDataSource hikariDataSource = getHikariDataSource(dataSources); -// -// HikariDataSource hikariDataSource = HikariPool.getHikariDataSource(dataSources); -// -// ExecutorService executorService = Executors.newFixedThreadPool(4); -// -// AtomicInteger addCount = new AtomicInteger(); -// -// Connection connection = null; -// -// try { -// connection = hikariDataSource.getConnection(); -// // 遍历外部列表 -// for (DataValue[] dataValueList : listList) { -// Connection finalConnection = connection; -// executorService.submit(() -> { -// try { -// addCount.addAndGet(insertRow(finalConnection, tableName, dataValueList)); -// } catch (SQLException e) { -// // 记录异常 -// e.printStackTrace(); -// } -// }); -// } -// -// executorService.shutdown(); -// executorService.awaitTermination(1, TimeUnit.HOURS); -// } catch (InterruptedException e) { -// Thread.currentThread().interrupt(); -// throw new RuntimeException("Thread interrupted", e); -// } catch (SQLException e) { -// throw new RuntimeException(e); -// } finally { -// closeConnection(connection); -// } -// -// return addCount.get(); -// } -// -// -// public int insertRow(Connection conn, String tableName, DataValue[] dataValueList) throws SQLException { -// -// // 获取当前行的所有字段名 -// StringBuilder columns = new StringBuilder("("); -// StringBuilder values = new StringBuilder("VALUES ("); -// -// // 构建字段名和占位符 -// for (DataValue dataValue : dataValueList) { -// String key = dataValue.getKey(); -// columns.append(key).append(", "); -// values.append("?, "); -// } -// // 删除最后一个逗号和空格 -// columns.delete(columns.length() - 2, columns.length()); -// values.delete(values.length() - 2, values.length()); -// -// // 完成 SQL 插入语句 -// String sql = "INSERT INTO " + tableName + " " + columns.toString() + ") " + values.toString() + ")"; -// log.info("添加语句{}",sql); -// -// int addCount = 0; -// -// try { -// conn.setAutoCommit(false); -// try ( -// PreparedStatement ps = conn.prepareStatement(sql)) { -// // 循环设置参数并执行插入 -//// for (DataValue dataValue : dataValueList) { -// int index = 1; -// for (DataValue value : dataValueList) { -// Object obj = value.getValue(); -// if (obj instanceof String) { -// ps.setString(index++, (String) obj); -// log.info("类型为String,值{}",obj); -// } else if (obj instanceof Integer) { -// ps.setInt(index++, (Integer) obj); -// log.info("类型为Integer,值{}",obj); -// } else if (obj instanceof Double) { -// ps.setDouble(index++, (Double) obj); -// log.info("类型为Double,值{}",obj); -// } else if (obj instanceof Date) { -// ps.setDate(index++, new java.sql.Date(((Date) obj).getTime())); -// log.info("类型为Date,值{}",obj); -// } else if (obj instanceof Boolean) { -// ps.setBoolean(index++, (Boolean) obj); -// log.info("类型为Boolean,值{}",obj); -// } else if (obj instanceof Float) { -// ps.setFloat(index++, (Float) obj); -// log.info("类型为Float,值{}",obj); -// } else if (obj instanceof Long) { -// ps.setLong(index++, (Long) obj); -// log.info("类型为Long,值{}",obj); -// } else { -// ps.setObject(index++, obj); -// log.info("类型为OOO,值{}",obj); -// } -// } -// ps.addBatch(); -//// } -// -// // 执行批量插入操作 -// int[] ints = ps.executeBatch(); -// -// for (int anInt : ints) { -// log.info("插入成功的数据有"+anInt); + public int addProduct(Long basicId, Long tableId, DataValue[][] listList) { +// TableInfo tableInfoDataSources = tableInfoService.getById(basicId); +// Long basicId1 = tableInfoDataSources.getBasicId(); + Source dataSources = sourceService.getById(basicId); + TableInfo tableInfo = tableInfoService.getById(tableId); + String tableName = tableInfo.getTableName(); + +// HikariDataSource hikariDataSource = getHikariDataSource(dataSources); + + HikariDataSource hikariDataSource = HikariPool.getHikariDataSource(dataSources); + + ExecutorService executorService = Executors.newFixedThreadPool(4); + + AtomicInteger addCount = new AtomicInteger(); + + Connection connection = null; + + try { + connection = hikariDataSource.getConnection(); + // 遍历外部列表 + for (DataValue[] dataValueList : listList) { + Connection finalConnection = connection; + executorService.submit(() -> { + try { + addCount.addAndGet(insertRow(finalConnection, tableName, dataValueList)); + } catch (SQLException e) { + // 记录异常 + e.printStackTrace(); + } + }); + } + + executorService.shutdown(); + executorService.awaitTermination(1, TimeUnit.HOURS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Thread interrupted", e); + } catch (SQLException e) { + throw new RuntimeException(e); + } finally { + closeConnection(connection); + } + + return addCount.get(); + } + + + public int insertRow(Connection conn, String tableName, DataValue[] dataValueList) throws SQLException { + + // 获取当前行的所有字段名 + StringBuilder columns = new StringBuilder("("); + StringBuilder values = new StringBuilder("VALUES ("); + + // 构建字段名和占位符 + for (DataValue dataValue : dataValueList) { + String key = dataValue.getKey(); + columns.append(key).append(", "); + values.append("?, "); + } + // 删除最后一个逗号和空格 + columns.delete(columns.length() - 2, columns.length()); + values.delete(values.length() - 2, values.length()); + + // 完成 SQL 插入语句 + String sql = "INSERT INTO " + tableName + " " + columns.toString() + ") " + values.toString() + ")"; + log.info("添加语句{}",sql); + + int addCount = 0; + + try { + conn.setAutoCommit(false); + try ( + PreparedStatement ps = conn.prepareStatement(sql)) { + // 循环设置参数并执行插入 +// for (DataValue dataValue : dataValueList) { + int index = 1; + for (DataValue value : dataValueList) { + Object obj = value.getValue(); + if (obj instanceof String) { + ps.setString(index++, (String) obj); + log.info("类型为String,值{}",obj); + } else if (obj instanceof Integer) { + ps.setInt(index++, (Integer) obj); + log.info("类型为Integer,值{}",obj); + } else if (obj instanceof Double) { + ps.setDouble(index++, (Double) obj); + log.info("类型为Double,值{}",obj); + } else if (obj instanceof Date) { + ps.setDate(index++, new java.sql.Date(((Date) obj).getTime())); + log.info("类型为Date,值{}",obj); + } else if (obj instanceof Boolean) { + ps.setBoolean(index++, (Boolean) obj); + log.info("类型为Boolean,值{}",obj); + } else if (obj instanceof Float) { + ps.setFloat(index++, (Float) obj); + log.info("类型为Float,值{}",obj); + } else if (obj instanceof Long) { + ps.setLong(index++, (Long) obj); + log.info("类型为Long,值{}",obj); + } else { + ps.setObject(index++, obj); + log.info("类型为OOO,值{}",obj); + } + } + ps.addBatch(); // } -// -// conn.commit(); -// } catch (SQLException e) { -// e.printStackTrace(); -// } -// } catch (SQLException e) { -// throw new RuntimeException(e); -// } -// -// return addCount; -// } -// -// -// private void closeConnection(Connection conn) { -// if (conn != null) { -// try { -// conn.close(); -// } catch (SQLException e) { -// e.printStackTrace(); -// } -// } -// } + + // 执行批量插入操作 + int[] ints = ps.executeBatch(); + + for (int anInt : ints) { + log.info("插入成功的数据有"+anInt); + } + + conn.commit(); + } catch (SQLException e) { + e.printStackTrace(); + } + } catch (SQLException e) { + throw new RuntimeException(e); + } + + return addCount; + } + + + private void closeConnection(Connection conn) { + if (conn != null) { + try { + conn.close(); + } catch (SQLException e) { + e.printStackTrace(); + } + } + } // 第二种 // @Override @@ -344,86 +341,86 @@ public class ProductServiceImpl implements ProductService { //第三种 - - public int addProduct(Long basicId, Long tableId, DataValue[][] listList) { - - log.info("数据库主键ID"+basicId); - log.info("表的主键ID"+tableId); - Source dataSources = sourceService.getById(basicId); - TableInfo tableInfo = tableInfoService.getById(tableId); - String tableName = tableInfo.getTableName(); - - HikariConfig hikariConfig = new HikariConfig(); - hikariConfig.setPoolName("HikariCP 连接池"); - hikariConfig.setDriverClassName(dataSources.getDriverName()); - hikariConfig.setJdbcUrl(dataSources.getUrl()); - hikariConfig.setUsername(dataSources.getUsername()); - hikariConfig.setPassword(dataSources.getPassword()); - hikariConfig.setMinimumIdle(2); - hikariConfig.setMaximumPoolSize(10); - - HikariDataSource hikariDataSource = new HikariDataSource(hikariConfig); - - log.info("数据源ID的basicId值: {}", basicId); - log.info("表的主键值: {}", tableId); - for (DataValue[] dataValues : listList) { - for (DataValue dataValue : dataValues) { - log.info("里面的所有的值: {}", dataValue); - } - } - - ExecutorService executorService = Executors.newFixedThreadPool(8); - AtomicInteger addCount = new AtomicInteger(); - - // 分割数据为3000个批次 - List batches = splitData(listList, 20); - - try (Connection conn = hikariDataSource.getConnection()) { - conn.setAutoCommit(false); // 开启事务 - - for (DataValue[][] batch : batches) { - log.info("数量batch: {}", batch.length); - - executorService.submit(() -> { - try (PreparedStatement stmt = conn.prepareStatement(buildBatchInsertSQL(tableName, batch))) { - int index = 1; - for (DataValue[] dataValues : batch) { - for (DataValue dataValue : dataValues) { - stmt.setObject(index++, dataValue.getValue()); - } - stmt.addBatch(); - } - stmt.executeBatch(); - addCount.addAndGet(batch.length); - } catch (SQLException e) { - log.error("SQLException异常发生", e); - try { - conn.rollback(); // 回滚事务 - } catch (SQLException ex) { - log.error("回滚事务失败", ex); - throw new RuntimeException(ex); - } - throw new RuntimeException(e); - } - }); - } - - executorService.shutdown(); - executorService.awaitTermination(1, TimeUnit.HOURS); - conn.commit(); // 提交事务 - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException(e); - } catch (SQLException e) { - throw new RuntimeException(e); - } finally { - close(hikariDataSource); // 关闭数据源 - } - - return addCount.get(); - } - - private String buildBatchInsertSQL(String tableName, DataValue[][] batch) { +// +// public int addProduct(Long basicId, Long tableId, DataValue[][] listList) { +// +// log.info("数据库主键ID"+basicId); +// log.info("表的主键ID"+tableId); +// Source dataSources = sourceService.getById(basicId); +// TableInfo tableInfo = tableInfoService.getById(tableId); +// String tableName = tableInfo.getTableName(); +// +// HikariConfig hikariConfig = new HikariConfig(); +// hikariConfig.setPoolName("HikariCP 连接池"); +// hikariConfig.setDriverClassName(dataSources.getDriverName()); +// hikariConfig.setJdbcUrl(dataSources.getUrl()); +// hikariConfig.setUsername(dataSources.getUsername()); +// hikariConfig.setPassword(dataSources.getPassword()); +// hikariConfig.setMinimumIdle(2); +// hikariConfig.setMaximumPoolSize(10); +// +// HikariDataSource hikariDataSource = new HikariDataSource(hikariConfig); +// +// log.info("数据源ID的basicId值: {}", basicId); +// log.info("表的主键值: {}", tableId); +// for (DataValue[] dataValues : listList) { +// for (DataValue dataValue : dataValues) { +// log.info("里面的所有的值: {}", dataValue); +// } +// } +// +// ExecutorService executorService = Executors.newFixedThreadPool(8); +// AtomicInteger addCount = new AtomicInteger(); +// +// // 分割数据为3000个批次 +// List batches = splitData(listList, 20); +// +// try (Connection conn = hikariDataSource.getConnection()) { +// conn.setAutoCommit(false); // 开启事务 +// +// for (DataValue[][] batch : batches) { +// log.info("数量batch: {}", batch.length); +// +// executorService.submit(() -> { +// try (PreparedStatement stmt = conn.prepareStatement(buildBatchInsertSQL(tableName, batch))) { +// int index = 1; +// for (DataValue[] dataValues : batch) { +// for (DataValue dataValue : dataValues) { +// stmt.setObject(index++, dataValue.getValue()); +// } +// stmt.addBatch(); +// } +// stmt.executeBatch(); +// addCount.addAndGet(batch.length); +// } catch (SQLException e) { +// log.error("SQLException异常发生", e); +// try { +// conn.rollback(); // 回滚事务 +// } catch (SQLException ex) { +// log.error("回滚事务失败", ex); +// throw new RuntimeException(ex); +// } +// throw new RuntimeException(e); +// } +// }); +// } +// +// executorService.shutdown(); +// executorService.awaitTermination(1, TimeUnit.HOURS); +// conn.commit(); // 提交事务 +// } catch (InterruptedException e) { +// Thread.currentThread().interrupt(); +// throw new RuntimeException(e); +// } catch (SQLException e) { +// throw new RuntimeException(e); +// } finally { +// close(hikariDataSource); // 关闭数据源 +// } +// +// return addCount.get(); +// } +// +// private String buildBatchInsertSQL(String tableName, DataValue[][] batch) { // StringBuilder columns = new StringBuilder("("); // StringBuilder values = new StringBuilder("VALUES "); // @@ -435,98 +432,67 @@ public class ProductServiceImpl implements ProductService { // // // 删除最后一个逗号和空格 // columns.delete(columns.length() - 2, columns.length()); -// // // 构建值部分 // for (DataValue[] dataValueList : batch) { // values.append("("); -// for (DataValue dataValue : dataValueList) { -// Object value = dataValue.getValue(); -// values.append("?, "); // 使用占位符 +// for (int i = 0; i < dataValueList.length; i++) { +// if (i > 0) { +// values.append(", "); +// } +// values.append("?"); // 使用占位符 // } -// values.delete(values.length() - 2, values.length()); // values.append("), "); // } // // values.delete(values.length() - 2, values.length()); // // 完成 SQL 插入语句 -// String sql = "INSERT INTO " + tableName + " " + columns.toString() + " )" + values.toString() + ");"; +// String sql = "INSERT INTO " + tableName + " " + columns.toString() + ") " + values.toString() + ";"; +// // log.info("执行的sql添加的sql语句: {}", sql); // return sql; - - - - StringBuilder columns = new StringBuilder("("); - StringBuilder values = new StringBuilder("VALUES "); - - // 构建字段名 - for (DataValue dataValue : batch[0]) { - String key = dataValue.getKey(); - columns.append(key).append(", "); - } - - // 删除最后一个逗号和空格 - columns.delete(columns.length() - 2, columns.length()); - - // 构建值部分 - for (DataValue[] dataValueList : batch) { - values.append("("); - for (int i = 0; i < dataValueList.length; i++) { - if (i > 0) { - values.append(", "); - } - values.append("?"); // 使用占位符 - } - values.append("), "); - } - - values.delete(values.length() - 2, values.length()); - // 完成 SQL 插入语句 - String sql = "INSERT INTO " + tableName + " " + columns.toString() + ") " + values.toString() + ""; - log.info("执行的sql添加的sql语句: {}", sql); - return sql; - } - - private String formatValue(DataType type, Object value) { - if (type == DataType.VARCHAR || type == DataType.TEXT) { - return "'" + value.toString().replace("'", "''") + "'"; - } else if (type == DataType.BIGINT) { - return value.toString(); - } else if (type == DataType.INT) { - return value.toString(); - } else if (type == DataType.DECIMAL) { - return value.toString(); - } else if (type == DataType.DATETIME) { - return "'" + new java.sql.Date(((Date) value).getTime()) + "'"; - } else if (type == DataType.DOUBLE) { - return value.toString(); - } else { - // 其他类型的处理 - return "'" + value.toString() + "'"; - } - } - - private List splitData(DataValue[][] listList, int batchSize) { - List batches = new ArrayList<>(); - int totalSize = listList.length; - int numBatches = (int) Math.ceil((double) totalSize / batchSize); - - for (int i = 0; i < numBatches; i++) { - int start = i * batchSize; - int end = Math.min(start + batchSize, totalSize); - DataValue[][] batch = new DataValue[end - start][]; - System.arraycopy(listList, start, batch, 0, end - start); - batches.add(batch); - } - - log.info(""); - return batches; - } - - private static void close(HikariDataSource dataSource) { - if (dataSource != null) { - dataSource.close(); - } - } +// } +// +// private String formatValue(DataType type, Object value) { +// if (type == DataType.VARCHAR || type == DataType.TEXT) { +// return "'" + value.toString().replace("'", "''") + "'"; +// } else if (type == DataType.BIGINT) { +// return value.toString(); +// } else if (type == DataType.INT) { +// return value.toString(); +// } else if (type == DataType.DECIMAL) { +// return value.toString(); +// } else if (type == DataType.DATETIME) { +// return "'" + new java.sql.Date(((Date) value).getTime()) + "'"; +// } else if (type == DataType.DOUBLE) { +// return value.toString(); +// } else { +// // 其他类型的处理 +// return "'" + value.toString() + "'"; +// } +// } +// +// private List splitData(DataValue[][] listList, int batchSize) { +// List batches = new ArrayList<>(); +// int totalSize = listList.length; +// int numBatches = (int) Math.ceil((double) totalSize / batchSize); +// +// for (int i = 0; i < numBatches; i++) { +// int start = i * batchSize; +// int end = Math.min(start + batchSize, totalSize); +// DataValue[][] batch = new DataValue[end - start][]; +// System.arraycopy(listList, start, batch, 0, end - start); +// batches.add(batch); +// } +// +// log.info(""); +// return batches; +// } +// +// private static void close(HikariDataSource dataSource) { +// if (dataSource != null) { +// dataSource.close(); +// } +// }