更新添加

master
lwj 2024-09-08 14:59:44 +08:00
parent a2fa5ee7df
commit 46979023d2
1 changed files with 232 additions and 160 deletions

View File

@ -7,15 +7,18 @@ import com.muyu.cloud.etl.service.TableInfoService;
import com.muyu.domain.DataValue; import com.muyu.domain.DataValue;
import com.muyu.domain.Source; import com.muyu.domain.Source;
import com.muyu.domain.TableInfo; import com.muyu.domain.TableInfo;
import com.muyu.domain.enums.DataType;
import com.zaxxer.hikari.HikariDataSource; import com.zaxxer.hikari.HikariDataSource;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import java.sql.Connection; import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException; import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Date; import java.util.Date;
import java.util.List;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -29,55 +32,56 @@ public class ProductServiceImpl implements ProductService {
@Autowired @Autowired
private TableInfoService tableInfoService; private TableInfoService tableInfoService;
public int addProduct(Long basicId, Long tableId, DataValue[][] listList) {
// TableInfo tableInfoDataSources = tableInfoService.getById(basicId);
// Long basicId1 = tableInfoDataSources.getBasicId();
Source dataSources = sourceService.getById(basicId);
TableInfo tableInfo = tableInfoService.getById(tableId);
String tableName = tableInfo.getTableName();
// HikariDataSource hikariDataSource = getHikariDataSource(dataSources); // public int addProduct(Long basicId, Long tableId, DataValue[][] listList) {
//// TableInfo tableInfoDataSources = tableInfoService.getById(basicId);
HikariDataSource hikariDataSource = HikariPool.getHikariDataSource(dataSources); //// Long basicId1 = tableInfoDataSources.getBasicId();
// Source dataSources = sourceService.getById(basicId);
ExecutorService executorService = Executors.newFixedThreadPool(4); // TableInfo tableInfo = tableInfoService.getById(tableId);
// String tableName = tableInfo.getTableName();
AtomicInteger addCount = new AtomicInteger(); //
//// HikariDataSource hikariDataSource = getHikariDataSource(dataSources);
Connection connection = null; //
// HikariDataSource hikariDataSource = HikariPool.getHikariDataSource(dataSources);
try { //
connection = hikariDataSource.getConnection(); // ExecutorService executorService = Executors.newFixedThreadPool(4);
// 遍历外部列表 //
for (DataValue[] dataValueList : listList) { // AtomicInteger addCount = new AtomicInteger();
Connection finalConnection = connection; //
executorService.submit(() -> { // Connection connection = null;
try { //
addCount.addAndGet(insertRow(finalConnection, tableName, dataValueList)); // try {
} catch (SQLException e) { // connection = hikariDataSource.getConnection();
// 记录异常 // // 遍历外部列表
e.printStackTrace(); // for (DataValue[] dataValueList : listList) {
} // Connection finalConnection = connection;
}); // executorService.submit(() -> {
} // try {
// addCount.addAndGet(insertRow(finalConnection, tableName, dataValueList));
executorService.shutdown(); // } catch (SQLException e) {
executorService.awaitTermination(1, TimeUnit.HOURS); // // 记录异常
} catch (InterruptedException e) { // e.printStackTrace();
Thread.currentThread().interrupt(); // }
throw new RuntimeException("Thread interrupted", e); // });
} catch (SQLException e) { // }
throw new RuntimeException(e); //
} finally { // executorService.shutdown();
closeConnection(connection); // executorService.awaitTermination(1, TimeUnit.HOURS);
} // } catch (InterruptedException e) {
// Thread.currentThread().interrupt();
return addCount.get(); // throw new RuntimeException("Thread interrupted", e);
} // } catch (SQLException e) {
// throw new RuntimeException(e);
// } finally {
// closeConnection(connection);
// private int insertRow(Connection conn, String tableName, DataValue[] dataValueList) throws SQLException { // }
//
// return addCount.get();
// }
//
//
// public int insertRow(Connection conn, String tableName, DataValue[] dataValueList) throws SQLException {
//
// // 获取当前行的所有字段名 // // 获取当前行的所有字段名
// StringBuilder columns = new StringBuilder("("); // StringBuilder columns = new StringBuilder("(");
// StringBuilder values = new StringBuilder("VALUES ("); // StringBuilder values = new StringBuilder("VALUES (");
@ -93,146 +97,214 @@ public class ProductServiceImpl implements ProductService {
// values.delete(values.length() - 2, values.length()); // values.delete(values.length() - 2, values.length());
// //
// // 完成 SQL 插入语句 // // 完成 SQL 插入语句
// String sql = "INSERT INTO " + tableName + " (" + columns.toString() + ") " + values.toString(); // String sql = "INSERT INTO " + tableName + " " + columns.toString() + ") " + values.toString() + ")";
// log.info("添加语句{}",sql);
// //
// int addCount = 0; // int addCount = 0;
// try (PreparedStatement ps = conn.prepareStatement(sql)) { //
// // 设置参数 // try {
// conn.setAutoCommit(false);
// try (
// PreparedStatement ps = conn.prepareStatement(sql)) {
// // 循环设置参数并执行插入
//// for (DataValue dataValue : dataValueList) {
// int index = 1; // int index = 1;
// for (DataValue dataValue : dataValueList) { // for (DataValue value : dataValueList) {
// Object value = dataValue.getValue(); // Object obj = value.getValue();
// if (value instanceof String) { // if (obj instanceof String) {
// ps.setString(index, (String) value); // ps.setString(index++, (String) obj);
// } else if (value instanceof Integer) { // log.info("类型为String,值{}",obj);
// ps.setInt(index, (Integer) value); // } else if (obj instanceof Integer) {
// } else if (value instanceof Double) { // ps.setInt(index++, (Integer) obj);
// ps.setDouble(index, (Double) value); // log.info("类型为Integer,值{}",obj);
// } else if (value instanceof Date) { // } else if (obj instanceof Double) {
// ps.setDate(index, new java.sql.Date(((Date) value).getTime())); // ps.setDouble(index++, (Double) obj);
// log.info("类型为Double,值{}",obj);
// } else if (obj instanceof Date) {
// ps.setDate(index++, new java.sql.Date(((Date) obj).getTime()));
// log.info("类型为Date,值{}",obj);
// } else if (obj instanceof Boolean) {
// ps.setBoolean(index++, (Boolean) obj);
// log.info("类型为Boolean,值{}",obj);
// } else if (obj instanceof Float) {
// ps.setFloat(index++, (Float) obj);
// log.info("类型为Float,值{}",obj);
// } else if (obj instanceof Long) {
// ps.setLong(index++, (Long) obj);
// log.info("类型为Long,值{}",obj);
// } else { // } else {
// // 其他类型的处理 // ps.setObject(index++, obj);
// ps.setObject(index, value); // log.info("类型为OOO,值{}",obj);
// } // }
// index++;
// } // }
// // 执行插入操作 // ps.addBatch();
// addCount = ps.executeUpdate(); //// }
//
// // 执行批量插入操作
// int[] ints = ps.executeBatch();
//
// for (int anInt : ints) {
// log.info("插入成功的数据有"+anInt);
// } // }
//
// conn.commit();
// } catch (SQLException e) {
// e.printStackTrace();
// }
// } catch (SQLException e) {
// throw new RuntimeException(e);
// }
//
// return addCount; // return addCount;
// }
//
//
// private void closeConnection(Connection conn) {
// if (conn != null) {
// try {
// conn.close();
// } catch (SQLException e) {
// e.printStackTrace();
// }
// }
// } // }
public int insertRow(Connection conn, String tableName, DataValue[] dataValueList) throws SQLException { @Override
public int addProduct(Long basicId, Long tableId, DataValue[][] listList) {
// TableInfo tableInfoDataSources = tableInfoService.getById(basicId);
// Long basicId1 = tableInfoDataSources.getBasicId();
Source dataSources = sourceService.getById(basicId);
TableInfo tableInfo = tableInfoService.getById(tableId);
String tableName = tableInfo.getTableName();
// 获取当前行的所有字段名 HikariDataSource hikariDataSource = HikariPool.getHikariDataSource(dataSources);
// HikariDataSource hikariDataSource = new HikariDataSource(hikariConfig);
ExecutorService executorService = Executors.newFixedThreadPool(8);
AtomicInteger addCount = new AtomicInteger();
// 分割数据为3000个批次
List<DataValue[][]> batches = splitData(listList, 5000);
try (Connection conn = hikariDataSource.getConnection()) {
conn.setAutoCommit(false); // 开启事务
for (DataValue[][] batch : batches) {
executorService.submit(() -> {
try (Statement stmt = conn.createStatement()) {
String sql = buildBatchInsertSQL(tableName, batch);
stmt.executeUpdate(sql);
addCount.addAndGet(batch.length);
} catch (SQLException e) {
log.error("SQLException异常发生", e);
try {
conn.rollback(); // 回滚事务
} catch (SQLException ex) {
log.error("回滚事务失败", ex);
throw new RuntimeException(ex);
}
throw new RuntimeException(e);
}
});
}
executorService.shutdown();
executorService.awaitTermination(1, TimeUnit.HOURS);
conn.commit(); // 提交事务
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
} catch (SQLException e) {
throw new RuntimeException(e);
} finally {
close(hikariDataSource); // 关闭数据源
}
return addCount.get();
}
private String buildBatchInsertSQL(String tableName, DataValue[][] batch) {
StringBuilder columns = new StringBuilder("("); StringBuilder columns = new StringBuilder("(");
StringBuilder values = new StringBuilder("VALUES ("); StringBuilder values = new StringBuilder("VALUES ");
// 构建字段名
// 构建字段名和占位符 for (DataValue dataValue : batch[0]) {
for (DataValue dataValue : dataValueList) {
String key = dataValue.getKey(); String key = dataValue.getKey();
columns.append(key).append(", "); columns.append(key).append(", ");
values.append("?, ");
} }
// 删除最后一个逗号和空格 // 删除最后一个逗号和空格
columns.delete(columns.length() - 2, columns.length()); columns.delete(columns.length() - 2, columns.length());
// 构建值部分
for (DataValue[] dataValueList : batch) {
values.append("(");
for (DataValue dataValue : dataValueList) {
Object value = dataValue.getValue();
values.append(formatValue(dataValue.getType(), value)).append(", ");
}
values.delete(values.length() - 2, values.length());
values.append("), ");
}
values.delete(values.length() - 2, values.length()); values.delete(values.length() - 2, values.length());
// 完成 SQL 插入语句 // 完成 SQL 插入语句
String sql = "INSERT INTO " + tableName + " " + columns.toString() + ") " + values.toString() + ")"; String sql = "INSERT INTO " + tableName + " " + columns.toString() + ") " + values.toString();
log.info("添加语句{}",sql); return sql;
}
int addCount = 0; private String formatValue(DataType type, Object value) {
try { if (type == DataType.VARCHAR || type == DataType.TEXT) {
conn.setAutoCommit(false);
try ( return "'" + value.toString().replace("'", "''") + "'";
} else if (type == DataType.BIGINT) {
PreparedStatement ps = conn.prepareStatement(sql)) { return value.toString();
} else if (type == DataType.INT) {
return value.toString();
} else if (type == DataType.DECIMAL) {
// 循环设置参数并执行插入 return value.toString();
// for (DataValue dataValue : dataValueList) { } else if (type == DataType.DATETIME) {
int index = 1;
for (DataValue value : dataValueList) { return "'" + new java.sql.Date(((Date) value).getTime()) + "'";
Object obj = value.getValue(); } else if (type == DataType.DOUBLE) {
if (obj instanceof String) {
ps.setString(index++, (String) obj); return value.toString();
log.info("类型为String,值{}",obj);
} else if (obj instanceof Integer) {
ps.setInt(index++, (Integer) obj);
log.info("类型为Integer,值{}",obj);
} else if (obj instanceof Double) {
ps.setDouble(index++, (Double) obj);
log.info("类型为Double,值{}",obj);
} else if (obj instanceof Date) {
ps.setDate(index++, new java.sql.Date(((Date) obj).getTime()));
log.info("类型为Date,值{}",obj);
} else if (obj instanceof Boolean) {
ps.setBoolean(index++, (Boolean) obj);
log.info("类型为Boolean,值{}",obj);
} else if (obj instanceof Float) {
ps.setFloat(index++, (Float) obj);
log.info("类型为Float,值{}",obj);
} else if (obj instanceof Long) {
ps.setLong(index++, (Long) obj);
log.info("类型为Long,值{}",obj);
} else { } else {
ps.setObject(index++, obj); // 其他类型的处理
log.info("类型为OOO,值{}",obj);
}
}
ps.addBatch();
// }
// 执行批量插入操作 return "'" + value.toString() + "'";
int[] ints = ps.executeBatch();
for (int anInt : ints) {
log.info("插入成功的数据有"+anInt);
} }
conn.commit();
} catch (SQLException e) {
e.printStackTrace();
}
} catch (SQLException e) {
throw new RuntimeException(e);
} }
private List<DataValue[][]> splitData(DataValue[][] listList, int batchSize) {
List<DataValue[][]> batches = new ArrayList<>();
int totalSize = listList.length;
int numBatches = (int) Math.ceil((double) totalSize / batchSize);
return addCount; for (int i = 0; i < numBatches; i++) {
int start = i * batchSize;
int end = Math.min(start + batchSize, totalSize);
DataValue[][] batch = new DataValue[end - start][];
System.arraycopy(listList, start, batch, 0, end - start);
batches.add(batch);
} }
return batches;
private void closeConnection(Connection conn) {
if (conn != null) {
try {
conn.close();
} catch (SQLException e) {
e.printStackTrace();
} }
private static void close(HikariDataSource dataSource) {
if (dataSource != null) {
dataSource.close();
} }
} }
// public static HikariDataSource instance = null;
// @NotNull
// public static synchronized HikariDataSource getHikariDataSource(Source dataSources) {
// if(instance == null) {
// HikariConfig hikariConfig = new HikariConfig();
// hikariConfig.setPoolName("HikariCP 连接池");
// hikariConfig.setDataSourceClassName(dataSources.getDriverName());
// hikariConfig.addDataSourceProperty("user", dataSources.getUsername());
// hikariConfig.addDataSourceProperty("password", dataSources.getPassword());
// hikariConfig.addDataSourceProperty("url", dataSources.getUrl());
// hikariConfig.setMaximumPoolSize(15);
//
// instance = new HikariDataSource(hikariConfig);
// }
// return instance;
// }