diff --git a/cloud-etl-server/src/main/java/com/muyu/cloud/etl/service/impl/ProductServiceImpl.java b/cloud-etl-server/src/main/java/com/muyu/cloud/etl/service/impl/ProductServiceImpl.java index b8e484c..8ef6535 100644 --- a/cloud-etl-server/src/main/java/com/muyu/cloud/etl/service/impl/ProductServiceImpl.java +++ b/cloud-etl-server/src/main/java/com/muyu/cloud/etl/service/impl/ProductServiceImpl.java @@ -1,21 +1,26 @@ package com.muyu.cloud.etl.service.impl; -import com.muyu.Hikari.HikariPool; import com.muyu.cloud.etl.service.ProductService; import com.muyu.cloud.etl.service.SourceService; import com.muyu.cloud.etl.service.TableInfoService; import com.muyu.domain.DataValue; import com.muyu.domain.Source; import com.muyu.domain.TableInfo; +import com.muyu.domain.enums.DataType; +import com.zaxxer.hikari.HikariConfig; import com.zaxxer.hikari.HikariDataSource; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.sql.Connection; -import java.sql.PreparedStatement; import java.sql.SQLException; +import java.sql.Statement; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Arrays; import java.util.Date; +import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -30,136 +35,137 @@ public class ProductServiceImpl implements ProductService { private TableInfoService tableInfoService; - public int addProduct(Long basicId, Long tableId, DataValue[][] listList) { - - Source dataSources = sourceService.getById(basicId); - TableInfo tableInfo = tableInfoService.getById(tableId); - String tableName = tableInfo.getTableName(); - - - - HikariDataSource hikariDataSource = HikariPool.getHikariDataSource(dataSources); - ExecutorService executorService = Executors.newFixedThreadPool(8); - - AtomicInteger addCount = new AtomicInteger(); - - Connection connection = null; - - try { - connection = hikariDataSource.getConnection(); - - for (DataValue[] dataValueList : listList) { - Connection finalConnection = connection; - executorService.submit(() -> { - try { - addCount.addAndGet(insertRow(finalConnection, tableName, dataValueList)); - } catch (SQLException e) { - - e.printStackTrace(); - } - }); - } - - executorService.shutdown(); - executorService.awaitTermination(1, TimeUnit.HOURS); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException("Thread interrupted", e); - } catch (SQLException e) { - throw new RuntimeException(e); - } finally { - closeConnection(connection); - } - - return addCount.get(); - } - - - public int insertRow(Connection conn, String tableName, DataValue[] dataValueList) throws SQLException { - - - StringBuilder columns = new StringBuilder("("); - StringBuilder values = new StringBuilder("VALUES ("); - - - for (DataValue dataValue : dataValueList) { - String key = dataValue.getKey(); - columns.append(key).append(", "); - values.append("?, "); - } - - columns.delete(columns.length() - 2, columns.length()); - values.delete(values.length() - 2, values.length()); - - - String sql = "INSERT INTO " + tableName + " " + columns.toString() + ") " + values.toString() + ")"; - log.info("添加语句{}",sql); - - int addCount = 0; - - try { - conn.setAutoCommit(false); - try ( - PreparedStatement ps = conn.prepareStatement(sql)) { - - int index = 1; - for (DataValue value : dataValueList) { - Object obj = value.getValue(); - if (obj instanceof String) { - ps.setString(index++, (String) obj); - log.info("类型为String,值{}",obj); - } else if (obj instanceof Integer) { - ps.setInt(index++, (Integer) obj); - log.info("类型为Integer,值{}",obj); - } else if (obj instanceof Double) { - ps.setDouble(index++, (Double) obj); - log.info("类型为Double,值{}",obj); - } else if (obj instanceof Date) { - ps.setDate(index++, new java.sql.Date(((Date) obj).getTime())); - log.info("类型为Date,值{}",obj); - } else if (obj instanceof Boolean) { - ps.setBoolean(index++, (Boolean) obj); - log.info("类型为Boolean,值{}",obj); - } else if (obj instanceof Float) { - ps.setFloat(index++, (Float) obj); - log.info("类型为Float,值{}",obj); - } else if (obj instanceof Long) { - ps.setLong(index++, (Long) obj); - log.info("类型为Long,值{}",obj); - } else { - ps.setObject(index++, obj); - log.info("类型为OOO,值{}",obj); - } - } - ps.addBatch(); - - int[] ints = ps.executeBatch(); - - for (int anInt : ints) { - log.info("插入成功的数据有"+anInt); - } - - conn.commit(); - } catch (SQLException e) { - e.printStackTrace(); - } - } catch (SQLException e) { - throw new RuntimeException(e); - } - - return addCount; - } - - - private void closeConnection(Connection conn) { - if (conn != null) { - try { - conn.close(); - } catch (SQLException e) { - e.printStackTrace(); - } - } - } +// +// public int addProduct(Long basicId, Long tableId, DataValue[][] listList) { +// +// Source dataSources = sourceService.getById(basicId); +// TableInfo tableInfo = tableInfoService.getById(tableId); +// String tableName = tableInfo.getTableName(); +// +// +// HikariDataSource hikariDataSource = HikariPool.getHikariDataSource(dataSources); +// ExecutorService executorService = Executors.newFixedThreadPool(8); +// +// AtomicInteger addCount = new AtomicInteger(); +// +// Connection connection = null; +// +// try { +// +// connection = hikariDataSource.getConnection(); +// +// for (DataValue[] dataValueList : listList) { +// Connection finalConnection = connection; +// executorService.submit(() -> { +// try { +// addCount.addAndGet(insertRow(finalConnection, tableName, dataValueList)); +// } catch (SQLException e) { +// +// e.printStackTrace(); +// } +// }); +// } +// +// executorService.shutdown(); +// executorService.awaitTermination(1, TimeUnit.HOURS); +// } catch (InterruptedException e) { +// Thread.currentThread().interrupt(); +// throw new RuntimeException("Thread interrupted", e); +// } catch (SQLException e) { +// throw new RuntimeException(e); +// } finally { +// closeConnection(connection); +// } +// +// return addCount.get(); +// } +// +// +// public int insertRow(Connection conn, String tableName, DataValue[] dataValueList) throws SQLException { +// +// +// StringBuilder columns = new StringBuilder("("); +// StringBuilder values = new StringBuilder("VALUES ("); +// +// +// for (DataValue dataValue : dataValueList) { +// String key = dataValue.getKey(); +// columns.append(key).append(", "); +// values.append("?, "); +// } +// +// columns.delete(columns.length() - 2, columns.length()); +// values.delete(values.length() - 2, values.length()); +// +// +// String sql = "INSERT INTO " + tableName + " " + columns.toString() + ") " + values.toString() + ")"; +// log.info("添加语句{}",sql); +// +// int addCount = 0; +// +// try { +// conn.setAutoCommit(false); +// try ( +// PreparedStatement ps = conn.prepareStatement(sql)) { +// +// int index = 1; +// for (DataValue value : dataValueList) { +// Object obj = value.getValue(); +// if (obj instanceof String) { +// ps.setString(index++, (String) obj); +// log.info("类型为String,值{}",obj); +// } else if (obj instanceof Integer) { +// ps.setInt(index++, (Integer) obj); +// log.info("类型为Integer,值{}",obj); +// } else if (obj instanceof Double) { +// ps.setDouble(index++, (Double) obj); +// log.info("类型为Double,值{}",obj); +// } else if (obj instanceof Date) { +// ps.setDate(index++, new java.sql.Date(((Date) obj).getTime())); +// log.info("类型为Date,值{}",obj); +// } else if (obj instanceof Boolean) { +// ps.setBoolean(index++, (Boolean) obj); +// log.info("类型为Boolean,值{}",obj); +// } else if (obj instanceof Float) { +// ps.setFloat(index++, (Float) obj); +// log.info("类型为Float,值{}",obj); +// } else if (obj instanceof Long) { +// ps.setLong(index++, (Long) obj); +// log.info("类型为Long,值{}",obj); +// } else { +// ps.setObject(index++, obj); +// log.info("类型为OOO,值{}",obj); +// } +// } +// ps.addBatch(); +// +// int[] ints = ps.executeBatch(); +// +// for (int anInt : ints) { +// log.info("插入成功的数据有"+anInt); +// } +// +// conn.commit(); +// } catch (SQLException e) { +// e.printStackTrace(); +// } +// } catch (SQLException e) { +// throw new RuntimeException(e); +// } +// +// return addCount; +// } +// +// +// private void closeConnection(Connection conn) { +// if (conn != null) { +// try { +// conn.close(); +// } catch (SQLException e) { +// e.printStackTrace(); +// } +// } +// } // 第二种 @@ -472,7 +478,153 @@ public class ProductServiceImpl implements ProductService { // private static final Logger log = Logger.getLogger(DataInserter.class.getName()); +//第四种 +@Override +public int addProduct(Long basicId, Long tableId, DataValue[][] listList) { +// TableInfo tableInfoDataSources = tableInfoService.getById(basicId); +// Long basicId1 = tableInfoDataSources.getBasicId(); + Source dataSources = sourceService.getById(basicId); + TableInfo tableInfo = tableInfoService.getById(tableId); + String tableName = tableInfo.getTableName(); + HikariConfig hikariConfig = new HikariConfig(); + hikariConfig.setPoolName("HikariCP 连接池"); + hikariConfig.setDriverClassName(dataSources.getDriverName()); + // hikariConfig.setJdbcUrl(dataSources.getUrl(dataSources) + "&maxAllowedPacket=1G"); + hikariConfig.setJdbcUrl(dataSources.getUrl()); + hikariConfig.setUsername(dataSources.getUsername()); + hikariConfig.setPassword(dataSources.getPassword()); + hikariConfig.setMinimumIdle(2); + hikariConfig.setMaximumPoolSize(10); + + HikariDataSource hikariDataSource = new HikariDataSource(hikariConfig); + + ExecutorService executorService = Executors.newFixedThreadPool(8); + AtomicInteger addCount = new AtomicInteger(); + + // 分割数据为较小的批次 + List batches = splitData(listList, 3000); + + try (Connection conn = hikariDataSource.getConnection()) { + conn.setAutoCommit(false); // 开启事务 + + for (DataValue[][] batch : batches) { + executorService.submit(() -> { + try (Statement stmt = conn.createStatement()) { + String sql = buildBatchInsertSQL(tableName, batch); + stmt.executeUpdate(sql); + addCount.addAndGet(batch.length); + } catch (SQLException e) { + log.error("SQLException异常发生", e); + try { + conn.rollback(); // 回滚事务 + } catch (SQLException ex) { + log.error("回滚事务失败", ex); + throw new RuntimeException(ex); + } + throw new RuntimeException(e); + } catch (Exception e) { + log.error("其他异常发生", e); + try { + conn.rollback(); // 回滚事务 + } catch (SQLException ex) { + log.error("回滚事务失败", ex); + throw new RuntimeException(ex); + } + throw new RuntimeException(e); + } + }); + } + + executorService.shutdown(); + executorService.awaitTermination(1, TimeUnit.HOURS); + conn.commit(); // 提交事务 + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } catch (SQLException e) { + throw new RuntimeException(e); + } finally { + close(hikariDataSource); // 关闭数据源 + } + + return addCount.get(); +} + + private String buildBatchInsertSQL(String tableName, DataValue[][] batch) { + StringBuilder columns = new StringBuilder("("); + StringBuilder values = new StringBuilder("VALUES "); + + // 构建字段名 + for (DataValue dataValue : batch[0]) { + String key = dataValue.getKey(); + columns.append(key).append(", "); + } + // 删除最后一个逗号和空格 + columns.delete(columns.length() - 2, columns.length()); + + // 构建值部分 + for (DataValue[] dataValueList : batch) { + values.append("("); + for (DataValue dataValue : dataValueList) { + Object value = dataValue.getValue(); + values.append(formatValue(dataValue.getType(), value)).append(", "); + } + values.delete(values.length() - 2, values.length()); + values.append("), "); + } + // 删除最后一个逗号 + values.delete(values.length() - 2, values.length()); + + // 完成 SQL 插入语句 + String sql = "INSERT INTO " + tableName + " " + columns.toString() + ") " + values.toString(); + return sql; + } + + private String formatValue(DataType type, Object value) { + if (value == null) { + // 根据业务需求处理 null 值 + return "NULL"; // 或者其他默认值 + } + + if (type == DataType.VARCHAR || type == DataType.TEXT) { + return "'" + value.toString().replace("'", "''") + "'"; + } else if (type == DataType.BIGINT) { + return value.toString(); + } else if (type == DataType.INT) { + return value.toString(); + } else if (type == DataType.DECIMAL) { + return value.toString(); + } else if (type == DataType.DATETIME) { + SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + return "'" + sdf.format((Date) value) + "'"; + } else if (type == DataType.DOUBLE) { + return value.toString(); + } else { + return "'" + value.toString() + "'"; + } + } + + // 分割数据为较小的批次 + private List splitData(DataValue[][] listList, int batchSize) { + List batches = new ArrayList<>(); + int totalRows = listList.length; + int totalBatches = (int) Math.ceil((double) totalRows / batchSize); + + for (int i = 0; i < totalBatches; i++) { + int start = i * batchSize; + int end = Math.min(start + batchSize, totalRows); + DataValue[][] batch = Arrays.copyOfRange(listList, start, end); + batches.add(batch); + } + + return batches; + } + + // 关闭数据源 + private void close(HikariDataSource dataSource) { + dataSource.close(); + }