提交更新返回值

master
lwj 2024-09-08 11:47:56 +08:00
parent 5b61224161
commit fef283f8f2
1 changed files with 268 additions and 180 deletions

View File

@ -7,15 +7,18 @@ import com.muyu.cloud.etl.service.TableInfoService;
import com.muyu.domain.DataValue; import com.muyu.domain.DataValue;
import com.muyu.domain.Source; import com.muyu.domain.Source;
import com.muyu.domain.TableInfo; import com.muyu.domain.TableInfo;
import com.muyu.domain.enums.DataType;
import com.zaxxer.hikari.HikariDataSource; import com.zaxxer.hikari.HikariDataSource;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import java.sql.Connection; import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException; import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Date; import java.util.Date;
import java.util.List;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -29,55 +32,55 @@ public class ProductServiceImpl implements ProductService {
@Autowired @Autowired
private TableInfoService tableInfoService; private TableInfoService tableInfoService;
public int addProduct(Long basicId, Long tableId, DataValue[][] listList) { //
// TableInfo tableInfoDataSources = tableInfoService.getById(basicId); // public int addProduct(Long basicId, Long tableId, DataValue[][] listList) {
// Long basicId1 = tableInfoDataSources.getBasicId(); //// TableInfo tableInfoDataSources = tableInfoService.getById(basicId);
Source dataSources = sourceService.getById(basicId); //// Long basicId1 = tableInfoDataSources.getBasicId();
TableInfo tableInfo = tableInfoService.getById(tableId); // Source dataSources = sourceService.getById(basicId);
String tableName = tableInfo.getTableName(); // TableInfo tableInfo = tableInfoService.getById(tableId);
// String tableName = tableInfo.getTableName();
// HikariDataSource hikariDataSource = getHikariDataSource(dataSources); //
//// HikariDataSource hikariDataSource = getHikariDataSource(dataSources);
HikariDataSource hikariDataSource = HikariPool.getHikariDataSource(dataSources); //
// HikariDataSource hikariDataSource = HikariPool.getHikariDataSource(dataSources);
ExecutorService executorService = Executors.newFixedThreadPool(4); //
// ExecutorService executorService = Executors.newFixedThreadPool(4);
AtomicInteger addCount = new AtomicInteger(); //
// AtomicInteger addCount = new AtomicInteger();
Connection connection = null; //
// Connection connection = null;
try { //
connection = hikariDataSource.getConnection(); // try {
// 遍历外部列表 // connection = hikariDataSource.getConnection();
for (DataValue[] dataValueList : listList) { // // 遍历外部列表
Connection finalConnection = connection; // for (DataValue[] dataValueList : listList) {
executorService.submit(() -> { // Connection finalConnection = connection;
try { // executorService.submit(() -> {
addCount.addAndGet(insertRow(finalConnection, tableName, dataValueList)); // try {
} catch (SQLException e) { // addCount.addAndGet(insertRow(finalConnection, tableName, dataValueList));
// 记录异常 // } catch (SQLException e) {
e.printStackTrace(); // // 记录异常
} // e.printStackTrace();
}); // }
} // });
// }
executorService.shutdown(); //
executorService.awaitTermination(1, TimeUnit.HOURS); // executorService.shutdown();
} catch (InterruptedException e) { // executorService.awaitTermination(1, TimeUnit.HOURS);
Thread.currentThread().interrupt(); // } catch (InterruptedException e) {
throw new RuntimeException("Thread interrupted", e); // Thread.currentThread().interrupt();
} catch (SQLException e) { // throw new RuntimeException("Thread interrupted", e);
throw new RuntimeException(e); // } catch (SQLException e) {
} finally { // throw new RuntimeException(e);
closeConnection(connection); // } finally {
} // closeConnection(connection);
// }
return addCount.get(); //
} // return addCount.get();
// }
//
// public int insertRow(Connection conn, String tableName, DataValue[] dataValueList) throws SQLException {
// private int insertRow(Connection conn, String tableName, DataValue[] dataValueList) throws SQLException { //
// // 获取当前行的所有字段名 // // 获取当前行的所有字段名
// StringBuilder columns = new StringBuilder("("); // StringBuilder columns = new StringBuilder("(");
// StringBuilder values = new StringBuilder("VALUES ("); // StringBuilder values = new StringBuilder("VALUES (");
@ -93,147 +96,232 @@ public class ProductServiceImpl implements ProductService {
// values.delete(values.length() - 2, values.length()); // values.delete(values.length() - 2, values.length());
// //
// // 完成 SQL 插入语句 // // 完成 SQL 插入语句
// String sql = "INSERT INTO " + tableName + " (" + columns.toString() + ") " + values.toString(); // String sql = "INSERT INTO " + tableName + " " + columns.toString() + ") " + values.toString() + ")";
// log.info("添加语句{}",sql);
// //
// int addCount = 0; // int addCount = 0;
// try (PreparedStatement ps = conn.prepareStatement(sql)) { //
// // 设置参数 // try {
// int index = 1; // conn.setAutoCommit(false);
// for (DataValue dataValue : dataValueList) { //
// Object value = dataValue.getValue(); // try (
// if (value instanceof String) { //
// ps.setString(index, (String) value); // PreparedStatement ps = conn.prepareStatement(sql)) {
// } else if (value instanceof Integer) { //
// ps.setInt(index, (Integer) value); //
// } else if (value instanceof Double) { // // 循环设置参数并执行插入
// ps.setDouble(index, (Double) value); // for (DataValue dataValue : dataValueList) {
// } else if (value instanceof Date) { // int index = 1;
// ps.setDate(index, new java.sql.Date(((Date) value).getTime())); // for (DataValue value : dataValueList) {
// } else { // Object obj = value.getValue();
// // 其他类型的处理 // if (obj instanceof String) {
// ps.setObject(index, value); // ps.setString(index++, (String) obj);
// log.info("类型为String,值{}",obj);
// } else if (obj instanceof Integer) {
// ps.setInt(index++, (Integer) obj);
// log.info("类型为Integer,值{}",obj);
// } else if (obj instanceof Double) {
// ps.setDouble(index++, (Double) obj);
// log.info("类型为Double,值{}",obj);
// } else if (obj instanceof Date) {
// ps.setDate(index++, new java.sql.Date(((Date) obj).getTime()));
// log.info("类型为Date,值{}",obj);
// } else if (obj instanceof Boolean) {
// ps.setBoolean(index++, (Boolean) obj);
// log.info("类型为Boolean,值{}",obj);
// } else if (obj instanceof Float) {
// ps.setFloat(index++, (Float) obj);
// log.info("类型为Float,值{}",obj);
// } else if (obj instanceof Long) {
// ps.setLong(index++, (Long) obj);
// log.info("类型为Long,值{}",obj);
// } else {
// ps.setObject(index++, obj);
// log.info("类型为Object,值{}",obj);
// }
// }
// ps.addBatch();
// } // }
// index++; //
// // 执行批量插入操作
// int[] ints = ps.executeBatch();
//
// for (int anInt : ints) {
// log.info("插入成功的数据有"+anInt);
// }
//
// conn.commit();
// } catch (SQLException e) {
// e.printStackTrace();
// } // }
// // 执行插入操作 // } catch (SQLException e) {
// addCount = ps.executeUpdate(); // throw new RuntimeException(e);
// } // }
//
//
// return addCount; // return addCount;
// } // }
public int insertRow(Connection conn, String tableName, DataValue[] dataValueList) throws SQLException {
// 获取当前行的所有字段名
StringBuilder columns = new StringBuilder("(");
StringBuilder values = new StringBuilder("VALUES (");
// 构建字段名和占位符
for (DataValue dataValue : dataValueList) {
String key = dataValue.getKey();
columns.append(key).append(", ");
values.append("?, ");
}
// 删除最后一个逗号和空格
columns.delete(columns.length() - 2, columns.length());
values.delete(values.length() - 2, values.length());
// 完成 SQL 插入语句
String sql = "INSERT INTO " + tableName + " " + columns.toString() + ") " + values.toString() + ")";
log.info("添加语句{}",sql);
int addCount = 0;
try {
conn.setAutoCommit(false);
try (
PreparedStatement ps = conn.prepareStatement(sql)) {
// 循环设置参数并执行插入
for (DataValue dataValue : dataValueList) {
int index = 1;
for (DataValue value : dataValueList) {
Object obj = value.getValue();
if (obj instanceof String) {
ps.setString(index++, (String) obj);
log.info("类型为String,值{}",obj);
} else if (obj instanceof Integer) {
ps.setInt(index++, (Integer) obj);
log.info("类型为Integer,值{}",obj);
} else if (obj instanceof Double) {
ps.setDouble(index++, (Double) obj);
log.info("类型为Double,值{}",obj);
} else if (obj instanceof Date) {
ps.setDate(index++, new java.sql.Date(((Date) obj).getTime()));
log.info("类型为Date,值{}",obj);
} else if (obj instanceof Boolean) {
ps.setBoolean(index++, (Boolean) obj);
log.info("类型为Boolean,值{}",obj);
} else if (obj instanceof Float) {
ps.setFloat(index++, (Float) obj);
log.info("类型为Float,值{}",obj);
} else if (obj instanceof Long) {
ps.setLong(index++, (Long) obj);
log.info("类型为Long,值{}",obj);
} else {
ps.setObject(index++, obj);
log.info("类型为Object,值{}",obj);
}
}
ps.addBatch();
}
// 执行批量插入操作
int[] ints = ps.executeBatch();
for (int anInt : ints) {
log.info("插入成功的数据有"+anInt);
}
conn.commit();
} catch (SQLException e) {
e.printStackTrace();
}
} catch (SQLException e) {
throw new RuntimeException(e);
}
return addCount;
}
private void closeConnection(Connection conn) {
if (conn != null) {
try {
conn.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
}
// public static HikariDataSource instance = null;
// @NotNull
// public static synchronized HikariDataSource getHikariDataSource(Source dataSources) {
// if(instance == null) {
// HikariConfig hikariConfig = new HikariConfig();
// hikariConfig.setPoolName("HikariCP 连接池");
// hikariConfig.setDataSourceClassName(dataSources.getDriverName());
// hikariConfig.addDataSourceProperty("user", dataSources.getUsername());
// hikariConfig.addDataSourceProperty("password", dataSources.getPassword());
// hikariConfig.addDataSourceProperty("url", dataSources.getUrl());
// hikariConfig.setMaximumPoolSize(15);
// //
// instance = new HikariDataSource(hikariConfig); //
// private void closeConnection(Connection conn) {
// if (conn != null) {
// try {
// conn.close();
// } catch (SQLException e) {
// e.printStackTrace();
// }
// } // }
// return instance;
// } // }
@Override
public int addProduct(Long basicId, Long tableId, DataValue[][] listList) {
// TableInfo tableInfoDataSources = tableInfoService.getById(basicId);
// Long basicId1 = tableInfoDataSources.getBasicId();
Source dataSources = sourceService.getById(basicId);
TableInfo tableInfo = tableInfoService.getById(tableId);
String tableName = tableInfo.getTableName();
HikariDataSource hikariDataSource = HikariPool.getHikariDataSource(dataSources);
ExecutorService executorService = Executors.newFixedThreadPool(8);
AtomicInteger addCount = new AtomicInteger();
// 分割数据为3000个批次
List<DataValue[][]> batches = splitData(listList, 2000);
try (Connection conn = hikariDataSource.getConnection()) {
conn.setAutoCommit(false); // 开启事务
for (DataValue[][] batch : batches) {
executorService.submit(() -> {
try (Statement stmt = conn.createStatement()) {
log.info("sql开始拼接");
String sql = buildBatchInsertSQL(tableName, batch);
log.info("sql拼接结束: {}", sql);
stmt.executeUpdate(sql);
addCount.addAndGet(batch.length);
} catch (SQLException e) {
log.error("SQLException异常发生", e);
try {
conn.rollback(); // 回滚事务
} catch (SQLException ex) {
log.error("回滚事务失败", ex);
throw new RuntimeException(ex);
}
throw new RuntimeException(e);
}
});
}
executorService.shutdown();
executorService.awaitTermination(1, TimeUnit.HOURS);
conn.commit(); // 提交事务
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
} catch (SQLException e) {
throw new RuntimeException(e);
} finally {
close(hikariDataSource); // 关闭数据源
}
return addCount.get();
}
private String buildBatchInsertSQL(String tableName, DataValue[][] batch) {
log.info("执行到1");
StringBuilder columns = new StringBuilder("(");
StringBuilder values = new StringBuilder("VALUES ");
// 构建字段名
for (DataValue dataValue : batch[0]) {
String key = dataValue.getKey();
columns.append(key).append(", ");
}
log.info("执行到2");
// 删除最后一个逗号和空格
columns.delete(columns.length() - 2, columns.length());
log.info("执行到3");
// 构建值部分
for (DataValue[] dataValueList : batch) {
values.append("(");
log.info("执行到3.0");
for (DataValue dataValue : dataValueList) {
Object value = dataValue.getValue();
log.info("执行到3.00");
values.append(formatValue(dataValue.getType(), value)).append(", ");
log.info("执行到3.9");
}
log.info("执行到4");
values.delete(values.length() - 2, values.length());
values.append("), ");
log.info("执行到5");
}
log.info("执行到6");
log.info("当前执行到6的sql语句:"+values.toString());
values.delete(values.length() - 2, values.length());
log.info("执行到7");
log.info("当前执行到7的sql语句:"+values.toString());
// 完成 SQL 插入语句
String sql = "INSERT INTO " + tableName + " " + columns.toString() + ") " + values.toString();
log.info("批量添加的sql: {}", sql);
return sql;
}
private String formatValue(DataType type, Object value) {
log.info("执行到3.1");
if (type == DataType.VARCHAR || type == DataType.TEXT) {
log.info("执行到3.2");
return "'" + value.toString().replace("'", "''") + "'";
} else if (type == DataType.BIGINT) {
log.info("执行到3.3");
return value.toString();
} else if (type == DataType.INT) {
log.info("执行到3.4");
return value.toString();
} else if (type == DataType.DECIMAL) {
log.info("执行到3.5");
return value.toString();
} else if (type == DataType.DATETIME) {
log.info("执行到3.6");
return "'" + new java.sql.Date(((Date) value).getTime()) + "'";
} else if (type == DataType.DOUBLE) {
log.info("执行到3.7");
return value.toString();
} else {
// 其他类型的处理
log.info("执行到3.8");
return "'" + value.toString() + "'";
}
}
private List<DataValue[][]> splitData(DataValue[][] listList, int batchSize) {
List<DataValue[][]> batches = new ArrayList<>();
int totalSize = listList.length;
int numBatches = (int) Math.ceil((double) totalSize / batchSize);
for (int i = 0; i < numBatches; i++) {
int start = i * batchSize;
int end = Math.min(start + batchSize, totalSize);
DataValue[][] batch = new DataValue[end - start][];
System.arraycopy(listList, start, batch, 0, end - start);
batches.add(batch);
}
return batches;
}
private static void close(HikariDataSource dataSource) {
if (dataSource != null) {
dataSource.close();
}
}
} }