diff --git a/cloud-task-common/pom.xml b/cloud-task-common/pom.xml index 09a231e..72d0641 100644 --- a/cloud-task-common/pom.xml +++ b/cloud-task-common/pom.xml @@ -29,5 +29,10 @@ com.muyu cloud-rule-common + + com.muyu + cloud-common-etl + 1.0.0 + diff --git a/cloud-task-common/src/main/java/com/muyu/common/domian/basic/BasicTask.java b/cloud-task-common/src/main/java/com/muyu/common/domian/basic/BasicTask.java index c2775b5..e6d8986 100644 --- a/cloud-task-common/src/main/java/com/muyu/common/domian/basic/BasicTask.java +++ b/cloud-task-common/src/main/java/com/muyu/common/domian/basic/BasicTask.java @@ -1,7 +1,7 @@ package com.muyu.common.domian.basic; -import com.muyu.common.domain.DataValue; -import com.muyu.common.domian.basic.abstracts.DataTaskHandler; + + public interface BasicTask { void set(T dataValue); diff --git a/cloud-task-common/src/main/java/com/muyu/common/domian/basic/abstracts/DataTaskHandler.java b/cloud-task-common/src/main/java/com/muyu/common/domian/basic/abstracts/DataTaskHandler.java index 6356d07..e73502a 100644 --- a/cloud-task-common/src/main/java/com/muyu/common/domian/basic/abstracts/DataTaskHandler.java +++ b/cloud-task-common/src/main/java/com/muyu/common/domian/basic/abstracts/DataTaskHandler.java @@ -1,6 +1,6 @@ package com.muyu.common.domian.basic.abstracts; -import com.muyu.common.domain.DataValue; + public class DataTaskHandler { private static final ThreadLocal threadLocal = new ThreadLocal<>(); diff --git a/cloud-task-common/src/main/java/com/muyu/common/domian/basic/abstracts/DataTaskHandlerRowHandler.java b/cloud-task-common/src/main/java/com/muyu/common/domian/basic/abstracts/DataTaskHandlerRowHandler.java index de3466e..4dad8a5 100644 --- a/cloud-task-common/src/main/java/com/muyu/common/domian/basic/abstracts/DataTaskHandlerRowHandler.java +++ b/cloud-task-common/src/main/java/com/muyu/common/domian/basic/abstracts/DataTaskHandlerRowHandler.java @@ -1,6 +1,7 @@ package com.muyu.common.domian.basic.abstracts; -import com.muyu.common.domain.DataValue; + +import com.muyu.etl.domain.DataValue; public class DataTaskHandlerRowHandler { public DataTaskHandlerRowHandler() { diff --git a/cloud-task-common/src/main/java/com/muyu/common/domian/basic/abstracts/DataTaskValueHandler.java b/cloud-task-common/src/main/java/com/muyu/common/domian/basic/abstracts/DataTaskValueHandler.java index a28bf4b..fe5201a 100644 --- a/cloud-task-common/src/main/java/com/muyu/common/domian/basic/abstracts/DataTaskValueHandler.java +++ b/cloud-task-common/src/main/java/com/muyu/common/domian/basic/abstracts/DataTaskValueHandler.java @@ -1,6 +1,7 @@ package com.muyu.common.domian.basic.abstracts; -import com.muyu.common.domain.DataValue; + +import com.muyu.etl.domain.DataValue; public class DataTaskValueHandler { public DataTaskValueHandler() { diff --git a/cloud-task-remote/pom.xml b/cloud-task-remote/pom.xml index 9ae85dd..7d63d3e 100644 --- a/cloud-task-remote/pom.xml +++ b/cloud-task-remote/pom.xml @@ -25,6 +25,12 @@ com.muyu cloud-rule-common + + com.muyu + cloud-common-etl + 1.0.0 + compile + diff --git a/cloud-task-remote/src/main/java/com/muyu/remote/feign/DatasourceFeign.java b/cloud-task-remote/src/main/java/com/muyu/remote/feign/DatasourceFeign.java index 7b3ef8c..2bc7deb 100644 --- a/cloud-task-remote/src/main/java/com/muyu/remote/feign/DatasourceFeign.java +++ b/cloud-task-remote/src/main/java/com/muyu/remote/feign/DatasourceFeign.java @@ -1,7 +1,8 @@ package com.muyu.remote.feign; -import com.muyu.common.domain.DataValue; + +import com.muyu.etl.domain.DataValue; import com.muyu.remote.feign.Factory.DatasourceFeignFactory; import org.springframework.cloud.openfeign.EnableFeignClients; import org.springframework.cloud.openfeign.FeignClient; diff --git a/cloud-task-remote/src/main/java/com/muyu/remote/feign/Factory/DatasourceFeignFactory.java b/cloud-task-remote/src/main/java/com/muyu/remote/feign/Factory/DatasourceFeignFactory.java index 5444e53..394d386 100644 --- a/cloud-task-remote/src/main/java/com/muyu/remote/feign/Factory/DatasourceFeignFactory.java +++ b/cloud-task-remote/src/main/java/com/muyu/remote/feign/Factory/DatasourceFeignFactory.java @@ -1,7 +1,7 @@ package com.muyu.remote.feign.Factory; import com.muyu.common.core.domain.Result; -import com.muyu.common.domain.DataValue; +import com.muyu.etl.domain.DataValue; import com.muyu.remote.feign.DatasourceFeign; import lombok.extern.log4j.Log4j2; import org.springframework.cloud.openfeign.FallbackFactory; diff --git a/cloud-task-remote/src/main/java/com/muyu/remote/feign/Factory/RuleFactory.java b/cloud-task-remote/src/main/java/com/muyu/remote/feign/Factory/RuleFactory.java index 13e6043..a403e96 100644 --- a/cloud-task-remote/src/main/java/com/muyu/remote/feign/Factory/RuleFactory.java +++ b/cloud-task-remote/src/main/java/com/muyu/remote/feign/Factory/RuleFactory.java @@ -1,7 +1,7 @@ package com.muyu.remote.feign.Factory; import com.muyu.common.core.domain.Result; -import com.muyu.common.domain.DataValue; +import com.muyu.etl.domain.DataValue; import com.muyu.remote.feign.DatasourceFeign; import com.muyu.remote.feign.RuleFeign; import com.muyu.rule.common.domain.RuleEngineVersion; diff --git a/cloud-task-remote/src/main/java/com/muyu/remote/feign/RuleFeign.java b/cloud-task-remote/src/main/java/com/muyu/remote/feign/RuleFeign.java index d354ab0..518cd3a 100644 --- a/cloud-task-remote/src/main/java/com/muyu/remote/feign/RuleFeign.java +++ b/cloud-task-remote/src/main/java/com/muyu/remote/feign/RuleFeign.java @@ -1,7 +1,8 @@ package com.muyu.remote.feign; import com.muyu.common.core.domain.Result; -import com.muyu.common.domain.DataValue; + +import com.muyu.etl.domain.DataValue; import com.muyu.remote.feign.Factory.RuleFactory; import com.muyu.rule.common.domain.RuleEngineVersion; import io.swagger.v3.oas.annotations.Operation; diff --git a/cloud-task-server/pom.xml b/cloud-task-server/pom.xml index 09decef..86cc169 100644 --- a/cloud-task-server/pom.xml +++ b/cloud-task-server/pom.xml @@ -100,7 +100,11 @@ com.muyu cloud-datasources-common - + + com.muyu + cloud-datasources-client + 1.0.0 + diff --git a/cloud-task-server/src/main/java/com/muyu/task/server/service/impl/TaskInfoServiceImpl.java b/cloud-task-server/src/main/java/com/muyu/task/server/service/impl/TaskInfoServiceImpl.java index 7977651..d3deb02 100644 --- a/cloud-task-server/src/main/java/com/muyu/task/server/service/impl/TaskInfoServiceImpl.java +++ b/cloud-task-server/src/main/java/com/muyu/task/server/service/impl/TaskInfoServiceImpl.java @@ -4,14 +4,19 @@ import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; +import com.muyu.client.mysql.MySqlDataSource; +import com.muyu.client.mysql.MySqlQuery; import com.muyu.common.core.domain.Result; import com.muyu.common.core.utils.StringUtils; -import com.muyu.common.domain.DataValue; + +import com.muyu.common.data.base.BaseQuery; import com.muyu.common.domian.*; import com.muyu.common.domian.basic.abstracts.DataTaskAbstracts; import com.muyu.common.domian.enums.Weight; import com.muyu.common.domian.req.TaskInfoListReq; import com.muyu.common.domian.resp.TaskInfoResp; +import com.muyu.common.pool.MysqlPool; +import com.muyu.etl.domain.DataValue; import com.muyu.remote.feign.DatasourceFeign; import com.muyu.remote.feign.RuleFeign; import com.muyu.rule.common.domain.RuleEngineVersion; @@ -324,7 +329,8 @@ public class TaskInfoServiceImpl extends ServiceImpl i return joint; } - + @Resource + private MySqlDataSource mySqlDataSource; @NotNull private void getString(Long pageNum, String fieName, @@ -338,12 +344,13 @@ public class TaskInfoServiceImpl extends ServiceImpl i Long taskId) { String sqlSelect = " SELECT " + fieName + " FROM " + joint + " LIMIT " + PAGE_SIZE + " OFFSET " + pageNum; log.info(sqlSelect); - + mySqlDataSource.setQuery(MySqlQuery.builder().dataSourceId(basicId).sql(sqlSelect).one(one).two(two).build()); + DataValue[][] rows = mySqlDataSource.getRows(); //log.info("执行{}查询的方法",sqlSelect); - Result tableValueResult = datasourceFeign.findTableValueToArray(basicId, sqlSelect, one, two); - log.info(tableValueResult); - - DataValue[][] data = tableValueResult.getData(); +// Result tableValueResult = datasourceFeign.findTableValueToArray(basicId, sqlSelect, one, two); + //log.info(tableValueResult); +// +// DataValue[][] data = tableValueResult.getData(); log.info("执行{}查询的方法结束", sqlSelect); // for (DataValue[] datum : data) { // for (DataValue dataValue : datum) { @@ -354,7 +361,7 @@ public class TaskInfoServiceImpl extends ServiceImpl i // } // Result result = datasourceFeign.addProduct(newBasicId, tableId, data); // log.info("{}添加结束", result); - executeTheRule(data, map, newBasicId, tableId, taskId); + executeTheRule(rows, map, newBasicId, tableId, taskId); } diff --git a/cloud-task-server/src/main/java/com/muyu/task/server/service/impl/TaskInputServiceImpl.java b/cloud-task-server/src/main/java/com/muyu/task/server/service/impl/TaskInputServiceImpl.java index b3795cd..cccac76 100644 --- a/cloud-task-server/src/main/java/com/muyu/task/server/service/impl/TaskInputServiceImpl.java +++ b/cloud-task-server/src/main/java/com/muyu/task/server/service/impl/TaskInputServiceImpl.java @@ -1,13 +1,9 @@ package com.muyu.task.server.service.impl; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; -import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; -import com.muyu.common.core.domain.Result; import com.muyu.common.core.utils.StringUtils; -import com.muyu.common.domain.DataValue; -import com.muyu.common.domian.NodeJoint; import com.muyu.common.domian.TaskInput; import com.muyu.common.domian.req.TaskInputListReq; import com.muyu.remote.feign.DatasourceFeign; @@ -17,10 +13,6 @@ import com.muyu.task.server.service.NodeTableService; import com.muyu.task.server.service.TaskInputService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; -import org.springframework.util.CollectionUtils; - -import java.util.HashSet; -import java.util.List; @Service public class TaskInputServiceImpl extends ServiceImpl implements TaskInputService { diff --git a/cloud-task-server/src/main/java/com/muyu/task/server/thread/OptimizedPrioritizedThreadPool.java b/cloud-task-server/src/main/java/com/muyu/task/server/thread/OptimizedPrioritizedThreadPool.java index 230269e..4303f29 100644 --- a/cloud-task-server/src/main/java/com/muyu/task/server/thread/OptimizedPrioritizedThreadPool.java +++ b/cloud-task-server/src/main/java/com/muyu/task/server/thread/OptimizedPrioritizedThreadPool.java @@ -37,7 +37,7 @@ public class OptimizedPrioritizedThreadPool { // 创建固定大小的线程池 executor = new ThreadPoolExecutor( totalThreads, totalThreads, - 80L, TimeUnit.SECONDS, + 180L, TimeUnit.SECONDS, new LinkedBlockingQueue() ); highPrioritySemaphore = new Semaphore(defaultHighThreads);