提交更新返回值

master
lwj 2024-09-08 12:53:32 +08:00
parent f8b12e1237
commit cda6d246c6
1 changed files with 178 additions and 270 deletions

View File

@ -7,18 +7,15 @@ import com.muyu.cloud.etl.service.TableInfoService;
import com.muyu.domain.DataValue; import com.muyu.domain.DataValue;
import com.muyu.domain.Source; import com.muyu.domain.Source;
import com.muyu.domain.TableInfo; import com.muyu.domain.TableInfo;
import com.muyu.domain.enums.DataType;
import com.zaxxer.hikari.HikariDataSource; import com.zaxxer.hikari.HikariDataSource;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import java.sql.Connection; import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException; import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Date; import java.util.Date;
import java.util.List;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -32,55 +29,55 @@ public class ProductServiceImpl implements ProductService {
@Autowired @Autowired
private TableInfoService tableInfoService; private TableInfoService tableInfoService;
// public int addProduct(Long basicId, Long tableId, DataValue[][] listList) {
// public int addProduct(Long basicId, Long tableId, DataValue[][] listList) { // TableInfo tableInfoDataSources = tableInfoService.getById(basicId);
//// TableInfo tableInfoDataSources = tableInfoService.getById(basicId); // Long basicId1 = tableInfoDataSources.getBasicId();
//// Long basicId1 = tableInfoDataSources.getBasicId(); Source dataSources = sourceService.getById(basicId);
// Source dataSources = sourceService.getById(basicId); TableInfo tableInfo = tableInfoService.getById(tableId);
// TableInfo tableInfo = tableInfoService.getById(tableId); String tableName = tableInfo.getTableName();
// String tableName = tableInfo.getTableName();
// // HikariDataSource hikariDataSource = getHikariDataSource(dataSources);
//// HikariDataSource hikariDataSource = getHikariDataSource(dataSources);
// HikariDataSource hikariDataSource = HikariPool.getHikariDataSource(dataSources);
// HikariDataSource hikariDataSource = HikariPool.getHikariDataSource(dataSources);
// ExecutorService executorService = Executors.newFixedThreadPool(4);
// ExecutorService executorService = Executors.newFixedThreadPool(4);
// AtomicInteger addCount = new AtomicInteger();
// AtomicInteger addCount = new AtomicInteger();
// Connection connection = null;
// Connection connection = null;
// try {
// try { connection = hikariDataSource.getConnection();
// connection = hikariDataSource.getConnection(); // 遍历外部列表
// // 遍历外部列表 for (DataValue[] dataValueList : listList) {
// for (DataValue[] dataValueList : listList) { Connection finalConnection = connection;
// Connection finalConnection = connection; executorService.submit(() -> {
// executorService.submit(() -> { try {
// try { addCount.addAndGet(insertRow(finalConnection, tableName, dataValueList));
// addCount.addAndGet(insertRow(finalConnection, tableName, dataValueList)); } catch (SQLException e) {
// } catch (SQLException e) { // 记录异常
// // 记录异常 e.printStackTrace();
// e.printStackTrace(); }
// } });
// }); }
// }
// executorService.shutdown();
// executorService.shutdown(); executorService.awaitTermination(1, TimeUnit.HOURS);
// executorService.awaitTermination(1, TimeUnit.HOURS); } catch (InterruptedException e) {
// } catch (InterruptedException e) { Thread.currentThread().interrupt();
// Thread.currentThread().interrupt(); throw new RuntimeException("Thread interrupted", e);
// throw new RuntimeException("Thread interrupted", e); } catch (SQLException e) {
// } catch (SQLException e) { throw new RuntimeException(e);
// throw new RuntimeException(e); } finally {
// } finally { closeConnection(connection);
// closeConnection(connection); }
// }
// return addCount.get();
// return addCount.get(); }
// }
//
// public int insertRow(Connection conn, String tableName, DataValue[] dataValueList) throws SQLException {
// // private int insertRow(Connection conn, String tableName, DataValue[] dataValueList) throws SQLException {
// // 获取当前行的所有字段名 // // 获取当前行的所有字段名
// StringBuilder columns = new StringBuilder("("); // StringBuilder columns = new StringBuilder("(");
// StringBuilder values = new StringBuilder("VALUES ("); // StringBuilder values = new StringBuilder("VALUES (");
@ -96,234 +93,145 @@ public class ProductServiceImpl implements ProductService {
// values.delete(values.length() - 2, values.length()); // values.delete(values.length() - 2, values.length());
// //
// // 完成 SQL 插入语句 // // 完成 SQL 插入语句
// String sql = "INSERT INTO " + tableName + " " + columns.toString() + ") " + values.toString() + ")"; // String sql = "INSERT INTO " + tableName + " (" + columns.toString() + ") " + values.toString();
// log.info("添加语句{}",sql);
// //
// int addCount = 0; // int addCount = 0;
// // try (PreparedStatement ps = conn.prepareStatement(sql)) {
// try { // // 设置参数
// conn.setAutoCommit(false); // int index = 1;
// // for (DataValue dataValue : dataValueList) {
// try ( // Object value = dataValue.getValue();
// // if (value instanceof String) {
// PreparedStatement ps = conn.prepareStatement(sql)) { // ps.setString(index, (String) value);
// // } else if (value instanceof Integer) {
// // ps.setInt(index, (Integer) value);
// // 循环设置参数并执行插入 // } else if (value instanceof Double) {
// for (DataValue dataValue : dataValueList) { // ps.setDouble(index, (Double) value);
// int index = 1; // } else if (value instanceof Date) {
// for (DataValue value : dataValueList) { // ps.setDate(index, new java.sql.Date(((Date) value).getTime()));
// Object obj = value.getValue(); // } else {
// if (obj instanceof String) { // // 其他类型的处理
// ps.setString(index++, (String) obj); // ps.setObject(index, value);
// log.info("类型为String,值{}",obj);
// } else if (obj instanceof Integer) {
// ps.setInt(index++, (Integer) obj);
// log.info("类型为Integer,值{}",obj);
// } else if (obj instanceof Double) {
// ps.setDouble(index++, (Double) obj);
// log.info("类型为Double,值{}",obj);
// } else if (obj instanceof Date) {
// ps.setDate(index++, new java.sql.Date(((Date) obj).getTime()));
// log.info("类型为Date,值{}",obj);
// } else if (obj instanceof Boolean) {
// ps.setBoolean(index++, (Boolean) obj);
// log.info("类型为Boolean,值{}",obj);
// } else if (obj instanceof Float) {
// ps.setFloat(index++, (Float) obj);
// log.info("类型为Float,值{}",obj);
// } else if (obj instanceof Long) {
// ps.setLong(index++, (Long) obj);
// log.info("类型为Long,值{}",obj);
// } else {
// ps.setObject(index++, obj);
// log.info("类型为Object,值{}",obj);
// }
// }
// ps.addBatch();
// } // }
// // index++;
// // 执行批量插入操作
// int[] ints = ps.executeBatch();
//
// for (int anInt : ints) {
// log.info("插入成功的数据有"+anInt);
// }
//
// conn.commit();
// } catch (SQLException e) {
// e.printStackTrace();
// } // }
// } catch (SQLException e) { // // 执行插入操作
// throw new RuntimeException(e); // addCount = ps.executeUpdate();
// } // }
//
//
// return addCount; // return addCount;
// } // }
public int insertRow(Connection conn, String tableName, DataValue[] dataValueList) throws SQLException {
// 获取当前行的所有字段名
StringBuilder columns = new StringBuilder("(");
StringBuilder values = new StringBuilder("VALUES (");
// 构建字段名和占位符
for (DataValue dataValue : dataValueList) {
String key = dataValue.getKey();
columns.append(key).append(", ");
values.append("?, ");
}
// 删除最后一个逗号和空格
columns.delete(columns.length() - 2, columns.length());
values.delete(values.length() - 2, values.length());
// 完成 SQL 插入语句
String sql = "INSERT INTO " + tableName + " " + columns.toString() + ") " + values.toString() + ")";
log.info("添加语句{}",sql);
int addCount = 0;
try {
conn.setAutoCommit(false);
try (
PreparedStatement ps = conn.prepareStatement(sql)) {
// 循环设置参数并执行插入
for (DataValue dataValue : dataValueList) {
int index = 1;
Object obj = dataValue.getValue();
if (obj instanceof String) {
ps.setString(index++, (String) obj);
log.info("类型为String,值{}",obj);
} else if (obj instanceof Integer) {
ps.setInt(index++, (Integer) obj);
log.info("类型为Integer,值{}",obj);
} else if (obj instanceof Double) {
ps.setDouble(index++, (Double) obj);
log.info("类型为Double,值{}",obj);
} else if (obj instanceof Date) {
ps.setDate(index++, new java.sql.Date(((Date) obj).getTime()));
log.info("类型为Date,值{}",obj);
} else if (obj instanceof Boolean) {
ps.setBoolean(index++, (Boolean) obj);
log.info("类型为Boolean,值{}",obj);
} else if (obj instanceof Float) {
ps.setFloat(index++, (Float) obj);
log.info("类型为Float,值{}",obj);
} else if (obj instanceof Long) {
ps.setLong(index++, (Long) obj);
log.info("类型为Long,值{}",obj);
} else {
ps.setObject(index++, obj);
log.info("类型为obj,值{}",obj);
}
ps.addBatch();
}
// 执行批量插入操作
int[] ints = ps.executeBatch();
for (int anInt : ints) {
log.info("插入成功的数据有"+anInt);
}
conn.commit();
} catch (SQLException e) {
e.printStackTrace();
}
} catch (SQLException e) {
throw new RuntimeException(e);
}
return addCount;
}
private void closeConnection(Connection conn) {
if (conn != null) {
try {
conn.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
}
// public static HikariDataSource instance = null;
// @NotNull
// public static synchronized HikariDataSource getHikariDataSource(Source dataSources) {
// if(instance == null) {
// HikariConfig hikariConfig = new HikariConfig();
// hikariConfig.setPoolName("HikariCP 连接池");
// hikariConfig.setDataSourceClassName(dataSources.getDriverName());
// hikariConfig.addDataSourceProperty("user", dataSources.getUsername());
// hikariConfig.addDataSourceProperty("password", dataSources.getPassword());
// hikariConfig.addDataSourceProperty("url", dataSources.getUrl());
// hikariConfig.setMaximumPoolSize(15);
// //
// // instance = new HikariDataSource(hikariConfig);
// private void closeConnection(Connection conn) {
// if (conn != null) {
// try {
// conn.close();
// } catch (SQLException e) {
// e.printStackTrace();
// }
// } // }
// return instance;
// } // }
@Override
public int addProduct(Long basicId, Long tableId, DataValue[][] listList) {
// TableInfo tableInfoDataSources = tableInfoService.getById(basicId);
// Long basicId1 = tableInfoDataSources.getBasicId();
Source dataSources = sourceService.getById(basicId);
TableInfo tableInfo = tableInfoService.getById(tableId);
String tableName = tableInfo.getTableName();
HikariDataSource hikariDataSource = HikariPool.getHikariDataSource(dataSources);
ExecutorService executorService = Executors.newFixedThreadPool(8);
AtomicInteger addCount = new AtomicInteger();
// 分割数据为3000个批次
List<DataValue[][]> batches = splitData(listList, 2000);
try (Connection conn = hikariDataSource.getConnection()) {
conn.setAutoCommit(false); // 开启事务
for (DataValue[][] batch : batches) {
executorService.submit(() -> {
try (Statement stmt = conn.createStatement()) {
log.info("sql开始拼接");
String sql = buildBatchInsertSQL(tableName, batch);
log.info("sql拼接结束: {}", sql);
stmt.executeUpdate(sql);
addCount.addAndGet(batch.length);
} catch (SQLException e) {
log.error("SQLException异常发生", e);
try {
conn.rollback(); // 回滚事务
} catch (SQLException ex) {
log.error("回滚事务失败", ex);
throw new RuntimeException(ex);
}
throw new RuntimeException(e);
}
});
}
executorService.shutdown();
executorService.awaitTermination(1, TimeUnit.HOURS);
conn.commit(); // 提交事务
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
} catch (SQLException e) {
throw new RuntimeException(e);
} finally {
close(hikariDataSource); // 关闭数据源
}
return addCount.get();
}
private String buildBatchInsertSQL(String tableName, DataValue[][] batch) {
log.info("执行到1");
StringBuilder columns = new StringBuilder("(");
StringBuilder values = new StringBuilder("VALUES ");
// 构建字段名
for (DataValue dataValue : batch[0]) {
String key = dataValue.getKey();
columns.append(key).append(", ");
}
log.info("执行到2");
// 删除最后一个逗号和空格
columns.delete(columns.length() - 2, columns.length());
log.info("执行到3");
// 构建值部分
for (DataValue[] dataValueList : batch) {
values.append("(");
log.info("执行到3.0");
for (DataValue dataValue : dataValueList) {
Object value = dataValue.getValue();
log.info("执行到3.00");
String s = formatValue(dataValue.getType(), value);
log.info("拼接sql+=++++[{}",s);
values.append(s).append(", ");
log.info("执行到3.9");
}
log.info("执行到4");
values.delete(values.length() - 2, values.length());
values.append("), ");
log.info("执行到5");
}
log.info("执行到6");
log.info("当前执行到6的sql语句:"+values.toString());
values.delete(values.length() - 2, values.length());
log.info("执行到7");
log.info("当前执行到7的sql语句:"+values.toString());
// 完成 SQL 插入语句
String sql = "INSERT INTO " + tableName + " " + columns.toString() + ") " + values.toString();
log.info("批量添加的sql: {}", sql);
return sql;
}
private String formatValue(DataType type, Object value) {
log.info("执行到3.1");
if (type == DataType.VARCHAR || type == DataType.TEXT) {
log.info("执行到3.2");
return "'" + value.toString().replace("'", "''") + "'";
} else if (type == DataType.BIGINT) {
log.info("执行到3.3");
return value.toString();
} else if (type == DataType.INT) {
log.info("执行到3.4");
return value.toString();
} else if (type == DataType.DECIMAL) {
log.info("执行到3.5");
return value.toString();
} else if (type == DataType.DATETIME) {
log.info("执行到3.6");
return "'" + new java.sql.Date(((Date) value).getTime()) + "'";
} else if (type == DataType.DOUBLE) {
log.info("执行到3.7");
return value.toString();
} else {
// 其他类型的处理
log.info("执行到3.8");
return "'" + value.toString() + "'";
}
}
private List<DataValue[][]> splitData(DataValue[][] listList, int batchSize) {
List<DataValue[][]> batches = new ArrayList<>();
int totalSize = listList.length;
int numBatches = (int) Math.ceil((double) totalSize / batchSize);
for (int i = 0; i < numBatches; i++) {
int start = i * batchSize;
int end = Math.min(start + batchSize, totalSize);
DataValue[][] batch = new DataValue[end - start][];
System.arraycopy(listList, start, batch, 0, end - start);
batches.add(batch);
}
return batches;
}
private static void close(HikariDataSource dataSource) {
if (dataSource != null) {
dataSource.close();
}
}
} }