diff --git a/cloud-etl-server/src/main/java/com/muyu/cloud/etl/service/impl/ProductServiceImpl.java b/cloud-etl-server/src/main/java/com/muyu/cloud/etl/service/impl/ProductServiceImpl.java index d8fb8b5..ad2b966 100644 --- a/cloud-etl-server/src/main/java/com/muyu/cloud/etl/service/impl/ProductServiceImpl.java +++ b/cloud-etl-server/src/main/java/com/muyu/cloud/etl/service/impl/ProductServiceImpl.java @@ -7,15 +7,18 @@ import com.muyu.cloud.etl.service.TableInfoService; import com.muyu.domain.DataValue; import com.muyu.domain.Source; import com.muyu.domain.TableInfo; +import com.muyu.domain.enums.DataType; import com.zaxxer.hikari.HikariDataSource; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.sql.Connection; -import java.sql.PreparedStatement; import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; import java.util.Date; +import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -29,55 +32,55 @@ public class ProductServiceImpl implements ProductService { @Autowired private TableInfoService tableInfoService; - public int addProduct(Long basicId, Long tableId, DataValue[][] listList) { -// TableInfo tableInfoDataSources = tableInfoService.getById(basicId); -// Long basicId1 = tableInfoDataSources.getBasicId(); - Source dataSources = sourceService.getById(basicId); - TableInfo tableInfo = tableInfoService.getById(tableId); - String tableName = tableInfo.getTableName(); - -// HikariDataSource hikariDataSource = getHikariDataSource(dataSources); - - HikariDataSource hikariDataSource = HikariPool.getHikariDataSource(dataSources); - - ExecutorService executorService = Executors.newFixedThreadPool(4); - - AtomicInteger addCount = new AtomicInteger(); - - Connection connection = null; - - try { - connection = hikariDataSource.getConnection(); - // 遍历外部列表 - for (DataValue[] dataValueList : listList) { - Connection finalConnection = connection; - executorService.submit(() -> { - try { - addCount.addAndGet(insertRow(finalConnection, tableName, dataValueList)); - } catch (SQLException e) { - // 记录异常 - e.printStackTrace(); - } - }); - } - - executorService.shutdown(); - executorService.awaitTermination(1, TimeUnit.HOURS); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException("Thread interrupted", e); - } catch (SQLException e) { - throw new RuntimeException(e); - } finally { - closeConnection(connection); - } - - return addCount.get(); - } - - - -// private int insertRow(Connection conn, String tableName, DataValue[] dataValueList) throws SQLException { +// +// public int addProduct(Long basicId, Long tableId, DataValue[][] listList) { +//// TableInfo tableInfoDataSources = tableInfoService.getById(basicId); +//// Long basicId1 = tableInfoDataSources.getBasicId(); +// Source dataSources = sourceService.getById(basicId); +// TableInfo tableInfo = tableInfoService.getById(tableId); +// String tableName = tableInfo.getTableName(); +// +//// HikariDataSource hikariDataSource = getHikariDataSource(dataSources); +// +// HikariDataSource hikariDataSource = HikariPool.getHikariDataSource(dataSources); +// +// ExecutorService executorService = Executors.newFixedThreadPool(4); +// +// AtomicInteger addCount = new AtomicInteger(); +// +// Connection connection = null; +// +// try { +// connection = hikariDataSource.getConnection(); +// // 遍历外部列表 +// for (DataValue[] dataValueList : listList) { +// Connection finalConnection = connection; +// executorService.submit(() -> { +// try { +// addCount.addAndGet(insertRow(finalConnection, tableName, dataValueList)); +// } catch (SQLException e) { +// // 记录异常 +// e.printStackTrace(); +// } +// }); +// } +// +// executorService.shutdown(); +// executorService.awaitTermination(1, TimeUnit.HOURS); +// } catch (InterruptedException e) { +// Thread.currentThread().interrupt(); +// throw new RuntimeException("Thread interrupted", e); +// } catch (SQLException e) { +// throw new RuntimeException(e); +// } finally { +// closeConnection(connection); +// } +// +// return addCount.get(); +// } +// +// public int insertRow(Connection conn, String tableName, DataValue[] dataValueList) throws SQLException { +// // // 获取当前行的所有字段名 // StringBuilder columns = new StringBuilder("("); // StringBuilder values = new StringBuilder("VALUES ("); @@ -93,147 +96,232 @@ public class ProductServiceImpl implements ProductService { // values.delete(values.length() - 2, values.length()); // // // 完成 SQL 插入语句 -// String sql = "INSERT INTO " + tableName + " (" + columns.toString() + ") " + values.toString(); +// String sql = "INSERT INTO " + tableName + " " + columns.toString() + ") " + values.toString() + ")"; +// log.info("添加语句{}",sql); // // int addCount = 0; -// try (PreparedStatement ps = conn.prepareStatement(sql)) { -// // 设置参数 -// int index = 1; -// for (DataValue dataValue : dataValueList) { -// Object value = dataValue.getValue(); -// if (value instanceof String) { -// ps.setString(index, (String) value); -// } else if (value instanceof Integer) { -// ps.setInt(index, (Integer) value); -// } else if (value instanceof Double) { -// ps.setDouble(index, (Double) value); -// } else if (value instanceof Date) { -// ps.setDate(index, new java.sql.Date(((Date) value).getTime())); -// } else { -// // 其他类型的处理 -// ps.setObject(index, value); +// +// try { +// conn.setAutoCommit(false); +// +// try ( +// +// PreparedStatement ps = conn.prepareStatement(sql)) { +// +// +// // 循环设置参数并执行插入 +// for (DataValue dataValue : dataValueList) { +// int index = 1; +// for (DataValue value : dataValueList) { +// Object obj = value.getValue(); +// if (obj instanceof String) { +// ps.setString(index++, (String) obj); +// log.info("类型为String,值{}",obj); +// } else if (obj instanceof Integer) { +// ps.setInt(index++, (Integer) obj); +// log.info("类型为Integer,值{}",obj); +// } else if (obj instanceof Double) { +// ps.setDouble(index++, (Double) obj); +// log.info("类型为Double,值{}",obj); +// } else if (obj instanceof Date) { +// ps.setDate(index++, new java.sql.Date(((Date) obj).getTime())); +// log.info("类型为Date,值{}",obj); +// } else if (obj instanceof Boolean) { +// ps.setBoolean(index++, (Boolean) obj); +// log.info("类型为Boolean,值{}",obj); +// } else if (obj instanceof Float) { +// ps.setFloat(index++, (Float) obj); +// log.info("类型为Float,值{}",obj); +// } else if (obj instanceof Long) { +// ps.setLong(index++, (Long) obj); +// log.info("类型为Long,值{}",obj); +// } else { +// ps.setObject(index++, obj); +// log.info("类型为Object,值{}",obj); +// } +// } +// ps.addBatch(); // } -// index++; +// +// // 执行批量插入操作 +// int[] ints = ps.executeBatch(); +// +// for (int anInt : ints) { +// log.info("插入成功的数据有"+anInt); +// } +// +// conn.commit(); +// } catch (SQLException e) { +// e.printStackTrace(); // } -// // 执行插入操作 -// addCount = ps.executeUpdate(); +// } catch (SQLException e) { +// throw new RuntimeException(e); // } +// +// // return addCount; // } - - - public int insertRow(Connection conn, String tableName, DataValue[] dataValueList) throws SQLException { - - // 获取当前行的所有字段名 - StringBuilder columns = new StringBuilder("("); - StringBuilder values = new StringBuilder("VALUES ("); - - // 构建字段名和占位符 - for (DataValue dataValue : dataValueList) { - String key = dataValue.getKey(); - columns.append(key).append(", "); - values.append("?, "); - } - // 删除最后一个逗号和空格 - columns.delete(columns.length() - 2, columns.length()); - values.delete(values.length() - 2, values.length()); - - // 完成 SQL 插入语句 - String sql = "INSERT INTO " + tableName + " " + columns.toString() + ") " + values.toString() + ")"; - log.info("添加语句{}",sql); - - int addCount = 0; - - try { - conn.setAutoCommit(false); - - try ( - - PreparedStatement ps = conn.prepareStatement(sql)) { - - - // 循环设置参数并执行插入 - for (DataValue dataValue : dataValueList) { - int index = 1; - for (DataValue value : dataValueList) { - Object obj = value.getValue(); - if (obj instanceof String) { - ps.setString(index++, (String) obj); - log.info("类型为String,值{}",obj); - } else if (obj instanceof Integer) { - ps.setInt(index++, (Integer) obj); - log.info("类型为Integer,值{}",obj); - } else if (obj instanceof Double) { - ps.setDouble(index++, (Double) obj); - log.info("类型为Double,值{}",obj); - } else if (obj instanceof Date) { - ps.setDate(index++, new java.sql.Date(((Date) obj).getTime())); - log.info("类型为Date,值{}",obj); - } else if (obj instanceof Boolean) { - ps.setBoolean(index++, (Boolean) obj); - log.info("类型为Boolean,值{}",obj); - } else if (obj instanceof Float) { - ps.setFloat(index++, (Float) obj); - log.info("类型为Float,值{}",obj); - } else if (obj instanceof Long) { - ps.setLong(index++, (Long) obj); - log.info("类型为Long,值{}",obj); - } else { - ps.setObject(index++, obj); - log.info("类型为Object,值{}",obj); - } - } - ps.addBatch(); - } - - // 执行批量插入操作 - int[] ints = ps.executeBatch(); - - for (int anInt : ints) { - log.info("插入成功的数据有"+anInt); - } - - conn.commit(); - } catch (SQLException e) { - e.printStackTrace(); - } - } catch (SQLException e) { - throw new RuntimeException(e); - } - - - return addCount; - } - - - private void closeConnection(Connection conn) { - if (conn != null) { - try { - conn.close(); - } catch (SQLException e) { - e.printStackTrace(); - } - } - } - - -// public static HikariDataSource instance = null; -// @NotNull -// public static synchronized HikariDataSource getHikariDataSource(Source dataSources) { -// if(instance == null) { -// HikariConfig hikariConfig = new HikariConfig(); -// hikariConfig.setPoolName("HikariCP 连接池"); -// hikariConfig.setDataSourceClassName(dataSources.getDriverName()); -// hikariConfig.addDataSourceProperty("user", dataSources.getUsername()); -// hikariConfig.addDataSourceProperty("password", dataSources.getPassword()); -// hikariConfig.addDataSourceProperty("url", dataSources.getUrl()); -// hikariConfig.setMaximumPoolSize(15); // -// instance = new HikariDataSource(hikariConfig); +// +// private void closeConnection(Connection conn) { +// if (conn != null) { +// try { +// conn.close(); +// } catch (SQLException e) { +// e.printStackTrace(); +// } // } -// return instance; // } + @Override + public int addProduct(Long basicId, Long tableId, DataValue[][] listList) { +// TableInfo tableInfoDataSources = tableInfoService.getById(basicId); +// Long basicId1 = tableInfoDataSources.getBasicId(); + + Source dataSources = sourceService.getById(basicId); + TableInfo tableInfo = tableInfoService.getById(tableId); + String tableName = tableInfo.getTableName(); + + HikariDataSource hikariDataSource = HikariPool.getHikariDataSource(dataSources); + + ExecutorService executorService = Executors.newFixedThreadPool(8); + AtomicInteger addCount = new AtomicInteger(); + + // 分割数据为3000个批次 + List batches = splitData(listList, 2000); + + try (Connection conn = hikariDataSource.getConnection()) { + conn.setAutoCommit(false); // 开启事务 + + for (DataValue[][] batch : batches) { + executorService.submit(() -> { + try (Statement stmt = conn.createStatement()) { + log.info("sql开始拼接"); + String sql = buildBatchInsertSQL(tableName, batch); + log.info("sql拼接结束: {}", sql); + stmt.executeUpdate(sql); + addCount.addAndGet(batch.length); + } catch (SQLException e) { + log.error("SQLException异常发生", e); + try { + conn.rollback(); // 回滚事务 + } catch (SQLException ex) { + log.error("回滚事务失败", ex); + throw new RuntimeException(ex); + } + throw new RuntimeException(e); + } + }); + } + + executorService.shutdown(); + executorService.awaitTermination(1, TimeUnit.HOURS); + + conn.commit(); // 提交事务 + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } catch (SQLException e) { + throw new RuntimeException(e); + } finally { + close(hikariDataSource); // 关闭数据源 + } + + return addCount.get(); + } + + private String buildBatchInsertSQL(String tableName, DataValue[][] batch) { + log.info("执行到1"); + + StringBuilder columns = new StringBuilder("("); + StringBuilder values = new StringBuilder("VALUES "); + + // 构建字段名 + for (DataValue dataValue : batch[0]) { + String key = dataValue.getKey(); + columns.append(key).append(", "); + } + log.info("执行到2"); + // 删除最后一个逗号和空格 + columns.delete(columns.length() - 2, columns.length()); + log.info("执行到3"); + + // 构建值部分 + for (DataValue[] dataValueList : batch) { + values.append("("); + log.info("执行到3.0"); + for (DataValue dataValue : dataValueList) { + Object value = dataValue.getValue(); + log.info("执行到3.00"); + values.append(formatValue(dataValue.getType(), value)).append(", "); + log.info("执行到3.9"); + } + log.info("执行到4"); + values.delete(values.length() - 2, values.length()); + values.append("), "); + log.info("执行到5"); + } + log.info("执行到6"); + log.info("当前执行到6的sql语句:"+values.toString()); + values.delete(values.length() - 2, values.length()); + log.info("执行到7"); + log.info("当前执行到7的sql语句:"+values.toString()); + // 完成 SQL 插入语句 + String sql = "INSERT INTO " + tableName + " " + columns.toString() + ") " + values.toString(); + log.info("批量添加的sql: {}", sql); + return sql; + } + + private String formatValue(DataType type, Object value) { + log.info("执行到3.1"); + if (type == DataType.VARCHAR || type == DataType.TEXT) { + log.info("执行到3.2"); + return "'" + value.toString().replace("'", "''") + "'"; + } else if (type == DataType.BIGINT) { + log.info("执行到3.3"); + return value.toString(); + } else if (type == DataType.INT) { + log.info("执行到3.4"); + return value.toString(); + } else if (type == DataType.DECIMAL) { + log.info("执行到3.5"); + return value.toString(); + } else if (type == DataType.DATETIME) { + log.info("执行到3.6"); + return "'" + new java.sql.Date(((Date) value).getTime()) + "'"; + } else if (type == DataType.DOUBLE) { + log.info("执行到3.7"); + return value.toString(); + } else { + // 其他类型的处理 + log.info("执行到3.8"); + return "'" + value.toString() + "'"; + } + + } + + private List splitData(DataValue[][] listList, int batchSize) { + List batches = new ArrayList<>(); + int totalSize = listList.length; + int numBatches = (int) Math.ceil((double) totalSize / batchSize); + + for (int i = 0; i < numBatches; i++) { + int start = i * batchSize; + int end = Math.min(start + batchSize, totalSize); + DataValue[][] batch = new DataValue[end - start][]; + System.arraycopy(listList, start, batch, 0, end - start); + batches.add(batch); + } + + return batches; + } + + private static void close(HikariDataSource dataSource) { + if (dataSource != null) { + dataSource.close(); + } + } + }