修改方法
parent
587f61aab5
commit
9b123a8e5d
|
@ -3,5 +3,5 @@ package com.muyu.basic;
|
||||||
import com.muyu.domain.Source;
|
import com.muyu.domain.Source;
|
||||||
|
|
||||||
public interface SourceConfig {
|
public interface SourceConfig {
|
||||||
public Source queryById(Long id);
|
public Source findSourceById(Long id);
|
||||||
}
|
}
|
||||||
|
|
|
@ -11,9 +11,9 @@ public class SourceConfigImpl implements SourceConfig {
|
||||||
@Autowired
|
@Autowired
|
||||||
private SourceRemote sourceRemote;
|
private SourceRemote sourceRemote;
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Source queryById(Long id) {
|
public Source findSourceById(Long id) {
|
||||||
return sourceRemote.queryById(id).getData();
|
|
||||||
|
return sourceRemote.findSourceById(id).getData();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,7 +22,7 @@ public class MySqlDataSource extends BaseDataAbsSource {
|
||||||
|
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
private SourceConfig sourceRemote;
|
public SourceConfig sourceRemote;
|
||||||
// SourceService sourceService = SpringUtils.getBean(SourceService.class);
|
// SourceService sourceService = SpringUtils.getBean(SourceService.class);
|
||||||
|
|
||||||
|
|
||||||
|
@ -359,7 +359,7 @@ public class MySqlDataSource extends BaseDataAbsSource {
|
||||||
Long dataSourceId = query.getDataSourceId();
|
Long dataSourceId = query.getDataSourceId();
|
||||||
ConcurrentHashMap<Integer, DataValue> map = new ConcurrentHashMap<>();
|
ConcurrentHashMap<Integer, DataValue> map = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
Source dataSources = sourceRemote.queryById(dataSourceId);
|
Source dataSources = sourceRemote.findSourceById(dataSourceId);
|
||||||
|
|
||||||
HikariConfig hikariConfig = new HikariConfig();
|
HikariConfig hikariConfig = new HikariConfig();
|
||||||
hikariConfig.setPoolName("HikariCP 连接池");
|
hikariConfig.setPoolName("HikariCP 连接池");
|
||||||
|
|
|
@ -0,0 +1,2 @@
|
||||||
|
com.muyu.mysql.MySqlDataSource
|
||||||
|
com.muyu.basic.impl.SourceConfigImpl
|
|
@ -24,6 +24,12 @@
|
||||||
<artifactId>cloud-common-core</artifactId>
|
<artifactId>cloud-common-core</artifactId>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.muyu</groupId>
|
||||||
|
<artifactId>cloud-common-etl</artifactId>
|
||||||
|
<version>1.0.0</version>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
</dependencies>
|
</dependencies>
|
||||||
|
|
||||||
</project>
|
</project>
|
||||||
|
|
|
@ -8,9 +8,10 @@ import org.springframework.cloud.openfeign.FeignClient;
|
||||||
import org.springframework.web.bind.annotation.GetMapping;
|
import org.springframework.web.bind.annotation.GetMapping;
|
||||||
import org.springframework.web.bind.annotation.PathVariable;
|
import org.springframework.web.bind.annotation.PathVariable;
|
||||||
|
|
||||||
|
|
||||||
@EnableFeignClients
|
@EnableFeignClients
|
||||||
@FeignClient(value = "cloud-etl-datasources",fallbackFactory = SourceFactory.class )
|
@FeignClient(value = "cloud-source",fallbackFactory = SourceFactory.class )
|
||||||
public interface SourceRemote {
|
public interface SourceRemote {
|
||||||
@GetMapping("datasources/{id}")
|
@GetMapping("/source/findSourceById/{id}")
|
||||||
public Result<Source> queryById(@PathVariable("id") Long id);
|
public Result<Source> findSourceById(@PathVariable("id") Long id);
|
||||||
}
|
}
|
||||||
|
|
|
@ -12,7 +12,7 @@ public class SourceFactory implements FallbackFactory<SourceRemote> {
|
||||||
public SourceRemote create(Throwable cause) {
|
public SourceRemote create(Throwable cause) {
|
||||||
return new SourceRemote() {
|
return new SourceRemote() {
|
||||||
@Override
|
@Override
|
||||||
public Result<Source> queryById(Long id) {
|
public Result<Source> findSourceById(Long id) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
|
@ -0,0 +1,2 @@
|
||||||
|
com.muyu.remote.SourceRemote
|
||||||
|
com.muyu.remote.factory.SourceFactory
|
|
@ -48,10 +48,10 @@
|
||||||
<version>2.9.0</version>
|
<version>2.9.0</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
<dependency>
|
<!-- <dependency>-->
|
||||||
<groupId>com.muyu</groupId>
|
<!-- <groupId>com.muyu</groupId>-->
|
||||||
<artifactId>cloud-common-core</artifactId>
|
<!-- <artifactId>cloud-common-core</artifactId>-->
|
||||||
</dependency>
|
<!-- </dependency>-->
|
||||||
|
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>com.muyu</groupId>
|
<groupId>com.muyu</groupId>
|
||||||
|
@ -59,6 +59,13 @@
|
||||||
<version>1.0.0</version>
|
<version>1.0.0</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.muyu</groupId>
|
||||||
|
<artifactId>cloud-common-etl</artifactId>
|
||||||
|
<version>1.0.0</version>
|
||||||
|
<scope>compile</scope>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
|
||||||
<!-- SpringCloud Alibaba Nacos -->
|
<!-- SpringCloud Alibaba Nacos -->
|
||||||
<dependency>
|
<dependency>
|
||||||
|
@ -91,10 +98,10 @@
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
<!-- MuYu Common DataSource -->
|
<!-- MuYu Common DataSource -->
|
||||||
<dependency>
|
<!-- <dependency>-->
|
||||||
<groupId>com.muyu</groupId>
|
<!-- <groupId>com.muyu</groupId>-->
|
||||||
<artifactId>cloud-common-datasource</artifactId>
|
<!-- <artifactId>cloud-common-datasource</artifactId>-->
|
||||||
</dependency>
|
<!-- </dependency>-->
|
||||||
|
|
||||||
<!-- MuYu Common DataScope -->
|
<!-- MuYu Common DataScope -->
|
||||||
<dependency>
|
<dependency>
|
||||||
|
@ -130,9 +137,24 @@
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>com.muyu</groupId>
|
<groupId>com.muyu</groupId>
|
||||||
<artifactId>cloud-common-datasource</artifactId>
|
<artifactId>cloud-common-datasource</artifactId>
|
||||||
|
<version>3.6.3</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
<!-- <dependency>-->
|
<dependency>
|
||||||
|
<groupId>com.muyu</groupId>
|
||||||
|
<artifactId>cloud-etl-client</artifactId>
|
||||||
|
<version>1.0.0</version>
|
||||||
|
<scope>compile</scope>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<!-- <dependency>-->
|
||||||
|
<!-- <groupId>com.muyu</groupId>-->
|
||||||
|
<!-- <artifactId>cloud-etl-client</artifactId>-->
|
||||||
|
<!-- <version>1.0.0</version>-->
|
||||||
|
<!-- <scope>compile</scope>-->
|
||||||
|
<!-- </dependency>-->
|
||||||
|
|
||||||
|
<!-- <dependency>-->
|
||||||
<!-- <groupId>com.muyu</groupId>-->
|
<!-- <groupId>com.muyu</groupId>-->
|
||||||
<!-- <artifactId>cloud-common-rabbit</artifactId>-->
|
<!-- <artifactId>cloud-common-rabbit</artifactId>-->
|
||||||
<!-- <version>1.0.0</version>-->
|
<!-- <version>1.0.0</version>-->
|
||||||
|
|
|
@ -51,6 +51,18 @@ public class SourceController extends BaseController {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 获取远程调用基本信息
|
||||||
|
*/
|
||||||
|
@GetMapping("/findSourceById/{id}")
|
||||||
|
public Result<Source> findSourceById(@PathVariable("id") Long id) {
|
||||||
|
Source source= sourceService.getById(id);
|
||||||
|
return success(source);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 新增基础信息
|
* 新增基础信息
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -1,8 +1,6 @@
|
||||||
package com.muyu.cloud.etl.service.impl;
|
package com.muyu.cloud.etl.service.impl;
|
||||||
|
|
||||||
import com.muyu.Hikari.HikariPool;
|
import com.muyu.Hikari.HikariPool;
|
||||||
import com.muyu.cloud.etl.mysql.MySqlDataSource;
|
|
||||||
import com.muyu.cloud.etl.mysql.MySqlQuery;
|
|
||||||
import com.muyu.cloud.etl.service.DataValueService;
|
import com.muyu.cloud.etl.service.DataValueService;
|
||||||
import com.muyu.cloud.etl.service.SourceService;
|
import com.muyu.cloud.etl.service.SourceService;
|
||||||
import com.muyu.cloud.etl.service.TableInfoService;
|
import com.muyu.cloud.etl.service.TableInfoService;
|
||||||
|
@ -11,6 +9,8 @@ import com.muyu.domain.DataValue;
|
||||||
import com.muyu.domain.Source;
|
import com.muyu.domain.Source;
|
||||||
import com.muyu.domain.TableInfo;
|
import com.muyu.domain.TableInfo;
|
||||||
import com.muyu.domain.enums.DataType;
|
import com.muyu.domain.enums.DataType;
|
||||||
|
import com.muyu.mysql.MySqlDataSource;
|
||||||
|
import com.muyu.mysql.MySqlQuery;
|
||||||
import com.zaxxer.hikari.HikariDataSource;
|
import com.zaxxer.hikari.HikariDataSource;
|
||||||
import lombok.extern.log4j.Log4j2;
|
import lombok.extern.log4j.Log4j2;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
@ -318,4 +318,5 @@ public class DataValueServiceImpl implements DataValueService {
|
||||||
|
|
||||||
return rows;
|
return rows;
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -496,14 +496,16 @@ public int addProduct(Long basicId, Long tableId, DataValue[][] listList) {
|
||||||
hikariConfig.setPassword(dataSources.getPassword());
|
hikariConfig.setPassword(dataSources.getPassword());
|
||||||
hikariConfig.setMinimumIdle(2);
|
hikariConfig.setMinimumIdle(2);
|
||||||
hikariConfig.setMaximumPoolSize(10);
|
hikariConfig.setMaximumPoolSize(10);
|
||||||
|
hikariConfig.setMaxLifetime(300000); // 5 minutes
|
||||||
|
hikariConfig.setConnectionTimeout(30000); // 30 seconds
|
||||||
HikariDataSource hikariDataSource = new HikariDataSource(hikariConfig);
|
HikariDataSource hikariDataSource = new HikariDataSource(hikariConfig);
|
||||||
|
|
||||||
// HikariDataSource hikariDataSource = HikariPool.getHikariDataSource(dataSources);
|
// HikariDataSource hikariDataSource = HikariPool.getHikariDataSource(dataSources);
|
||||||
ExecutorService executorService = Executors.newFixedThreadPool(8);
|
ExecutorService executorService = Executors.newFixedThreadPool(6);
|
||||||
AtomicInteger addCount = new AtomicInteger();
|
AtomicInteger addCount = new AtomicInteger();
|
||||||
|
|
||||||
// 分割数据为较小的批次
|
// 分割数据为较小的批次
|
||||||
List<DataValue[][]> batches = splitData(listList, 3000);
|
List<DataValue[][]> batches = splitData(listList, 5000);
|
||||||
|
|
||||||
try (Connection conn = hikariDataSource.getConnection()) {
|
try (Connection conn = hikariDataSource.getConnection()) {
|
||||||
conn.setAutoCommit(false); // 开启事务
|
conn.setAutoCommit(false); // 开启事务
|
||||||
|
@ -537,7 +539,11 @@ public int addProduct(Long basicId, Long tableId, DataValue[][] listList) {
|
||||||
}
|
}
|
||||||
|
|
||||||
executorService.shutdown();
|
executorService.shutdown();
|
||||||
executorService.awaitTermination(1, TimeUnit.HOURS);
|
if (!executorService.awaitTermination(1, TimeUnit.HOURS)) {
|
||||||
|
log.warn("Executor service did not terminate within the timeout.");
|
||||||
|
executorService.shutdownNow();
|
||||||
|
}
|
||||||
|
// executorService.awaitTermination(1, TimeUnit.HOURS);
|
||||||
conn.commit(); // 提交事务
|
conn.commit(); // 提交事务
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
Thread.currentThread().interrupt();
|
Thread.currentThread().interrupt();
|
||||||
|
@ -554,7 +560,6 @@ public int addProduct(Long basicId, Long tableId, DataValue[][] listList) {
|
||||||
private String buildBatchInsertSQL(String tableName, DataValue[][] batch) {
|
private String buildBatchInsertSQL(String tableName, DataValue[][] batch) {
|
||||||
StringBuilder columns = new StringBuilder("(");
|
StringBuilder columns = new StringBuilder("(");
|
||||||
StringBuilder values = new StringBuilder("VALUES ");
|
StringBuilder values = new StringBuilder("VALUES ");
|
||||||
|
|
||||||
// 构建字段名
|
// 构建字段名
|
||||||
for (DataValue dataValue : batch[0]) {
|
for (DataValue dataValue : batch[0]) {
|
||||||
String key = dataValue.getKey();
|
String key = dataValue.getKey();
|
||||||
|
@ -562,7 +567,6 @@ public int addProduct(Long basicId, Long tableId, DataValue[][] listList) {
|
||||||
}
|
}
|
||||||
// 删除最后一个逗号和空格
|
// 删除最后一个逗号和空格
|
||||||
columns.delete(columns.length() - 2, columns.length());
|
columns.delete(columns.length() - 2, columns.length());
|
||||||
|
|
||||||
// 构建值部分
|
// 构建值部分
|
||||||
for (DataValue[] dataValueList : batch) {
|
for (DataValue[] dataValueList : batch) {
|
||||||
values.append("(");
|
values.append("(");
|
||||||
|
@ -575,7 +579,6 @@ public int addProduct(Long basicId, Long tableId, DataValue[][] listList) {
|
||||||
}
|
}
|
||||||
// 删除最后一个逗号
|
// 删除最后一个逗号
|
||||||
values.delete(values.length() - 2, values.length());
|
values.delete(values.length() - 2, values.length());
|
||||||
|
|
||||||
// 完成 SQL 插入语句
|
// 完成 SQL 插入语句
|
||||||
String sql = "INSERT INTO " + tableName + " " + columns.toString() + ") " + values.toString();
|
String sql = "INSERT INTO " + tableName + " " + columns.toString() + ") " + values.toString();
|
||||||
return sql;
|
return sql;
|
||||||
|
@ -586,7 +589,6 @@ public int addProduct(Long basicId, Long tableId, DataValue[][] listList) {
|
||||||
// 根据业务需求处理 null 值
|
// 根据业务需求处理 null 值
|
||||||
return "NULL"; // 或者其他默认值
|
return "NULL"; // 或者其他默认值
|
||||||
}
|
}
|
||||||
|
|
||||||
if (type == DataType.VARCHAR || type == DataType.TEXT) {
|
if (type == DataType.VARCHAR || type == DataType.TEXT) {
|
||||||
return "'" + value.toString().replace("'", "''") + "'";
|
return "'" + value.toString().replace("'", "''") + "'";
|
||||||
} else if (type == DataType.BIGINT) {
|
} else if (type == DataType.BIGINT) {
|
||||||
|
@ -617,10 +619,8 @@ public int addProduct(Long basicId, Long tableId, DataValue[][] listList) {
|
||||||
DataValue[][] batch = Arrays.copyOfRange(listList, start, end);
|
DataValue[][] batch = Arrays.copyOfRange(listList, start, end);
|
||||||
batches.add(batch);
|
batches.add(batch);
|
||||||
}
|
}
|
||||||
|
|
||||||
return batches;
|
return batches;
|
||||||
}
|
}
|
||||||
|
|
||||||
// 关闭数据源
|
// 关闭数据源
|
||||||
private void close(HikariDataSource dataSource) {
|
private void close(HikariDataSource dataSource) {
|
||||||
dataSource.close();
|
dataSource.close();
|
||||||
|
|
|
@ -10,6 +10,8 @@ nacos:
|
||||||
namespace: cloud-2112
|
namespace: cloud-2112
|
||||||
# Spring
|
# Spring
|
||||||
spring:
|
spring:
|
||||||
|
main:
|
||||||
|
allow-bean-definition-overriding: true
|
||||||
|
|
||||||
application:
|
application:
|
||||||
# 应用名称
|
# 应用名称
|
||||||
|
|
Loading…
Reference in New Issue