From 9b123a8e5d81493d29ce3757734eebb76b090a76 Mon Sep 17 00:00:00 2001 From: lwj <3529558005@qq.com> Date: Tue, 10 Sep 2024 11:03:29 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E6=96=B9=E6=B3=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/com/muyu/basic/SourceConfig.java | 2 +- .../com/muyu/basic/impl/SourceConfigImpl.java | 6 +-- .../java/com/muyu/mysql/MySqlDataSource.java | 4 +- ...ot.autoconfigure.AutoConfiguration.imports | 2 + cloud-etl-common/pom.xml | 6 +++ .../java/com/muyu/remote/SourceRemote.java | 7 ++-- .../muyu/remote/factory/SourceFactory.java | 2 +- ...ot.autoconfigure.AutoConfiguration.imports | 2 + cloud-etl-server/pom.xml | 40 ++++++++++++++----- .../etl/controller/SourceController.java | 12 ++++++ .../service/impl/DataValueServiceImpl.java | 5 ++- .../etl/service/impl/ProductServiceImpl.java | 18 ++++----- .../src/main/resources/bootstrap.yml | 2 + 13 files changed, 78 insertions(+), 30 deletions(-) create mode 100644 cloud-etl-client/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports create mode 100644 cloud-etl-remote/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports diff --git a/cloud-etl-client/src/main/java/com/muyu/basic/SourceConfig.java b/cloud-etl-client/src/main/java/com/muyu/basic/SourceConfig.java index 009cfe2..cf2154a 100644 --- a/cloud-etl-client/src/main/java/com/muyu/basic/SourceConfig.java +++ b/cloud-etl-client/src/main/java/com/muyu/basic/SourceConfig.java @@ -3,5 +3,5 @@ package com.muyu.basic; import com.muyu.domain.Source; public interface SourceConfig { - public Source queryById(Long id); + public Source findSourceById(Long id); } diff --git a/cloud-etl-client/src/main/java/com/muyu/basic/impl/SourceConfigImpl.java b/cloud-etl-client/src/main/java/com/muyu/basic/impl/SourceConfigImpl.java index e1847fb..481d61c 100644 --- a/cloud-etl-client/src/main/java/com/muyu/basic/impl/SourceConfigImpl.java +++ b/cloud-etl-client/src/main/java/com/muyu/basic/impl/SourceConfigImpl.java @@ -11,9 +11,9 @@ public class SourceConfigImpl implements SourceConfig { @Autowired private SourceRemote sourceRemote; - @Override - public Source queryById(Long id) { - return sourceRemote.queryById(id).getData(); + public Source findSourceById(Long id) { + + return sourceRemote.findSourceById(id).getData(); } } diff --git a/cloud-etl-client/src/main/java/com/muyu/mysql/MySqlDataSource.java b/cloud-etl-client/src/main/java/com/muyu/mysql/MySqlDataSource.java index c2c0448..b4fc036 100644 --- a/cloud-etl-client/src/main/java/com/muyu/mysql/MySqlDataSource.java +++ b/cloud-etl-client/src/main/java/com/muyu/mysql/MySqlDataSource.java @@ -22,7 +22,7 @@ public class MySqlDataSource extends BaseDataAbsSource { @Autowired - private SourceConfig sourceRemote; + public SourceConfig sourceRemote; // SourceService sourceService = SpringUtils.getBean(SourceService.class); @@ -359,7 +359,7 @@ public class MySqlDataSource extends BaseDataAbsSource { Long dataSourceId = query.getDataSourceId(); ConcurrentHashMap map = new ConcurrentHashMap<>(); - Source dataSources = sourceRemote.queryById(dataSourceId); + Source dataSources = sourceRemote.findSourceById(dataSourceId); HikariConfig hikariConfig = new HikariConfig(); hikariConfig.setPoolName("HikariCP 连接池"); diff --git a/cloud-etl-client/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/cloud-etl-client/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports new file mode 100644 index 0000000..4d21843 --- /dev/null +++ b/cloud-etl-client/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports @@ -0,0 +1,2 @@ +com.muyu.mysql.MySqlDataSource +com.muyu.basic.impl.SourceConfigImpl diff --git a/cloud-etl-common/pom.xml b/cloud-etl-common/pom.xml index 9b8c0ad..4f0e5c4 100644 --- a/cloud-etl-common/pom.xml +++ b/cloud-etl-common/pom.xml @@ -24,6 +24,12 @@ cloud-common-core + + com.muyu + cloud-common-etl + 1.0.0 + + diff --git a/cloud-etl-remote/src/main/java/com/muyu/remote/SourceRemote.java b/cloud-etl-remote/src/main/java/com/muyu/remote/SourceRemote.java index c2a413d..90d3db7 100644 --- a/cloud-etl-remote/src/main/java/com/muyu/remote/SourceRemote.java +++ b/cloud-etl-remote/src/main/java/com/muyu/remote/SourceRemote.java @@ -8,9 +8,10 @@ import org.springframework.cloud.openfeign.FeignClient; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; + @EnableFeignClients -@FeignClient(value = "cloud-etl-datasources",fallbackFactory = SourceFactory.class ) +@FeignClient(value = "cloud-source",fallbackFactory = SourceFactory.class ) public interface SourceRemote { - @GetMapping("datasources/{id}") - public Result queryById(@PathVariable("id") Long id); + @GetMapping("/source/findSourceById/{id}") + public Result findSourceById(@PathVariable("id") Long id); } diff --git a/cloud-etl-remote/src/main/java/com/muyu/remote/factory/SourceFactory.java b/cloud-etl-remote/src/main/java/com/muyu/remote/factory/SourceFactory.java index aa9ea1f..0d9eb38 100644 --- a/cloud-etl-remote/src/main/java/com/muyu/remote/factory/SourceFactory.java +++ b/cloud-etl-remote/src/main/java/com/muyu/remote/factory/SourceFactory.java @@ -12,7 +12,7 @@ public class SourceFactory implements FallbackFactory { public SourceRemote create(Throwable cause) { return new SourceRemote() { @Override - public Result queryById(Long id) { + public Result findSourceById(Long id) { return null; } }; diff --git a/cloud-etl-remote/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/cloud-etl-remote/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports new file mode 100644 index 0000000..7f2138a --- /dev/null +++ b/cloud-etl-remote/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports @@ -0,0 +1,2 @@ +com.muyu.remote.SourceRemote +com.muyu.remote.factory.SourceFactory diff --git a/cloud-etl-server/pom.xml b/cloud-etl-server/pom.xml index bc28f79..85f3752 100644 --- a/cloud-etl-server/pom.xml +++ b/cloud-etl-server/pom.xml @@ -48,10 +48,10 @@ 2.9.0 - - com.muyu - cloud-common-core - + + + + com.muyu @@ -59,6 +59,13 @@ 1.0.0 + + com.muyu + cloud-common-etl + 1.0.0 + compile + + @@ -91,10 +98,10 @@ - - com.muyu - cloud-common-datasource - + + + + @@ -130,9 +137,24 @@ com.muyu cloud-common-datasource + 3.6.3 - + + com.muyu + cloud-etl-client + 1.0.0 + compile + + + + + + + + + + diff --git a/cloud-etl-server/src/main/java/com/muyu/cloud/etl/controller/SourceController.java b/cloud-etl-server/src/main/java/com/muyu/cloud/etl/controller/SourceController.java index 3addaea..7b516dd 100644 --- a/cloud-etl-server/src/main/java/com/muyu/cloud/etl/controller/SourceController.java +++ b/cloud-etl-server/src/main/java/com/muyu/cloud/etl/controller/SourceController.java @@ -51,6 +51,18 @@ public class SourceController extends BaseController { } + /** + * 获取远程调用基本信息 + */ + @GetMapping("/findSourceById/{id}") + public Result findSourceById(@PathVariable("id") Long id) { + Source source= sourceService.getById(id); + return success(source); + } + + + + /** * 新增基础信息 */ diff --git a/cloud-etl-server/src/main/java/com/muyu/cloud/etl/service/impl/DataValueServiceImpl.java b/cloud-etl-server/src/main/java/com/muyu/cloud/etl/service/impl/DataValueServiceImpl.java index 7111ca4..66082c9 100644 --- a/cloud-etl-server/src/main/java/com/muyu/cloud/etl/service/impl/DataValueServiceImpl.java +++ b/cloud-etl-server/src/main/java/com/muyu/cloud/etl/service/impl/DataValueServiceImpl.java @@ -1,8 +1,6 @@ package com.muyu.cloud.etl.service.impl; import com.muyu.Hikari.HikariPool; -import com.muyu.cloud.etl.mysql.MySqlDataSource; -import com.muyu.cloud.etl.mysql.MySqlQuery; import com.muyu.cloud.etl.service.DataValueService; import com.muyu.cloud.etl.service.SourceService; import com.muyu.cloud.etl.service.TableInfoService; @@ -11,6 +9,8 @@ import com.muyu.domain.DataValue; import com.muyu.domain.Source; import com.muyu.domain.TableInfo; import com.muyu.domain.enums.DataType; +import com.muyu.mysql.MySqlDataSource; +import com.muyu.mysql.MySqlQuery; import com.zaxxer.hikari.HikariDataSource; import lombok.extern.log4j.Log4j2; import org.springframework.beans.factory.annotation.Autowired; @@ -318,4 +318,5 @@ public class DataValueServiceImpl implements DataValueService { return rows; } + } diff --git a/cloud-etl-server/src/main/java/com/muyu/cloud/etl/service/impl/ProductServiceImpl.java b/cloud-etl-server/src/main/java/com/muyu/cloud/etl/service/impl/ProductServiceImpl.java index 0502fc5..49bc0fb 100644 --- a/cloud-etl-server/src/main/java/com/muyu/cloud/etl/service/impl/ProductServiceImpl.java +++ b/cloud-etl-server/src/main/java/com/muyu/cloud/etl/service/impl/ProductServiceImpl.java @@ -496,14 +496,16 @@ public int addProduct(Long basicId, Long tableId, DataValue[][] listList) { hikariConfig.setPassword(dataSources.getPassword()); hikariConfig.setMinimumIdle(2); hikariConfig.setMaximumPoolSize(10); + hikariConfig.setMaxLifetime(300000); // 5 minutes + hikariConfig.setConnectionTimeout(30000); // 30 seconds HikariDataSource hikariDataSource = new HikariDataSource(hikariConfig); // HikariDataSource hikariDataSource = HikariPool.getHikariDataSource(dataSources); - ExecutorService executorService = Executors.newFixedThreadPool(8); + ExecutorService executorService = Executors.newFixedThreadPool(6); AtomicInteger addCount = new AtomicInteger(); // 分割数据为较小的批次 - List batches = splitData(listList, 3000); + List batches = splitData(listList, 5000); try (Connection conn = hikariDataSource.getConnection()) { conn.setAutoCommit(false); // 开启事务 @@ -537,7 +539,11 @@ public int addProduct(Long basicId, Long tableId, DataValue[][] listList) { } executorService.shutdown(); - executorService.awaitTermination(1, TimeUnit.HOURS); + if (!executorService.awaitTermination(1, TimeUnit.HOURS)) { + log.warn("Executor service did not terminate within the timeout."); + executorService.shutdownNow(); + } +// executorService.awaitTermination(1, TimeUnit.HOURS); conn.commit(); // 提交事务 } catch (InterruptedException e) { Thread.currentThread().interrupt(); @@ -554,7 +560,6 @@ public int addProduct(Long basicId, Long tableId, DataValue[][] listList) { private String buildBatchInsertSQL(String tableName, DataValue[][] batch) { StringBuilder columns = new StringBuilder("("); StringBuilder values = new StringBuilder("VALUES "); - // 构建字段名 for (DataValue dataValue : batch[0]) { String key = dataValue.getKey(); @@ -562,7 +567,6 @@ public int addProduct(Long basicId, Long tableId, DataValue[][] listList) { } // 删除最后一个逗号和空格 columns.delete(columns.length() - 2, columns.length()); - // 构建值部分 for (DataValue[] dataValueList : batch) { values.append("("); @@ -575,7 +579,6 @@ public int addProduct(Long basicId, Long tableId, DataValue[][] listList) { } // 删除最后一个逗号 values.delete(values.length() - 2, values.length()); - // 完成 SQL 插入语句 String sql = "INSERT INTO " + tableName + " " + columns.toString() + ") " + values.toString(); return sql; @@ -586,7 +589,6 @@ public int addProduct(Long basicId, Long tableId, DataValue[][] listList) { // 根据业务需求处理 null 值 return "NULL"; // 或者其他默认值 } - if (type == DataType.VARCHAR || type == DataType.TEXT) { return "'" + value.toString().replace("'", "''") + "'"; } else if (type == DataType.BIGINT) { @@ -617,10 +619,8 @@ public int addProduct(Long basicId, Long tableId, DataValue[][] listList) { DataValue[][] batch = Arrays.copyOfRange(listList, start, end); batches.add(batch); } - return batches; } - // 关闭数据源 private void close(HikariDataSource dataSource) { dataSource.close(); diff --git a/cloud-etl-server/src/main/resources/bootstrap.yml b/cloud-etl-server/src/main/resources/bootstrap.yml index d1ba0aa..087ed79 100644 --- a/cloud-etl-server/src/main/resources/bootstrap.yml +++ b/cloud-etl-server/src/main/resources/bootstrap.yml @@ -10,6 +10,8 @@ nacos: namespace: cloud-2112 # Spring spring: + main: + allow-bean-definition-overriding: true application: # 应用名称