修改查询语句
parent
886e768c36
commit
0737286f64
|
@ -5,9 +5,6 @@ target/
|
||||||
|
|
||||||
### IntelliJ IDEA ###
|
### IntelliJ IDEA ###
|
||||||
.idea
|
.idea
|
||||||
*.iws
|
|
||||||
*.iml
|
|
||||||
*.ipr
|
|
||||||
|
|
||||||
### Eclipse ###
|
### Eclipse ###
|
||||||
.apt_generated
|
.apt_generated
|
||||||
|
|
|
@ -99,6 +99,7 @@ public class MySqlDataSource extends BaseDataAbsSource {
|
||||||
// throw new RuntimeException(e);
|
// throw new RuntimeException(e);
|
||||||
// }
|
// }
|
||||||
log.info("one------------"+one+"------------two"+two);
|
log.info("one------------"+one+"------------two"+two);
|
||||||
|
|
||||||
DataValue[][] dataValues = new DataValue[one][two];
|
DataValue[][] dataValues = new DataValue[one][two];
|
||||||
try {
|
try {
|
||||||
conn = hikariDataSource.getConnection();
|
conn = hikariDataSource.getConnection();
|
||||||
|
@ -142,9 +143,9 @@ public class MySqlDataSource extends BaseDataAbsSource {
|
||||||
.type(map.get(i).getType())
|
.type(map.get(i).getType())
|
||||||
.build();
|
.build();
|
||||||
dataValues[c][i-1]=build;
|
dataValues[c][i-1]=build;
|
||||||
if (c <= one && i <= columnCount) {
|
// if (c <= one && i <= columnCount) {
|
||||||
dataValues[c - 1][i - 1] = build;
|
// dataValues[c - 1][i - 1] = build;
|
||||||
}
|
// }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// if (c <= one) {
|
// if (c <= one) {
|
||||||
|
|
|
@ -1,12 +1,12 @@
|
||||||
package com.muyu.cloud.etl.service.impl;
|
package com.muyu.cloud.etl.service.impl;
|
||||||
|
|
||||||
|
import com.muyu.Hikari.HikariPool;
|
||||||
import com.muyu.cloud.etl.service.ProductService;
|
import com.muyu.cloud.etl.service.ProductService;
|
||||||
import com.muyu.cloud.etl.service.SourceService;
|
import com.muyu.cloud.etl.service.SourceService;
|
||||||
import com.muyu.cloud.etl.service.TableInfoService;
|
import com.muyu.cloud.etl.service.TableInfoService;
|
||||||
import com.muyu.domain.DataValue;
|
import com.muyu.domain.DataValue;
|
||||||
import com.muyu.domain.Source;
|
import com.muyu.domain.Source;
|
||||||
import com.muyu.domain.TableInfo;
|
import com.muyu.domain.TableInfo;
|
||||||
import com.zaxxer.hikari.HikariConfig;
|
|
||||||
import com.zaxxer.hikari.HikariDataSource;
|
import com.zaxxer.hikari.HikariDataSource;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
@ -31,25 +31,14 @@ public class ProductServiceImpl implements ProductService {
|
||||||
|
|
||||||
|
|
||||||
public int addProduct(Long basicId, Long tableId, DataValue[][] listList) {
|
public int addProduct(Long basicId, Long tableId, DataValue[][] listList) {
|
||||||
// TableInfo tableInfoDataSources = tableInfoService.getById(basicId);
|
|
||||||
// Long basicId1 = tableInfoDataSources.getBasicId();
|
|
||||||
Source dataSources = sourceService.getById(basicId);
|
Source dataSources = sourceService.getById(basicId);
|
||||||
TableInfo tableInfo = tableInfoService.getById(tableId);
|
TableInfo tableInfo = tableInfoService.getById(tableId);
|
||||||
String tableName = tableInfo.getTableName();
|
String tableName = tableInfo.getTableName();
|
||||||
|
|
||||||
// HikariDataSource hikariDataSource = getHikariDataSource(dataSources);
|
|
||||||
|
|
||||||
// HikariDataSource hikariDataSource = HikariPool.getHikariDataSource(dataSources);
|
|
||||||
|
|
||||||
HikariConfig hikariConfig = new HikariConfig();
|
HikariDataSource hikariDataSource = HikariPool.getHikariDataSource(dataSources);
|
||||||
hikariConfig.setPoolName("HikariCP 连接池");
|
|
||||||
hikariConfig.setDriverClassName(dataSources.getDriverName());
|
|
||||||
hikariConfig.setJdbcUrl(dataSources.getUrl());
|
|
||||||
hikariConfig.setUsername(dataSources.getUsername());
|
|
||||||
hikariConfig.setPassword(dataSources.getPassword());
|
|
||||||
hikariConfig.setMinimumIdle(2);
|
|
||||||
hikariConfig.setMaximumPoolSize(10);
|
|
||||||
HikariDataSource hikariDataSource = new HikariDataSource(hikariConfig);
|
|
||||||
ExecutorService executorService = Executors.newFixedThreadPool(8);
|
ExecutorService executorService = Executors.newFixedThreadPool(8);
|
||||||
|
|
||||||
AtomicInteger addCount = new AtomicInteger();
|
AtomicInteger addCount = new AtomicInteger();
|
||||||
|
@ -58,14 +47,14 @@ public class ProductServiceImpl implements ProductService {
|
||||||
|
|
||||||
try {
|
try {
|
||||||
connection = hikariDataSource.getConnection();
|
connection = hikariDataSource.getConnection();
|
||||||
// 遍历外部列表
|
|
||||||
for (DataValue[] dataValueList : listList) {
|
for (DataValue[] dataValueList : listList) {
|
||||||
Connection finalConnection = connection;
|
Connection finalConnection = connection;
|
||||||
executorService.submit(() -> {
|
executorService.submit(() -> {
|
||||||
try {
|
try {
|
||||||
addCount.addAndGet(insertRow(finalConnection, tableName, dataValueList));
|
addCount.addAndGet(insertRow(finalConnection, tableName, dataValueList));
|
||||||
} catch (SQLException e) {
|
} catch (SQLException e) {
|
||||||
// 记录异常
|
|
||||||
e.printStackTrace();
|
e.printStackTrace();
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
@ -88,21 +77,21 @@ public class ProductServiceImpl implements ProductService {
|
||||||
|
|
||||||
public int insertRow(Connection conn, String tableName, DataValue[] dataValueList) throws SQLException {
|
public int insertRow(Connection conn, String tableName, DataValue[] dataValueList) throws SQLException {
|
||||||
|
|
||||||
// 获取当前行的所有字段名
|
|
||||||
StringBuilder columns = new StringBuilder("(");
|
StringBuilder columns = new StringBuilder("(");
|
||||||
StringBuilder values = new StringBuilder("VALUES (");
|
StringBuilder values = new StringBuilder("VALUES (");
|
||||||
|
|
||||||
// 构建字段名和占位符
|
|
||||||
for (DataValue dataValue : dataValueList) {
|
for (DataValue dataValue : dataValueList) {
|
||||||
String key = dataValue.getKey();
|
String key = dataValue.getKey();
|
||||||
columns.append(key).append(", ");
|
columns.append(key).append(", ");
|
||||||
values.append("?, ");
|
values.append("?, ");
|
||||||
}
|
}
|
||||||
// 删除最后一个逗号和空格
|
|
||||||
columns.delete(columns.length() - 2, columns.length());
|
columns.delete(columns.length() - 2, columns.length());
|
||||||
values.delete(values.length() - 2, values.length());
|
values.delete(values.length() - 2, values.length());
|
||||||
|
|
||||||
// 完成 SQL 插入语句
|
|
||||||
String sql = "INSERT INTO " + tableName + " " + columns.toString() + ") " + values.toString() + ")";
|
String sql = "INSERT INTO " + tableName + " " + columns.toString() + ") " + values.toString() + ")";
|
||||||
log.info("添加语句{}",sql);
|
log.info("添加语句{}",sql);
|
||||||
|
|
||||||
|
@ -112,8 +101,7 @@ public class ProductServiceImpl implements ProductService {
|
||||||
conn.setAutoCommit(false);
|
conn.setAutoCommit(false);
|
||||||
try (
|
try (
|
||||||
PreparedStatement ps = conn.prepareStatement(sql)) {
|
PreparedStatement ps = conn.prepareStatement(sql)) {
|
||||||
// 循环设置参数并执行插入
|
|
||||||
// for (DataValue dataValue : dataValueList) {
|
|
||||||
int index = 1;
|
int index = 1;
|
||||||
for (DataValue value : dataValueList) {
|
for (DataValue value : dataValueList) {
|
||||||
Object obj = value.getValue();
|
Object obj = value.getValue();
|
||||||
|
@ -144,9 +132,7 @@ public class ProductServiceImpl implements ProductService {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
ps.addBatch();
|
ps.addBatch();
|
||||||
// }
|
|
||||||
|
|
||||||
// 执行批量插入操作
|
|
||||||
int[] ints = ps.executeBatch();
|
int[] ints = ps.executeBatch();
|
||||||
|
|
||||||
for (int anInt : ints) {
|
for (int anInt : ints) {
|
||||||
|
@ -175,6 +161,7 @@ public class ProductServiceImpl implements ProductService {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
// 第二种
|
// 第二种
|
||||||
// @Override
|
// @Override
|
||||||
// public int addProduct(Long basicId, Long tableId, DataValue[][] listList) {
|
// public int addProduct(Long basicId, Long tableId, DataValue[][] listList) {
|
||||||
|
@ -347,161 +334,147 @@ public class ProductServiceImpl implements ProductService {
|
||||||
// }
|
// }
|
||||||
// }
|
// }
|
||||||
|
|
||||||
|
// 第四个方法
|
||||||
|
|
||||||
//第三种
|
// @Override
|
||||||
|
// public int addProduct(Long basicId, Long tableId, DataValue[][] listList) {
|
||||||
//
|
//
|
||||||
// public int addProduct(Long basicId, Long tableId, DataValue[][] listList) {
|
// Source dataSources = sourceService.getById(basicId);
|
||||||
|
// TableInfo tableInfo = tableInfoService.getById(tableId);
|
||||||
|
// String tableName = tableInfo.getTableName();
|
||||||
//
|
//
|
||||||
// log.info("数据库主键ID"+basicId);
|
// HikariDataSource hikariDataSource = HikariPool.getHikariDataSource(dataSources);
|
||||||
// log.info("表的主键ID"+tableId);
|
|
||||||
// Source dataSources = sourceService.getById(basicId);
|
|
||||||
// TableInfo tableInfo = tableInfoService.getById(tableId);
|
|
||||||
// String tableName = tableInfo.getTableName();
|
|
||||||
//
|
//
|
||||||
// HikariConfig hikariConfig = new HikariConfig();
|
// ExecutorService executorService = Executors.newFixedThreadPool(8);
|
||||||
// hikariConfig.setPoolName("HikariCP 连接池");
|
// AtomicInteger addCount = new AtomicInteger();
|
||||||
// hikariConfig.setDriverClassName(dataSources.getDriverName());
|
|
||||||
// hikariConfig.setJdbcUrl(dataSources.getUrl());
|
|
||||||
// hikariConfig.setUsername(dataSources.getUsername());
|
|
||||||
// hikariConfig.setPassword(dataSources.getPassword());
|
|
||||||
// hikariConfig.setMinimumIdle(2);
|
|
||||||
// hikariConfig.setMaximumPoolSize(10);
|
|
||||||
//
|
//
|
||||||
// HikariDataSource hikariDataSource = new HikariDataSource(hikariConfig);
|
|
||||||
//
|
//
|
||||||
// log.info("数据源ID的basicId值: {}", basicId);
|
// List<DataValue[][]> batches = splitData(listList, 1000);
|
||||||
// log.info("表的主键值: {}", tableId);
|
|
||||||
// for (DataValue[] dataValues : listList) {
|
|
||||||
// for (DataValue dataValue : dataValues) {
|
|
||||||
// log.info("里面的所有的值: {}", dataValue);
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
//
|
//
|
||||||
// ExecutorService executorService = Executors.newFixedThreadPool(8);
|
// try (Connection conn = hikariDataSource.getConnection()) {
|
||||||
// AtomicInteger addCount = new AtomicInteger();
|
// conn.setAutoCommit(false); // 开启事务
|
||||||
//
|
//
|
||||||
// // 分割数据为3000个批次
|
// for (DataValue[][] batch : batches) {
|
||||||
// List<DataValue[][]> batches = splitData(listList, 20);
|
// executorService.submit(() -> {
|
||||||
|
// try (Statement stmt = conn.createStatement()) {
|
||||||
//
|
//
|
||||||
// try (Connection conn = hikariDataSource.getConnection()) {
|
// String sql = buildBatchInsertSQL(tableName, batch);
|
||||||
// conn.setAutoCommit(false); // 开启事务
|
// stmt.executeUpdate(sql);
|
||||||
//
|
// addCount.addAndGet(batch.length);
|
||||||
// for (DataValue[][] batch : batches) {
|
// } catch (SQLException e) {
|
||||||
// log.info("数量batch: {}", batch.length);
|
// log.error("SQLException异常发生", e);
|
||||||
//
|
// try {
|
||||||
// executorService.submit(() -> {
|
// conn.rollback(); // 回滚事务
|
||||||
// try (PreparedStatement stmt = conn.prepareStatement(buildBatchInsertSQL(tableName, batch))) {
|
// } catch (SQLException ex) {
|
||||||
// int index = 1;
|
// log.error("回滚事务失败", ex);
|
||||||
// for (DataValue[] dataValues : batch) {
|
// throw new RuntimeException(ex);
|
||||||
// for (DataValue dataValue : dataValues) {
|
|
||||||
// stmt.setObject(index++, dataValue.getValue());
|
|
||||||
// }
|
|
||||||
// stmt.addBatch();
|
|
||||||
// }
|
|
||||||
// stmt.executeBatch();
|
|
||||||
// addCount.addAndGet(batch.length);
|
|
||||||
// } catch (SQLException e) {
|
|
||||||
// log.error("SQLException异常发生", e);
|
|
||||||
// try {
|
|
||||||
// conn.rollback(); // 回滚事务
|
|
||||||
// } catch (SQLException ex) {
|
|
||||||
// log.error("回滚事务失败", ex);
|
|
||||||
// throw new RuntimeException(ex);
|
|
||||||
// }
|
|
||||||
// throw new RuntimeException(e);
|
|
||||||
// }
|
// }
|
||||||
// });
|
// throw new RuntimeException(e);
|
||||||
// }
|
|
||||||
//
|
|
||||||
// executorService.shutdown();
|
|
||||||
// executorService.awaitTermination(1, TimeUnit.HOURS);
|
|
||||||
// conn.commit(); // 提交事务
|
|
||||||
// } catch (InterruptedException e) {
|
|
||||||
// Thread.currentThread().interrupt();
|
|
||||||
// throw new RuntimeException(e);
|
|
||||||
// } catch (SQLException e) {
|
|
||||||
// throw new RuntimeException(e);
|
|
||||||
// } finally {
|
|
||||||
// close(hikariDataSource); // 关闭数据源
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// return addCount.get();
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// private String buildBatchInsertSQL(String tableName, DataValue[][] batch) {
|
|
||||||
// StringBuilder columns = new StringBuilder("(");
|
|
||||||
// StringBuilder values = new StringBuilder("VALUES ");
|
|
||||||
//
|
|
||||||
// // 构建字段名
|
|
||||||
// for (DataValue dataValue : batch[0]) {
|
|
||||||
// String key = dataValue.getKey();
|
|
||||||
// columns.append(key).append(", ");
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// // 删除最后一个逗号和空格
|
|
||||||
// columns.delete(columns.length() - 2, columns.length());
|
|
||||||
// // 构建值部分
|
|
||||||
// for (DataValue[] dataValueList : batch) {
|
|
||||||
// values.append("(");
|
|
||||||
// for (int i = 0; i < dataValueList.length; i++) {
|
|
||||||
// if (i > 0) {
|
|
||||||
// values.append(", ");
|
|
||||||
// }
|
// }
|
||||||
// values.append("?"); // 使用占位符
|
// });
|
||||||
// }
|
|
||||||
// values.append("), ");
|
|
||||||
// }
|
// }
|
||||||
//
|
//
|
||||||
|
// executorService.shutdown();
|
||||||
|
// executorService.awaitTermination(1, TimeUnit.HOURS);
|
||||||
|
// conn.commit();
|
||||||
|
// } catch (InterruptedException e) {
|
||||||
|
// Thread.currentThread().interrupt();
|
||||||
|
// throw new RuntimeException(e);
|
||||||
|
// } catch (SQLException e) {
|
||||||
|
// throw new RuntimeException(e);
|
||||||
|
// } finally {
|
||||||
|
// close(hikariDataSource);
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// return addCount.get();
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// private String buildBatchInsertSQL(String tableName, DataValue[][] batch) {
|
||||||
|
// StringBuilder columns = new StringBuilder("(");
|
||||||
|
// StringBuilder values = new StringBuilder("VALUES ");
|
||||||
|
//
|
||||||
|
// for (DataValue dataValue : batch[0]) {
|
||||||
|
// String key = dataValue.getKey();
|
||||||
|
// columns.append(key).append(", ");
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// columns.delete(columns.length() - 2, columns.length());
|
||||||
|
//
|
||||||
|
//
|
||||||
|
// log.info("开始拼接");
|
||||||
|
//
|
||||||
|
// for (DataValue[] dataValueList : batch) {
|
||||||
|
// values.append("(");
|
||||||
|
// for (DataValue dataValue : dataValueList) {
|
||||||
|
// Object value = dataValue.getValue();
|
||||||
|
// values.append(formatValue(dataValue.getType(), value)).append(", ");
|
||||||
|
// }
|
||||||
|
// log.info("开始拼接2");
|
||||||
// values.delete(values.length() - 2, values.length());
|
// values.delete(values.length() - 2, values.length());
|
||||||
// // 完成 SQL 插入语句
|
// values.append("), ");
|
||||||
// String sql = "INSERT INTO " + tableName + " " + columns.toString() + ") " + values.toString() + ";";
|
// }
|
||||||
|
// log.info("开始拼接3");
|
||||||
|
// values.delete(values.length() - 2, values.length());
|
||||||
|
// log.info("开始拼接4");
|
||||||
//
|
//
|
||||||
// log.info("执行的sql添加的sql语句: {}", sql);
|
// String sql = "INSERT INTO " + tableName + " " + columns.toString() + ") " + values.toString();
|
||||||
// return sql;
|
// return sql;
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// private String formatValue(DataType type, Object value) {
|
||||||
|
// if (type == DataType.VARCHAR || type == DataType.TEXT) {
|
||||||
|
//
|
||||||
|
// return "'" + value.toString().replace("'", "''") + "'";
|
||||||
|
// } else if (type == DataType.BIGINT) {
|
||||||
|
//
|
||||||
|
// return value.toString();
|
||||||
|
// } else if (type == DataType.INT) {
|
||||||
|
//
|
||||||
|
// return value.toString();
|
||||||
|
// } else if (type == DataType.DECIMAL) {
|
||||||
|
//
|
||||||
|
// return value.toString();
|
||||||
|
// } else if (type == DataType.DATETIME) {
|
||||||
|
//
|
||||||
|
// SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
|
||||||
|
// return "'" + sdf.format((Date) value) + "'";
|
||||||
|
// } else if (type == DataType.DOUBLE) {
|
||||||
|
//
|
||||||
|
// return value.toString();
|
||||||
|
// } else {
|
||||||
|
//
|
||||||
|
// return "'" + value.toString() + "'";
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// private List<DataValue[][]> splitData(DataValue[][] listList, int batchSize) {
|
||||||
|
// List<DataValue[][]> batches = new ArrayList<>();
|
||||||
|
// int totalSize = listList.length;
|
||||||
|
// int numBatches = (int) Math.ceil((double) totalSize / batchSize);
|
||||||
|
//
|
||||||
|
// for (int i = 0; i < numBatches; i++) {
|
||||||
|
// int start = i * batchSize;
|
||||||
|
// int end = Math.min(start + batchSize, totalSize);
|
||||||
|
// DataValue[][] batch = new DataValue[end - start][];
|
||||||
|
// System.arraycopy(listList, start, batch, 0, end - start);
|
||||||
|
// batches.add(batch);
|
||||||
// }
|
// }
|
||||||
//
|
//
|
||||||
// private String formatValue(DataType type, Object value) {
|
// return batches;
|
||||||
// if (type == DataType.VARCHAR || type == DataType.TEXT) {
|
// }
|
||||||
// return "'" + value.toString().replace("'", "''") + "'";
|
//
|
||||||
// } else if (type == DataType.BIGINT) {
|
// private static void close(HikariDataSource dataSource) {
|
||||||
// return value.toString();
|
// if (dataSource != null) {
|
||||||
// } else if (type == DataType.INT) {
|
// dataSource.close();
|
||||||
// return value.toString();
|
|
||||||
// } else if (type == DataType.DECIMAL) {
|
|
||||||
// return value.toString();
|
|
||||||
// } else if (type == DataType.DATETIME) {
|
|
||||||
// return "'" + new java.sql.Date(((Date) value).getTime()) + "'";
|
|
||||||
// } else if (type == DataType.DOUBLE) {
|
|
||||||
// return value.toString();
|
|
||||||
// } else {
|
|
||||||
// // 其他类型的处理
|
|
||||||
// return "'" + value.toString() + "'";
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// private List<DataValue[][]> splitData(DataValue[][] listList, int batchSize) {
|
|
||||||
// List<DataValue[][]> batches = new ArrayList<>();
|
|
||||||
// int totalSize = listList.length;
|
|
||||||
// int numBatches = (int) Math.ceil((double) totalSize / batchSize);
|
|
||||||
//
|
|
||||||
// for (int i = 0; i < numBatches; i++) {
|
|
||||||
// int start = i * batchSize;
|
|
||||||
// int end = Math.min(start + batchSize, totalSize);
|
|
||||||
// DataValue[][] batch = new DataValue[end - start][];
|
|
||||||
// System.arraycopy(listList, start, batch, 0, end - start);
|
|
||||||
// batches.add(batch);
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// log.info("");
|
|
||||||
// return batches;
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// private static void close(HikariDataSource dataSource) {
|
|
||||||
// if (dataSource != null) {
|
|
||||||
// dataSource.close();
|
|
||||||
// }
|
|
||||||
// }
|
// }
|
||||||
|
// }
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
// private static final Logger log = Logger.getLogger(DataInserter.class.getName());
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
Loading…
Reference in New Issue