datax模块

master
chenbingxuan 2023-12-14 20:56:11 +08:00
parent 458daa1f83
commit b1b294fb06
2 changed files with 100 additions and 1 deletions

View File

@ -0,0 +1,99 @@
package net.srt.vo;
import lombok.extern.log4j.Log4j2;
import java.sql.*;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@Log4j2
public class OptimizedDataGeneration {
public static void main(String[] args) {
String jdbcUrl = "jdbc:mysql://122.51.52.153:3306/jpz01?rewriteBatchedStatements=true&useServerPrepStmts=false";
String username = "root";
String password = "root";
long startTime = System.currentTimeMillis();
try (Connection connection = DriverManager.getConnection(jdbcUrl, username, password)) {
connection.setAutoCommit(false); // Disable auto-commit
// Generate and insert data
int numberOfRecords = 30000000;
int batchSize = 60000; // Adjust the batch size as needed
int numberOfThreads = 9; // Adjust the number of threads as needed
// Disable indexes and constraints before inserting data
connection.createStatement().execute("ALTER TABLE t_user DISABLE KEYS");
// Insert data using the optimized method
insertData(connection, numberOfRecords, batchSize, numberOfThreads);
// Enable indexes and constraints after inserting data
connection.createStatement().execute("ALTER TABLE t_user ENABLE KEYS");
connection.commit(); // Commit changes
} catch (SQLException | InterruptedException | ExecutionException e) {
e.printStackTrace();
}
long totalTime = System.currentTimeMillis() - startTime;
log.info("Total execution time: {} ms", totalTime);
}
private static void insertData(Connection connection, int numberOfRecords, int batchSize, int numberOfThreads)
throws SQLException, InterruptedException, ExecutionException {
// Example of using ExecutorService for parallel insertion
ExecutorService executorService = Executors.newFixedThreadPool(numberOfThreads);
List<Future<Void>> futures = new ArrayList<>();
for (int i = 0; i < numberOfThreads; i++) {
final int startIndex = i * (numberOfRecords / numberOfThreads);
final int endIndex = (i + 1) * (numberOfRecords / numberOfThreads);
Future<Void> future = executorService.submit(() -> {
insertDataBatch(connection, endIndex - startIndex, batchSize, startIndex + 1);
return null;
});
futures.add(future);
}
// Wait for all threads to finish
for (Future<Void> future : futures) {
future.get();
}
executorService.shutdown();
}
private static void insertDataBatch(Connection connection, int numberOfRecords, int batchSize, int startId)
throws SQLException {
String insertQuery = "INSERT INTO t_user (user_id,user_name,user_pwd,user_sex) VALUES (0,?,?,?)";
try (PreparedStatement preparedStatement = connection.prepareStatement(insertQuery)) {
long startTime = System.currentTimeMillis();
for (int i = 0; i < numberOfRecords; i++) {
// Set values for each column in the prepared statement
preparedStatement.setString(1, "a");
preparedStatement.setString(2, "哈");
preparedStatement.setInt(3, 1);
// Add the batch for execution
preparedStatement.addBatch();
// Execute the batch every 'batchSize' records
if ((i + 1) % batchSize == 0 || i == numberOfRecords - 1) {
preparedStatement.executeBatch();
preparedStatement.clearBatch();
}
}
log.info("Insert {} records in {} ms", numberOfRecords, System.currentTimeMillis() - startTime);
}
}
}