修改方法
parent
d494452a15
commit
68d928ab4d
|
@ -1,7 +0,0 @@
|
|||
package com.muyu;
|
||||
|
||||
public class Main {
|
||||
public static void main(String[] args) {
|
||||
System.out.println("Hello world!");
|
||||
}
|
||||
}
|
|
@ -1,33 +0,0 @@
|
|||
package com.muyu.domain;
|
||||
|
||||
import com.baomidou.mybatisplus.annotation.TableField;
|
||||
import com.baomidou.mybatisplus.annotation.TableName;
|
||||
import com.muyu.common.core.web.domain.BaseEntity;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Data;
|
||||
import lombok.EqualsAndHashCode;
|
||||
import lombok.NoArgsConstructor;
|
||||
import lombok.experimental.SuperBuilder;
|
||||
|
||||
@Data
|
||||
@AllArgsConstructor
|
||||
@NoArgsConstructor
|
||||
@SuperBuilder
|
||||
@EqualsAndHashCode(callSuper = true)
|
||||
@TableName(value ="sourcetype",autoResultMap = true) //数据库表相关
|
||||
public class SourceType extends BaseEntity {
|
||||
private static final long serialVersionUID = 1L;
|
||||
//数据源类型ID
|
||||
private Integer id;
|
||||
|
||||
//数据源类型名称
|
||||
private String name;
|
||||
//
|
||||
@TableField(value = "driver_class")
|
||||
private String driverClass;
|
||||
|
||||
//
|
||||
private String prefix;
|
||||
|
||||
private String suffix;
|
||||
}
|
|
@ -1,7 +0,0 @@
|
|||
package com.muyu;
|
||||
|
||||
public class Main {
|
||||
public static void main(String[] args) {
|
||||
System.out.println("Hello world!");
|
||||
}
|
||||
}
|
|
@ -1,41 +0,0 @@
|
|||
package com.muyu.cloud.etl.controller;
|
||||
|
||||
|
||||
import com.muyu.cloud.etl.service.SourceTypeService;
|
||||
import com.muyu.common.core.domain.Result;
|
||||
import com.muyu.common.core.web.controller.BaseController;
|
||||
import com.muyu.domain.SourceType;
|
||||
import com.muyu.domain.mysql.config.MysqlPoolConfig;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.web.bind.annotation.*;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
@RestController
|
||||
@RequestMapping("/SourceType")
|
||||
public class SourceTypeController extends BaseController {
|
||||
@Autowired
|
||||
private SourceTypeService sourceTypeService;
|
||||
|
||||
//查询
|
||||
@GetMapping("/findSourceType")
|
||||
public Result<List<SourceType>> findSourceType() {
|
||||
List<SourceType> sourceTypeList=sourceTypeService.findSourceType();
|
||||
return Result.success(sourceTypeList);
|
||||
}
|
||||
|
||||
// 数据源连接池测试连接
|
||||
@PostMapping("TextSourcePool")
|
||||
public Boolean TextSourcePool(@RequestBody MysqlPoolConfig mysqlPoolConfig) {
|
||||
return sourceTypeService.TextSourcePool(mysqlPoolConfig);
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
}
|
|
@ -1,16 +0,0 @@
|
|||
package com.muyu.cloud.etl.service;
|
||||
|
||||
import com.baomidou.mybatisplus.extension.service.IService;
|
||||
import com.muyu.domain.SourceType;
|
||||
import com.muyu.domain.mysql.config.MysqlPoolConfig;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
|
||||
public interface SourceTypeService extends IService<SourceType> {
|
||||
|
||||
List<SourceType> findSourceType();
|
||||
|
||||
Boolean TextSourcePool(MysqlPoolConfig mysqlPoolConfig);
|
||||
|
||||
}
|
|
@ -1,146 +0,0 @@
|
|||
package com.muyu.cloud.etl.service.impl;
|
||||
|
||||
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
||||
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
|
||||
import com.muyu.cloud.etl.mapper.SourceTypeMapper;
|
||||
import com.muyu.cloud.etl.service.SourceTypeService;
|
||||
import com.muyu.domain.SourceType;
|
||||
import com.muyu.domain.Structure;
|
||||
import com.muyu.domain.mysql.MysqlPool;
|
||||
import com.muyu.domain.mysql.config.MysqlPoolConfig;
|
||||
import lombok.extern.log4j.Log4j2;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.sql.*;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
@Log4j2
|
||||
@Service
|
||||
public class SourceTypeServiceImpl extends ServiceImpl<SourceTypeMapper, SourceType> implements SourceTypeService {
|
||||
//数据源连接池线程
|
||||
@Override
|
||||
public List<SourceType> findSourceType() {
|
||||
LambdaQueryWrapper<SourceType> sourceTypeLambdaQueryWrapper = new LambdaQueryWrapper<>();
|
||||
return this.list(sourceTypeLambdaQueryWrapper);
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
private static final int BATCH_SIZE = 100; // 定义每个批次的表数量
|
||||
|
||||
public Boolean TextSourcePool(MysqlPoolConfig mysqlPoolConfig) {
|
||||
long startTime = System.currentTimeMillis();
|
||||
List<Structure> structureArrayList = new CopyOnWriteArrayList<>(); // 使用线程安全的集合
|
||||
|
||||
MysqlPool mysqlPool = new MysqlPool(mysqlPoolConfig);
|
||||
mysqlPool.init();
|
||||
Connection conn = mysqlPool.getConn();
|
||||
|
||||
DatabaseMetaData metaData = null;
|
||||
String databaseName = mysqlPoolConfig.getDatabaseName();
|
||||
|
||||
try {
|
||||
metaData = conn.getMetaData();
|
||||
ResultSet rs = metaData.getTables(databaseName, null, "%", new String[]{"TABLE", "VIEW"});
|
||||
|
||||
List<String> tableNames = new ArrayList<>();
|
||||
while (rs.next()) {
|
||||
tableNames.add(rs.getString("TABLE_NAME"));
|
||||
}
|
||||
|
||||
int totalTables = tableNames.size();
|
||||
int totalPages = (int) Math.ceil((double) totalTables / BATCH_SIZE);
|
||||
|
||||
ExecutorService threadPool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); // 创建一个固定大小的线程池
|
||||
|
||||
for (int i = 0; i < totalPages; i++) {
|
||||
int start = i * BATCH_SIZE;
|
||||
int end = Math.min(start + BATCH_SIZE, totalTables);
|
||||
|
||||
List<String> batchTableNames = tableNames.subList(start, end);
|
||||
|
||||
threadPool.submit(() -> {
|
||||
try {
|
||||
for (String tableName : batchTableNames) {
|
||||
PreparedStatement ps = conn.prepareStatement(
|
||||
"SELECT " +
|
||||
"COLUMN_NAME, " +
|
||||
"COLUMN_COMMENT, " +
|
||||
"CASE WHEN COLUMN_KEY = 'PRI' THEN '是' ELSE '否' END, " +
|
||||
"CASE " +
|
||||
"WHEN DATA_TYPE = 'int' THEN 'Integer' " +
|
||||
"WHEN DATA_TYPE = 'bigint' THEN 'Long' " +
|
||||
"WHEN DATA_TYPE = 'varchar' THEN 'String' " +
|
||||
"WHEN DATA_TYPE = 'decimal' THEN 'BigDecimal' " +
|
||||
"WHEN DATA_TYPE = 'tinyint' AND COLUMN_TYPE = 'tinyint(1)' THEN 'Boolean' " +
|
||||
"ELSE DATA_TYPE " +
|
||||
"END AS javaType, " +
|
||||
"DATA_TYPE, " +
|
||||
"COLUMN_TYPE, " +
|
||||
"CHARACTER_MAXIMUM_LENGTH, " +
|
||||
"NUMERIC_SCALE, " +
|
||||
"IS_NULLABLE, " +
|
||||
"COLUMN_DEFAULT " +
|
||||
"FROM INFORMATION_SCHEMA.COLUMNS " +
|
||||
"WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ?"
|
||||
);
|
||||
ps.setString(1, databaseName);
|
||||
ps.setString(2, tableName);
|
||||
|
||||
ResultSet resultSet = ps.executeQuery();
|
||||
while (resultSet.next()) {
|
||||
Structure build = Structure.builder()
|
||||
.columnName(resultSet.getString("COLUMN_NAME"))
|
||||
.columnRemark(resultSet.getString("COLUMN_COMMENT"))
|
||||
.isPrimary("是".equals(resultSet.getString("CASE WHEN COLUMN_KEY = 'PRI' THEN '是' ELSE '否' END")) ? "Y" : "N")
|
||||
.javaType(resultSet.getString("javaType"))
|
||||
.columnType(resultSet.getString("DATA_TYPE"))
|
||||
.columnType(resultSet.getString("COLUMN_TYPE"))
|
||||
.columnLength(resultSet.getString("CHARACTER_MAXIMUM_LENGTH"))
|
||||
.columnDecimals(resultSet.getString("NUMERIC_SCALE"))
|
||||
.isNull("YES".equals(resultSet.getString("IS_NULLABLE")) ? "Y" : "N")
|
||||
.defaultValue(resultSet.getString("COLUMN_DEFAULT"))
|
||||
.build();
|
||||
|
||||
structureArrayList.add(build);
|
||||
}
|
||||
}
|
||||
} catch (SQLException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
threadPool.shutdown();
|
||||
threadPool.awaitTermination(1, TimeUnit.HOURS); // 等待所有线程完成
|
||||
|
||||
} catch (SQLException | InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
} finally {
|
||||
try {
|
||||
if (conn != null) {
|
||||
conn.close();
|
||||
mysqlPool.replease(conn);
|
||||
mysqlPool.closeBaseConn();
|
||||
}
|
||||
} catch (SQLException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
long endTime = System.currentTimeMillis();
|
||||
double totalTimeInMinutes = (endTime - startTime) / 60000.0;
|
||||
System.out.println("程序总体的执行时间:" + totalTimeInMinutes + " 分钟");
|
||||
|
||||
log.info("接入的数据是"+structureArrayList.toString());
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue