From cda6d246c6f28f82fc37bc196bee015f79f7a074 Mon Sep 17 00:00:00 2001 From: lwj <3529558005@qq.com> Date: Sun, 8 Sep 2024 12:53:32 +0800 Subject: [PATCH] =?UTF-8?q?=E6=8F=90=E4=BA=A4=E6=9B=B4=E6=96=B0=E8=BF=94?= =?UTF-8?q?=E5=9B=9E=E5=80=BC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../etl/service/impl/ProductServiceImpl.java | 448 +++++++----------- 1 file changed, 178 insertions(+), 270 deletions(-) diff --git a/cloud-etl-server/src/main/java/com/muyu/cloud/etl/service/impl/ProductServiceImpl.java b/cloud-etl-server/src/main/java/com/muyu/cloud/etl/service/impl/ProductServiceImpl.java index e791c5f..19d2d7c 100644 --- a/cloud-etl-server/src/main/java/com/muyu/cloud/etl/service/impl/ProductServiceImpl.java +++ b/cloud-etl-server/src/main/java/com/muyu/cloud/etl/service/impl/ProductServiceImpl.java @@ -7,18 +7,15 @@ import com.muyu.cloud.etl.service.TableInfoService; import com.muyu.domain.DataValue; import com.muyu.domain.Source; import com.muyu.domain.TableInfo; -import com.muyu.domain.enums.DataType; import com.zaxxer.hikari.HikariDataSource; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.sql.Connection; +import java.sql.PreparedStatement; import java.sql.SQLException; -import java.sql.Statement; -import java.util.ArrayList; import java.util.Date; -import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -32,55 +29,55 @@ public class ProductServiceImpl implements ProductService { @Autowired private TableInfoService tableInfoService; -// -// public int addProduct(Long basicId, Long tableId, DataValue[][] listList) { -//// TableInfo tableInfoDataSources = tableInfoService.getById(basicId); -//// Long basicId1 = tableInfoDataSources.getBasicId(); -// Source dataSources = sourceService.getById(basicId); -// TableInfo tableInfo = tableInfoService.getById(tableId); -// String tableName = tableInfo.getTableName(); -// -//// HikariDataSource hikariDataSource = getHikariDataSource(dataSources); -// -// HikariDataSource hikariDataSource = HikariPool.getHikariDataSource(dataSources); -// -// ExecutorService executorService = Executors.newFixedThreadPool(4); -// -// AtomicInteger addCount = new AtomicInteger(); -// -// Connection connection = null; -// -// try { -// connection = hikariDataSource.getConnection(); -// // 遍历外部列表 -// for (DataValue[] dataValueList : listList) { -// Connection finalConnection = connection; -// executorService.submit(() -> { -// try { -// addCount.addAndGet(insertRow(finalConnection, tableName, dataValueList)); -// } catch (SQLException e) { -// // 记录异常 -// e.printStackTrace(); -// } -// }); -// } -// -// executorService.shutdown(); -// executorService.awaitTermination(1, TimeUnit.HOURS); -// } catch (InterruptedException e) { -// Thread.currentThread().interrupt(); -// throw new RuntimeException("Thread interrupted", e); -// } catch (SQLException e) { -// throw new RuntimeException(e); -// } finally { -// closeConnection(connection); -// } -// -// return addCount.get(); -// } -// -// public int insertRow(Connection conn, String tableName, DataValue[] dataValueList) throws SQLException { -// + public int addProduct(Long basicId, Long tableId, DataValue[][] listList) { +// TableInfo tableInfoDataSources = tableInfoService.getById(basicId); +// Long basicId1 = tableInfoDataSources.getBasicId(); + Source dataSources = sourceService.getById(basicId); + TableInfo tableInfo = tableInfoService.getById(tableId); + String tableName = tableInfo.getTableName(); + +// HikariDataSource hikariDataSource = getHikariDataSource(dataSources); + + HikariDataSource hikariDataSource = HikariPool.getHikariDataSource(dataSources); + + ExecutorService executorService = Executors.newFixedThreadPool(4); + + AtomicInteger addCount = new AtomicInteger(); + + Connection connection = null; + + try { + connection = hikariDataSource.getConnection(); + // 遍历外部列表 + for (DataValue[] dataValueList : listList) { + Connection finalConnection = connection; + executorService.submit(() -> { + try { + addCount.addAndGet(insertRow(finalConnection, tableName, dataValueList)); + } catch (SQLException e) { + // 记录异常 + e.printStackTrace(); + } + }); + } + + executorService.shutdown(); + executorService.awaitTermination(1, TimeUnit.HOURS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Thread interrupted", e); + } catch (SQLException e) { + throw new RuntimeException(e); + } finally { + closeConnection(connection); + } + + return addCount.get(); + } + + + +// private int insertRow(Connection conn, String tableName, DataValue[] dataValueList) throws SQLException { // // 获取当前行的所有字段名 // StringBuilder columns = new StringBuilder("("); // StringBuilder values = new StringBuilder("VALUES ("); @@ -96,234 +93,145 @@ public class ProductServiceImpl implements ProductService { // values.delete(values.length() - 2, values.length()); // // // 完成 SQL 插入语句 -// String sql = "INSERT INTO " + tableName + " " + columns.toString() + ") " + values.toString() + ")"; -// log.info("添加语句{}",sql); +// String sql = "INSERT INTO " + tableName + " (" + columns.toString() + ") " + values.toString(); // // int addCount = 0; -// -// try { -// conn.setAutoCommit(false); -// -// try ( -// -// PreparedStatement ps = conn.prepareStatement(sql)) { -// -// -// // 循环设置参数并执行插入 -// for (DataValue dataValue : dataValueList) { -// int index = 1; -// for (DataValue value : dataValueList) { -// Object obj = value.getValue(); -// if (obj instanceof String) { -// ps.setString(index++, (String) obj); -// log.info("类型为String,值{}",obj); -// } else if (obj instanceof Integer) { -// ps.setInt(index++, (Integer) obj); -// log.info("类型为Integer,值{}",obj); -// } else if (obj instanceof Double) { -// ps.setDouble(index++, (Double) obj); -// log.info("类型为Double,值{}",obj); -// } else if (obj instanceof Date) { -// ps.setDate(index++, new java.sql.Date(((Date) obj).getTime())); -// log.info("类型为Date,值{}",obj); -// } else if (obj instanceof Boolean) { -// ps.setBoolean(index++, (Boolean) obj); -// log.info("类型为Boolean,值{}",obj); -// } else if (obj instanceof Float) { -// ps.setFloat(index++, (Float) obj); -// log.info("类型为Float,值{}",obj); -// } else if (obj instanceof Long) { -// ps.setLong(index++, (Long) obj); -// log.info("类型为Long,值{}",obj); -// } else { -// ps.setObject(index++, obj); -// log.info("类型为Object,值{}",obj); -// } -// } -// ps.addBatch(); +// try (PreparedStatement ps = conn.prepareStatement(sql)) { +// // 设置参数 +// int index = 1; +// for (DataValue dataValue : dataValueList) { +// Object value = dataValue.getValue(); +// if (value instanceof String) { +// ps.setString(index, (String) value); +// } else if (value instanceof Integer) { +// ps.setInt(index, (Integer) value); +// } else if (value instanceof Double) { +// ps.setDouble(index, (Double) value); +// } else if (value instanceof Date) { +// ps.setDate(index, new java.sql.Date(((Date) value).getTime())); +// } else { +// // 其他类型的处理 +// ps.setObject(index, value); // } -// -// // 执行批量插入操作 -// int[] ints = ps.executeBatch(); -// -// for (int anInt : ints) { -// log.info("插入成功的数据有"+anInt); -// } -// -// conn.commit(); -// } catch (SQLException e) { -// e.printStackTrace(); +// index++; // } -// } catch (SQLException e) { -// throw new RuntimeException(e); +// // 执行插入操作 +// addCount = ps.executeUpdate(); // } -// -// // return addCount; // } + + + public int insertRow(Connection conn, String tableName, DataValue[] dataValueList) throws SQLException { + + // 获取当前行的所有字段名 + StringBuilder columns = new StringBuilder("("); + StringBuilder values = new StringBuilder("VALUES ("); + + // 构建字段名和占位符 + for (DataValue dataValue : dataValueList) { + String key = dataValue.getKey(); + columns.append(key).append(", "); + values.append("?, "); + } + // 删除最后一个逗号和空格 + columns.delete(columns.length() - 2, columns.length()); + values.delete(values.length() - 2, values.length()); + + // 完成 SQL 插入语句 + String sql = "INSERT INTO " + tableName + " " + columns.toString() + ") " + values.toString() + ")"; + log.info("添加语句{}",sql); + + int addCount = 0; + + try { + conn.setAutoCommit(false); + + try ( + + PreparedStatement ps = conn.prepareStatement(sql)) { + + + // 循环设置参数并执行插入 + for (DataValue dataValue : dataValueList) { + int index = 1; + Object obj = dataValue.getValue(); + if (obj instanceof String) { + ps.setString(index++, (String) obj); + log.info("类型为String,值{}",obj); + } else if (obj instanceof Integer) { + ps.setInt(index++, (Integer) obj); + log.info("类型为Integer,值{}",obj); + } else if (obj instanceof Double) { + ps.setDouble(index++, (Double) obj); + log.info("类型为Double,值{}",obj); + } else if (obj instanceof Date) { + ps.setDate(index++, new java.sql.Date(((Date) obj).getTime())); + log.info("类型为Date,值{}",obj); + } else if (obj instanceof Boolean) { + ps.setBoolean(index++, (Boolean) obj); + log.info("类型为Boolean,值{}",obj); + } else if (obj instanceof Float) { + ps.setFloat(index++, (Float) obj); + log.info("类型为Float,值{}",obj); + } else if (obj instanceof Long) { + ps.setLong(index++, (Long) obj); + log.info("类型为Long,值{}",obj); + } else { + ps.setObject(index++, obj); + log.info("类型为obj,值{}",obj); + } + ps.addBatch(); + } + + // 执行批量插入操作 + int[] ints = ps.executeBatch(); + + for (int anInt : ints) { + log.info("插入成功的数据有"+anInt); + } + + conn.commit(); + } catch (SQLException e) { + e.printStackTrace(); + } + } catch (SQLException e) { + throw new RuntimeException(e); + } + + + return addCount; + } + + + private void closeConnection(Connection conn) { + if (conn != null) { + try { + conn.close(); + } catch (SQLException e) { + e.printStackTrace(); + } + } + } + + +// public static HikariDataSource instance = null; +// @NotNull +// public static synchronized HikariDataSource getHikariDataSource(Source dataSources) { +// if(instance == null) { +// HikariConfig hikariConfig = new HikariConfig(); +// hikariConfig.setPoolName("HikariCP 连接池"); +// hikariConfig.setDataSourceClassName(dataSources.getDriverName()); +// hikariConfig.addDataSourceProperty("user", dataSources.getUsername()); +// hikariConfig.addDataSourceProperty("password", dataSources.getPassword()); +// hikariConfig.addDataSourceProperty("url", dataSources.getUrl()); +// hikariConfig.setMaximumPoolSize(15); // -// -// private void closeConnection(Connection conn) { -// if (conn != null) { -// try { -// conn.close(); -// } catch (SQLException e) { -// e.printStackTrace(); -// } +// instance = new HikariDataSource(hikariConfig); // } +// return instance; // } - @Override - public int addProduct(Long basicId, Long tableId, DataValue[][] listList) { -// TableInfo tableInfoDataSources = tableInfoService.getById(basicId); -// Long basicId1 = tableInfoDataSources.getBasicId(); - - Source dataSources = sourceService.getById(basicId); - TableInfo tableInfo = tableInfoService.getById(tableId); - String tableName = tableInfo.getTableName(); - - HikariDataSource hikariDataSource = HikariPool.getHikariDataSource(dataSources); - - ExecutorService executorService = Executors.newFixedThreadPool(8); - AtomicInteger addCount = new AtomicInteger(); - - // 分割数据为3000个批次 - List batches = splitData(listList, 2000); - - try (Connection conn = hikariDataSource.getConnection()) { - conn.setAutoCommit(false); // 开启事务 - - for (DataValue[][] batch : batches) { - executorService.submit(() -> { - try (Statement stmt = conn.createStatement()) { - log.info("sql开始拼接"); - String sql = buildBatchInsertSQL(tableName, batch); - log.info("sql拼接结束: {}", sql); - stmt.executeUpdate(sql); - addCount.addAndGet(batch.length); - } catch (SQLException e) { - log.error("SQLException异常发生", e); - try { - conn.rollback(); // 回滚事务 - } catch (SQLException ex) { - log.error("回滚事务失败", ex); - throw new RuntimeException(ex); - } - throw new RuntimeException(e); - } - }); - } - - executorService.shutdown(); - executorService.awaitTermination(1, TimeUnit.HOURS); - - conn.commit(); // 提交事务 - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException(e); - } catch (SQLException e) { - throw new RuntimeException(e); - } finally { - close(hikariDataSource); // 关闭数据源 - } - - return addCount.get(); - } - - private String buildBatchInsertSQL(String tableName, DataValue[][] batch) { - log.info("执行到1"); - - StringBuilder columns = new StringBuilder("("); - StringBuilder values = new StringBuilder("VALUES "); - - // 构建字段名 - for (DataValue dataValue : batch[0]) { - String key = dataValue.getKey(); - columns.append(key).append(", "); - } - log.info("执行到2"); - // 删除最后一个逗号和空格 - columns.delete(columns.length() - 2, columns.length()); - log.info("执行到3"); - - // 构建值部分 - for (DataValue[] dataValueList : batch) { - values.append("("); - log.info("执行到3.0"); - for (DataValue dataValue : dataValueList) { - Object value = dataValue.getValue(); - log.info("执行到3.00"); - String s = formatValue(dataValue.getType(), value); - log.info("拼接sql+=++++[{}",s); - values.append(s).append(", "); - log.info("执行到3.9"); - } - log.info("执行到4"); - values.delete(values.length() - 2, values.length()); - values.append("), "); - log.info("执行到5"); - } - log.info("执行到6"); - log.info("当前执行到6的sql语句:"+values.toString()); - values.delete(values.length() - 2, values.length()); - log.info("执行到7"); - log.info("当前执行到7的sql语句:"+values.toString()); - // 完成 SQL 插入语句 - String sql = "INSERT INTO " + tableName + " " + columns.toString() + ") " + values.toString(); - log.info("批量添加的sql: {}", sql); - return sql; - } - - private String formatValue(DataType type, Object value) { - log.info("执行到3.1"); - if (type == DataType.VARCHAR || type == DataType.TEXT) { - log.info("执行到3.2"); - return "'" + value.toString().replace("'", "''") + "'"; - } else if (type == DataType.BIGINT) { - log.info("执行到3.3"); - return value.toString(); - } else if (type == DataType.INT) { - log.info("执行到3.4"); - return value.toString(); - } else if (type == DataType.DECIMAL) { - log.info("执行到3.5"); - return value.toString(); - } else if (type == DataType.DATETIME) { - log.info("执行到3.6"); - return "'" + new java.sql.Date(((Date) value).getTime()) + "'"; - } else if (type == DataType.DOUBLE) { - log.info("执行到3.7"); - return value.toString(); - } else { - // 其他类型的处理 - log.info("执行到3.8"); - return "'" + value.toString() + "'"; - } - - } - - private List splitData(DataValue[][] listList, int batchSize) { - List batches = new ArrayList<>(); - int totalSize = listList.length; - int numBatches = (int) Math.ceil((double) totalSize / batchSize); - - for (int i = 0; i < numBatches; i++) { - int start = i * batchSize; - int end = Math.min(start + batchSize, totalSize); - DataValue[][] batch = new DataValue[end - start][]; - System.arraycopy(listList, start, batch, 0, end - start); - batches.add(batch); - } - - return batches; - } - - private static void close(HikariDataSource dataSource) { - if (dataSource != null) { - dataSource.close(); - } - } - }