修改查询语句

master
lwj 2024-09-09 15:33:46 +08:00
parent a7bc6b1fad
commit 6d0413a76c
1 changed files with 284 additions and 132 deletions

View File

@ -1,21 +1,26 @@
package com.muyu.cloud.etl.service.impl; package com.muyu.cloud.etl.service.impl;
import com.muyu.Hikari.HikariPool;
import com.muyu.cloud.etl.service.ProductService; import com.muyu.cloud.etl.service.ProductService;
import com.muyu.cloud.etl.service.SourceService; import com.muyu.cloud.etl.service.SourceService;
import com.muyu.cloud.etl.service.TableInfoService; import com.muyu.cloud.etl.service.TableInfoService;
import com.muyu.domain.DataValue; import com.muyu.domain.DataValue;
import com.muyu.domain.Source; import com.muyu.domain.Source;
import com.muyu.domain.TableInfo; import com.muyu.domain.TableInfo;
import com.muyu.domain.enums.DataType;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource; import com.zaxxer.hikari.HikariDataSource;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import java.sql.Connection; import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException; import java.sql.SQLException;
import java.sql.Statement;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date; import java.util.Date;
import java.util.List;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -30,136 +35,137 @@ public class ProductServiceImpl implements ProductService {
private TableInfoService tableInfoService; private TableInfoService tableInfoService;
public int addProduct(Long basicId, Long tableId, DataValue[][] listList) { //
// public int addProduct(Long basicId, Long tableId, DataValue[][] listList) {
Source dataSources = sourceService.getById(basicId); //
TableInfo tableInfo = tableInfoService.getById(tableId); // Source dataSources = sourceService.getById(basicId);
String tableName = tableInfo.getTableName(); // TableInfo tableInfo = tableInfoService.getById(tableId);
// String tableName = tableInfo.getTableName();
//
//
HikariDataSource hikariDataSource = HikariPool.getHikariDataSource(dataSources); // HikariDataSource hikariDataSource = HikariPool.getHikariDataSource(dataSources);
ExecutorService executorService = Executors.newFixedThreadPool(8); // ExecutorService executorService = Executors.newFixedThreadPool(8);
//
AtomicInteger addCount = new AtomicInteger(); // AtomicInteger addCount = new AtomicInteger();
//
Connection connection = null; // Connection connection = null;
//
try { // try {
connection = hikariDataSource.getConnection(); //
// connection = hikariDataSource.getConnection();
for (DataValue[] dataValueList : listList) { //
Connection finalConnection = connection; // for (DataValue[] dataValueList : listList) {
executorService.submit(() -> { // Connection finalConnection = connection;
try { // executorService.submit(() -> {
addCount.addAndGet(insertRow(finalConnection, tableName, dataValueList)); // try {
} catch (SQLException e) { // addCount.addAndGet(insertRow(finalConnection, tableName, dataValueList));
// } catch (SQLException e) {
e.printStackTrace(); //
} // e.printStackTrace();
}); // }
} // });
// }
executorService.shutdown(); //
executorService.awaitTermination(1, TimeUnit.HOURS); // executorService.shutdown();
} catch (InterruptedException e) { // executorService.awaitTermination(1, TimeUnit.HOURS);
Thread.currentThread().interrupt(); // } catch (InterruptedException e) {
throw new RuntimeException("Thread interrupted", e); // Thread.currentThread().interrupt();
} catch (SQLException e) { // throw new RuntimeException("Thread interrupted", e);
throw new RuntimeException(e); // } catch (SQLException e) {
} finally { // throw new RuntimeException(e);
closeConnection(connection); // } finally {
} // closeConnection(connection);
// }
return addCount.get(); //
} // return addCount.get();
// }
//
public int insertRow(Connection conn, String tableName, DataValue[] dataValueList) throws SQLException { //
// public int insertRow(Connection conn, String tableName, DataValue[] dataValueList) throws SQLException {
//
StringBuilder columns = new StringBuilder("("); //
StringBuilder values = new StringBuilder("VALUES ("); // StringBuilder columns = new StringBuilder("(");
// StringBuilder values = new StringBuilder("VALUES (");
//
for (DataValue dataValue : dataValueList) { //
String key = dataValue.getKey(); // for (DataValue dataValue : dataValueList) {
columns.append(key).append(", "); // String key = dataValue.getKey();
values.append("?, "); // columns.append(key).append(", ");
} // values.append("?, ");
// }
columns.delete(columns.length() - 2, columns.length()); //
values.delete(values.length() - 2, values.length()); // columns.delete(columns.length() - 2, columns.length());
// values.delete(values.length() - 2, values.length());
//
String sql = "INSERT INTO " + tableName + " " + columns.toString() + ") " + values.toString() + ")"; //
log.info("添加语句{}",sql); // String sql = "INSERT INTO " + tableName + " " + columns.toString() + ") " + values.toString() + ")";
// log.info("添加语句{}",sql);
int addCount = 0; //
// int addCount = 0;
try { //
conn.setAutoCommit(false); // try {
try ( // conn.setAutoCommit(false);
PreparedStatement ps = conn.prepareStatement(sql)) { // try (
// PreparedStatement ps = conn.prepareStatement(sql)) {
int index = 1; //
for (DataValue value : dataValueList) { // int index = 1;
Object obj = value.getValue(); // for (DataValue value : dataValueList) {
if (obj instanceof String) { // Object obj = value.getValue();
ps.setString(index++, (String) obj); // if (obj instanceof String) {
log.info("类型为String,值{}",obj); // ps.setString(index++, (String) obj);
} else if (obj instanceof Integer) { // log.info("类型为String,值{}",obj);
ps.setInt(index++, (Integer) obj); // } else if (obj instanceof Integer) {
log.info("类型为Integer,值{}",obj); // ps.setInt(index++, (Integer) obj);
} else if (obj instanceof Double) { // log.info("类型为Integer,值{}",obj);
ps.setDouble(index++, (Double) obj); // } else if (obj instanceof Double) {
log.info("类型为Double,值{}",obj); // ps.setDouble(index++, (Double) obj);
} else if (obj instanceof Date) { // log.info("类型为Double,值{}",obj);
ps.setDate(index++, new java.sql.Date(((Date) obj).getTime())); // } else if (obj instanceof Date) {
log.info("类型为Date,值{}",obj); // ps.setDate(index++, new java.sql.Date(((Date) obj).getTime()));
} else if (obj instanceof Boolean) { // log.info("类型为Date,值{}",obj);
ps.setBoolean(index++, (Boolean) obj); // } else if (obj instanceof Boolean) {
log.info("类型为Boolean,值{}",obj); // ps.setBoolean(index++, (Boolean) obj);
} else if (obj instanceof Float) { // log.info("类型为Boolean,值{}",obj);
ps.setFloat(index++, (Float) obj); // } else if (obj instanceof Float) {
log.info("类型为Float,值{}",obj); // ps.setFloat(index++, (Float) obj);
} else if (obj instanceof Long) { // log.info("类型为Float,值{}",obj);
ps.setLong(index++, (Long) obj); // } else if (obj instanceof Long) {
log.info("类型为Long,值{}",obj); // ps.setLong(index++, (Long) obj);
} else { // log.info("类型为Long,值{}",obj);
ps.setObject(index++, obj); // } else {
log.info("类型为OOO,值{}",obj); // ps.setObject(index++, obj);
} // log.info("类型为OOO,值{}",obj);
} // }
ps.addBatch(); // }
// ps.addBatch();
int[] ints = ps.executeBatch(); //
// int[] ints = ps.executeBatch();
for (int anInt : ints) { //
log.info("插入成功的数据有"+anInt); // for (int anInt : ints) {
} // log.info("插入成功的数据有"+anInt);
// }
conn.commit(); //
} catch (SQLException e) { // conn.commit();
e.printStackTrace(); // } catch (SQLException e) {
} // e.printStackTrace();
} catch (SQLException e) { // }
throw new RuntimeException(e); // } catch (SQLException e) {
} // throw new RuntimeException(e);
// }
return addCount; //
} // return addCount;
// }
//
private void closeConnection(Connection conn) { //
if (conn != null) { // private void closeConnection(Connection conn) {
try { // if (conn != null) {
conn.close(); // try {
} catch (SQLException e) { // conn.close();
e.printStackTrace(); // } catch (SQLException e) {
} // e.printStackTrace();
} // }
} // }
// }
// 第二种 // 第二种
@ -472,7 +478,153 @@ public class ProductServiceImpl implements ProductService {
// private static final Logger log = Logger.getLogger(DataInserter.class.getName()); // private static final Logger log = Logger.getLogger(DataInserter.class.getName());
//第四种
@Override
public int addProduct(Long basicId, Long tableId, DataValue[][] listList) {
// TableInfo tableInfoDataSources = tableInfoService.getById(basicId);
// Long basicId1 = tableInfoDataSources.getBasicId();
Source dataSources = sourceService.getById(basicId);
TableInfo tableInfo = tableInfoService.getById(tableId);
String tableName = tableInfo.getTableName();
HikariConfig hikariConfig = new HikariConfig();
hikariConfig.setPoolName("HikariCP 连接池");
hikariConfig.setDriverClassName(dataSources.getDriverName());
// hikariConfig.setJdbcUrl(dataSources.getUrl(dataSources) + "&maxAllowedPacket=1G");
hikariConfig.setJdbcUrl(dataSources.getUrl());
hikariConfig.setUsername(dataSources.getUsername());
hikariConfig.setPassword(dataSources.getPassword());
hikariConfig.setMinimumIdle(2);
hikariConfig.setMaximumPoolSize(10);
HikariDataSource hikariDataSource = new HikariDataSource(hikariConfig);
ExecutorService executorService = Executors.newFixedThreadPool(8);
AtomicInteger addCount = new AtomicInteger();
// 分割数据为较小的批次
List<DataValue[][]> batches = splitData(listList, 3000);
try (Connection conn = hikariDataSource.getConnection()) {
conn.setAutoCommit(false); // 开启事务
for (DataValue[][] batch : batches) {
executorService.submit(() -> {
try (Statement stmt = conn.createStatement()) {
String sql = buildBatchInsertSQL(tableName, batch);
stmt.executeUpdate(sql);
addCount.addAndGet(batch.length);
} catch (SQLException e) {
log.error("SQLException异常发生", e);
try {
conn.rollback(); // 回滚事务
} catch (SQLException ex) {
log.error("回滚事务失败", ex);
throw new RuntimeException(ex);
}
throw new RuntimeException(e);
} catch (Exception e) {
log.error("其他异常发生", e);
try {
conn.rollback(); // 回滚事务
} catch (SQLException ex) {
log.error("回滚事务失败", ex);
throw new RuntimeException(ex);
}
throw new RuntimeException(e);
}
});
}
executorService.shutdown();
executorService.awaitTermination(1, TimeUnit.HOURS);
conn.commit(); // 提交事务
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
} catch (SQLException e) {
throw new RuntimeException(e);
} finally {
close(hikariDataSource); // 关闭数据源
}
return addCount.get();
}
private String buildBatchInsertSQL(String tableName, DataValue[][] batch) {
StringBuilder columns = new StringBuilder("(");
StringBuilder values = new StringBuilder("VALUES ");
// 构建字段名
for (DataValue dataValue : batch[0]) {
String key = dataValue.getKey();
columns.append(key).append(", ");
}
// 删除最后一个逗号和空格
columns.delete(columns.length() - 2, columns.length());
// 构建值部分
for (DataValue[] dataValueList : batch) {
values.append("(");
for (DataValue dataValue : dataValueList) {
Object value = dataValue.getValue();
values.append(formatValue(dataValue.getType(), value)).append(", ");
}
values.delete(values.length() - 2, values.length());
values.append("), ");
}
// 删除最后一个逗号
values.delete(values.length() - 2, values.length());
// 完成 SQL 插入语句
String sql = "INSERT INTO " + tableName + " " + columns.toString() + ") " + values.toString();
return sql;
}
private String formatValue(DataType type, Object value) {
if (value == null) {
// 根据业务需求处理 null 值
return "NULL"; // 或者其他默认值
}
if (type == DataType.VARCHAR || type == DataType.TEXT) {
return "'" + value.toString().replace("'", "''") + "'";
} else if (type == DataType.BIGINT) {
return value.toString();
} else if (type == DataType.INT) {
return value.toString();
} else if (type == DataType.DECIMAL) {
return value.toString();
} else if (type == DataType.DATETIME) {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
return "'" + sdf.format((Date) value) + "'";
} else if (type == DataType.DOUBLE) {
return value.toString();
} else {
return "'" + value.toString() + "'";
}
}
// 分割数据为较小的批次
private List<DataValue[][]> splitData(DataValue[][] listList, int batchSize) {
List<DataValue[][]> batches = new ArrayList<>();
int totalRows = listList.length;
int totalBatches = (int) Math.ceil((double) totalRows / batchSize);
for (int i = 0; i < totalBatches; i++) {
int start = i * batchSize;
int end = Math.min(start + batchSize, totalRows);
DataValue[][] batch = Arrays.copyOfRange(listList, start, end);
batches.add(batch);
}
return batches;
}
// 关闭数据源
private void close(HikariDataSource dataSource) {
dataSource.close();
}