diff --git a/muyu-common/muyu-common-core/src/main/java/com/muyu/common/core/web/domain/BaseEntity.java b/muyu-common/muyu-common-core/src/main/java/com/muyu/common/core/web/domain/BaseEntity.java index add8e3e..8afd8d1 100644 --- a/muyu-common/muyu-common-core/src/main/java/com/muyu/common/core/web/domain/BaseEntity.java +++ b/muyu-common/muyu-common-core/src/main/java/com/muyu/common/core/web/domain/BaseEntity.java @@ -1,5 +1,4 @@ package com.muyu.common.core.web.domain; - import com.baomidou.mybatisplus.annotation.TableField; import com.fasterxml.jackson.annotation.JsonFormat; import com.fasterxml.jackson.annotation.JsonIgnore; @@ -8,7 +7,6 @@ import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; import lombok.experimental.SuperBuilder; - import java.io.Serializable; import java.util.Date; import java.util.HashMap; @@ -25,41 +23,34 @@ import java.util.Map; @AllArgsConstructor public class BaseEntity implements Serializable { private static final long serialVersionUID = 1L; - /** * 搜索值 */ @JsonIgnore @TableField(exist = false) private String searchValue; - /** * 创建者 */ private String createBy; - /** * 创建时间 */ @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss") private Date createTime; - /** * 更新者 */ private String updateBy; - /** * 更新时间 */ @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss") private Date updateTime; - /** * 备注 */ private String remark; - /** * 请求参数 */ diff --git a/muyu-modules/muyu-etl/muyu-etl-service/src/main/java/com/zx/text/A.java b/muyu-modules/muyu-etl/muyu-etl-service/src/main/java/com/zx/text/A.java new file mode 100644 index 0000000..1238d9a --- /dev/null +++ b/muyu-modules/muyu-etl/muyu-etl-service/src/main/java/com/zx/text/A.java @@ -0,0 +1,22 @@ +package com.zx.text; + +/** + * @ClassDescription: + * @JdkVersion: 17 + * @Author: zhangxu + * @Created: 2024/5/13 14:59 + */ +public class A implements Base{ + @Override + public void execution(){ + DataModel dataModel = ThreadConstant.get(); + if (dataModel!=null){ + System.out.println(dataModel.getValue()); + }else { + System.out.println("null"); + } + + } + + +} diff --git a/muyu-modules/muyu-etl/muyu-etl-service/src/main/java/com/zx/text/B.java b/muyu-modules/muyu-etl/muyu-etl-service/src/main/java/com/zx/text/B.java new file mode 100644 index 0000000..3ef1628 --- /dev/null +++ b/muyu-modules/muyu-etl/muyu-etl-service/src/main/java/com/zx/text/B.java @@ -0,0 +1,22 @@ +package com.zx.text; + +/** + * @ClassDescription: + * @JdkVersion: 17 + * @Author: zhangxu + * @Created: 2024/5/13 15:02 + */ +public class B implements Base{ + + @Override + public void execution() { + DataModel dataModel = ThreadConstant.get(); + if (dataModel != null) { + dataModel.setValue("李四"); + } else { + System.out.println("dataModel is null"); + } + } + + +} diff --git a/muyu-modules/muyu-etl/muyu-etl-service/src/main/java/com/zx/text/Base.java b/muyu-modules/muyu-etl/muyu-etl-service/src/main/java/com/zx/text/Base.java new file mode 100644 index 0000000..00a0661 --- /dev/null +++ b/muyu-modules/muyu-etl/muyu-etl-service/src/main/java/com/zx/text/Base.java @@ -0,0 +1,11 @@ +package com.zx.text; + +/** + * @ClassDescription: + * @JdkVersion: 17 + * @Author: zhangxu + * @Created: 2024/5/13 15:16 + */ +public interface Base { + public void execution(); +} diff --git a/muyu-modules/muyu-etl/muyu-etl-service/src/main/java/com/zx/text/C.java b/muyu-modules/muyu-etl/muyu-etl-service/src/main/java/com/zx/text/C.java new file mode 100644 index 0000000..b124212 --- /dev/null +++ b/muyu-modules/muyu-etl/muyu-etl-service/src/main/java/com/zx/text/C.java @@ -0,0 +1,27 @@ +package com.zx.text; + +/** + * @ClassDescription: + * @JdkVersion: 17 + * @Author: zhangxu + * @Created: 2024/5/13 15:03 + */ +public class C implements Base{ + + @Override + public void execution(){ + DataModel dataModel = ThreadConstant.get(); + if (dataModel != null){ + if ("张三".equals(dataModel.getKey())){ + System.out.println("是张三"); + }else { + System.out.println("不是张三而是 :"+dataModel.getValue()); + } + }else { + System.out.println("dataModel为空"); + } + + } + + +} diff --git a/muyu-modules/muyu-etl/muyu-etl-service/src/main/java/com/zx/text/DataModel.java b/muyu-modules/muyu-etl/muyu-etl-service/src/main/java/com/zx/text/DataModel.java new file mode 100644 index 0000000..875b9ff --- /dev/null +++ b/muyu-modules/muyu-etl/muyu-etl-service/src/main/java/com/zx/text/DataModel.java @@ -0,0 +1,67 @@ +package com.zx.text; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * @ClassDescription: + * @JdkVersion: 17 + * @Author: zhangxu + * @Created: 2024/5/13 14:56 + */ +@Data +@AllArgsConstructor +@NoArgsConstructor +public class DataModel { + + private String key; + + private String value; + + private String sourceType; + + private String processType; + + private Class processClass; + + public String getKey() { + return key; + } + + public void setKey(String key) { + this.key = key; + } + + public String getValue() { + return value; + } + + public void setValue(String value) { + this.value = value; + } + + public String getSourceType() { + return sourceType; + } + + public void setSourceType(String sourceType) { + this.sourceType = sourceType; + } + + public String getProcessType() { + return processType; + } + + public void setProcessType(String processType) { + this.processType = processType; + } + + public Class getProcessClass() { + return processClass; + } + + public void setProcessClass(Class processClass) { + this.processClass = processClass; + } +} diff --git a/muyu-modules/muyu-etl/muyu-etl-service/src/main/java/com/zx/text/Main.java b/muyu-modules/muyu-etl/muyu-etl-service/src/main/java/com/zx/text/Main.java new file mode 100644 index 0000000..b4a66b3 --- /dev/null +++ b/muyu-modules/muyu-etl/muyu-etl-service/src/main/java/com/zx/text/Main.java @@ -0,0 +1,45 @@ +package com.zx.text; + +import java.util.concurrent.ConcurrentHashMap; + +/** + * @ClassDescription: + * @JdkVersion: 17 + * @Author: zhangxu + * @Created: 2024/5/13 14:55 + */ +public class Main { + + private static final ConcurrentHashMap ruleEngineMap = new ConcurrentHashMap<>(); + + static { + init(); + } + + public static void init(){ + ruleEngineMap.put("a",new A()); + ruleEngineMap.put("b",new B()); + ruleEngineMap.put("c",new C()); + } + + public static void main(String[] args) { + DataModel dataModel = new DataModel(); + dataModel.setValue("顽固"); + dataModel.setKey("name"); + dataModel.setProcessClass(String.class); + dataModel.setSourceType("varchar"); + dataModel.setProcessType("String"); + ThreadConstant.set(dataModel); + Base a = ruleEngineMap.get("a"); + Base b = ruleEngineMap.get("b"); + Base c = ruleEngineMap.get("c"); + System.out.println(a); + System.out.println(b); + System.out.println(c); + a.execution(); + b.execution(); + c.execution(); + ThreadConstant.remove(); + } + +} diff --git a/muyu-modules/muyu-etl/muyu-etl-service/src/main/java/com/zx/text/ThreadConstant.java b/muyu-modules/muyu-etl/muyu-etl-service/src/main/java/com/zx/text/ThreadConstant.java new file mode 100644 index 0000000..8d8b98e --- /dev/null +++ b/muyu-modules/muyu-etl/muyu-etl-service/src/main/java/com/zx/text/ThreadConstant.java @@ -0,0 +1,25 @@ +package com.zx.text; + +/** + * @ClassDescription: + * @JdkVersion: 17 + * @Author: zhangxu + * @Created: 2024/5/13 14:57 + */ +public class ThreadConstant { + + private static ThreadLocal threadLocal =new ThreadLocal<>(); + + public static DataModel get(){ + return threadLocal.get(); + } + + public static void set(DataModel dm){ + threadLocal.set(dm); + } + + public static void remove(){ + threadLocal.remove(); + } + +} diff --git a/muyu-modules/muyu-etl/muyu-remote/pom.xml b/muyu-modules/muyu-etl/muyu-remote/pom.xml index 3c04107..0665a77 100644 --- a/muyu-modules/muyu-etl/muyu-remote/pom.xml +++ b/muyu-modules/muyu-etl/muyu-remote/pom.xml @@ -19,4 +19,18 @@ UTF-8 + + + com.zx + muyu-etl-common + 3.6.3 + + + + com.muyu + muyu-common-core + + + + diff --git a/muyu-modules/muyu-etl/muyu-remote/src/main/java/com/zx/RemoteAssetModelService.java b/muyu-modules/muyu-etl/muyu-remote/src/main/java/com/zx/RemoteAssetModelService.java new file mode 100644 index 0000000..a06d9de --- /dev/null +++ b/muyu-modules/muyu-etl/muyu-remote/src/main/java/com/zx/RemoteAssetModelService.java @@ -0,0 +1,27 @@ +package com.zx; + +import com.muyu.common.core.constant.ServiceNameConstants; +import com.muyu.common.core.domain.Result; +import com.muyu.common.core.web.page.TableDataInfo; +import com.muyu.common.system.remote.factory.RemoteUserFallbackFactory; +import com.zx.domain.AssetModel; +import org.springframework.cloud.openfeign.FeignClient; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; + +/** + * @ClassDescription: + * @JdkVersion: 17 + * @Author: zhangxu + * @Created: 2024/5/13 14:08 + */ +@FeignClient(contextId = "remoteAssetModelService", + value = ServiceNameConstants.SYSTEM_SERVICE, + fallbackFactory = RemoteUserFallbackFactory.class) +public interface RemoteAssetModelService { + + @PostMapping("/model/list") + public Result> list(@RequestBody AssetModel assetModel); + + +} diff --git a/muyu-modules/muyu-etl/muyu-remote/src/main/java/com/zx/factory/AssetModelFallbackFactory.java b/muyu-modules/muyu-etl/muyu-remote/src/main/java/com/zx/factory/AssetModelFallbackFactory.java new file mode 100644 index 0000000..4a25f73 --- /dev/null +++ b/muyu-modules/muyu-etl/muyu-remote/src/main/java/com/zx/factory/AssetModelFallbackFactory.java @@ -0,0 +1,25 @@ +package com.zx.factory; + +import com.muyu.common.core.domain.Result; +import com.muyu.common.core.web.page.TableDataInfo; +import com.zx.RemoteAssetModelService; +import com.zx.domain.AssetModel; +import org.springframework.cloud.openfeign.FallbackFactory; + +/** + * @ClassDescription: + * @JdkVersion: 17 + * @Author: zhangxu + * @Created: 2024/5/13 14:14 + */ +public class AssetModelFallbackFactory implements FallbackFactory { + @Override + public RemoteAssetModelService create(Throwable cause) { + return new RemoteAssetModelService() { + @Override + public Result> list(AssetModel assetModel) { + return Result.error("获取数据模型列表失败"); + } + }; + } +} diff --git a/muyu-modules/muyu-etl/pom.xml b/muyu-modules/muyu-etl/pom.xml index 6d8720a..2d558c3 100644 --- a/muyu-modules/muyu-etl/pom.xml +++ b/muyu-modules/muyu-etl/pom.xml @@ -23,5 +23,87 @@ UTF-8 + + + com.muyu + muyu-modules-system + 3.6.3 + + + + org.postgresql + postgresql + + + + + com.alibaba.cloud + spring-cloud-starter-alibaba-nacos-discovery + + + + + com.alibaba.cloud + spring-cloud-starter-alibaba-nacos-config + + + + + com.alibaba.cloud + spring-cloud-starter-alibaba-sentinel + + + + + org.springframework.boot + spring-boot-starter-actuator + + + + + io.springfox + springfox-swagger-ui + ${swagger.fox.version} + + + + + com.mysql + mysql-connector-j + + + + + com.muyu + muyu-common-datasource + + + + + com.muyu + muyu-common-datascope + + + + + com.muyu + muyu-common-log + + + + + com.muyu + muyu-common-swagger + + + + + org.springframework.cloud + spring-cloud-starter-openfeign + + + + + diff --git a/muyu-modules/muyu-rule-engine/muyu-rule-engine-client/pom.xml b/muyu-modules/muyu-rule-engine/muyu-rule-engine-client/pom.xml index 647c024..ebe9c19 100644 --- a/muyu-modules/muyu-rule-engine/muyu-rule-engine-client/pom.xml +++ b/muyu-modules/muyu-rule-engine/muyu-rule-engine-client/pom.xml @@ -19,7 +19,24 @@ + + org.apache.commons + commons-pool2 + 2.11.0 + + + + + com.zx + muyu-etl-common + 3.6.3 + + + rule.engine + muyu-rule-engine-common + 3.6.3 + mysql mysql-connector-java diff --git a/muyu-modules/muyu-rule-engine/muyu-rule-engine-client/src/main/java/rule/engine/domain/ConnectionPoolImp.java b/muyu-modules/muyu-rule-engine/muyu-rule-engine-client/src/main/java/rule/engine/domain/ConnectionPoolImp.java new file mode 100644 index 0000000..2bd6373 --- /dev/null +++ b/muyu-modules/muyu-rule-engine/muyu-rule-engine-client/src/main/java/rule/engine/domain/ConnectionPoolImp.java @@ -0,0 +1,230 @@ +package rule.engine.domain; + +import java.sql.DriverManager; +import java.sql.SQLException; +import java.util.*; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * @ClassDescription: + * @JdkVersion: 17 + * @Author: zhangxu + * @Created: 2024/5/9 21:30 + */ +public class ConnectionPoolImp implements ConnectionPool{ + //连接计数器,这里使用原子类保证线程安全 + private final AtomicInteger connectionCount = new AtomicInteger(0); + //连接池配置 + private final DataSourceConfig config; + //空闲连接队列,用于存储空闲连接 + ///支持在两端插入和移除元素的线性集合。 deque 这个名字是“双头队列”的缩写,通常发音为“deck”。 + private final Deque idleConnectionsPool = new ArrayDeque<>(); + //使用中连接列表,存储正在使用中的连接 + private final List activeConnectionsPool = new ArrayList<>(); + //定时任务对象,执行健康检查的任务 + //线程用于在后台线程中计划将来执行的任务的工具。可以将任务安排为一次性执行,也可以定期重复执行。 + private final Timer timer; + //全局锁对象 + private static final Object lock = new Object(); + + public ConnectionPoolImp(DataSourceConfig config) throws SQLException, ClassNotFoundException { + this.config = config; + //将驱动类加载入jvm + Class.forName(config.getDriver()); + //循环添加连接进空闲队列 + for (int i = 0; i < Integer.parseInt( config.getInitSize()); i++) { + this.idleConnectionsPool.addLast( + new ConnectionProxy(DriverManager.getConnection( + this.config.getUrl(), + this.config.getUsername(), + this.config.getPassword())) + ); + //更新计数器,值+1 + this.connectionCount.incrementAndGet(); + } + + //创建Timer实例 + this.timer = new Timer(); + //编写定时任务的逻辑 + timer.schedule(new TimerTask() { + public void run() { + System.out.println("[" + Thread.currentThread().getName() + "]" + " - Connection health check"); + try { + //遍历空闲连接队列,检查连接是否可用 + for (ConnectionProxy c : idleConnectionsPool) { + //调用Connection对象的isValid方法检查连接是否可用 + if (!c.getConnection().isValid(0)) { + //如果连接不可用,则从队列中移除,并更新计数器的值 + idleConnectionsPool.remove(c); + connectionCount.decrementAndGet(); + } + } + } + catch (SQLException e) { + e.printStackTrace(); + } + System.out.println("[" + Thread.currentThread().getName() + "]" + " - Connection health check end"); + System.out.println("[" + Thread.currentThread().getName() + "]" + " - Connection health check end"); + System.out.println("[" + Thread.currentThread().getName() + "]" + " - Connection health check end"); + System.out.println("[" + Thread.currentThread().getName() + "]" + " - Connection health check end"); + System.out.println("[" + Thread.currentThread().getName() + "]" + " - Connection health check end"); + //加锁,保证线程安全 + synchronized (lock) { + //因为要在迭代的过程中移除列表中的元素,所以这里倒着枚举元素,防止出问题 + for (int i = activeConnectionsPool.size() - 1; i >= 0; i--) { + ConnectionProxy c = activeConnectionsPool.get(i); + //获取获取连接时的时间戳,也就是连接加入使用中列表时的时间戳 + long connectTime = c.getConnectionTime(); + //获取当前时间戳 + long currentTime = System.currentTimeMillis(); + //根据配置文件中的超时时间判断是否超时 + if (currentTime - connectTime > Long.parseLong(config.getTimeout())) { + //如果超时了就从列表中移除,然会计数器的值-1 + activeConnectionsPool.remove(i); + connectionCount.decrementAndGet(); + try { + //移除连接后切记要将该连接关闭 + c.getConnection().close(); + System.out.println("[" + Thread.currentThread().getName() + "]" + + " - A connection timed out and has been closed."); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + } + } + } + //指定定时任务在多长时间后开始以及检查的间隔时间 + }, Long.parseLong(this.config.getDelay()), Long.parseLong(this.config.getInterval())); + System.out.println("[" + Thread.currentThread().getName() + "]" + " - Successfully started connection pool."); + System.out.println("The initial number of connections in the connection pool is " + config.getInterval()); + System.out.println("The maximum number of connections is " + this.config.getMaxSize()); + System.out.println("The timeout time is " + this.config.getTimeout() + "ms"); + System.out.println("The connection health check will starts in " + this.config.getDelay() + "ms"); + System.out.println("connection health check interval of " + this.config.getInterval() + "ms"); + } + + @Override + public ConnectionProxy getConnection() throws SQLException, InterruptedException { + ConnectionProxy connection = null; + //判断connection是否为空,为空需要重复获取 + while (connection == null) { + //加锁 + synchronized (lock) { + //判断空闲队列是否为空 + if (!this.idleConnectionsPool.isEmpty()) { + //若空闲队列不为空则让队头连接出列,并将其加入使用中列表 + connection = this.idleConnectionsPool.removeFirst(); + this.activeConnectionsPool.add(connection); + System.out.println("[" + Thread.currentThread().getName() + "]" + + " - Extract a connection from the idle connection pool."); + } else { + //当空闲队列为空,即没有空闲连接的情况 + + //判断当前连接总数是否已经达到最大值maxSize + if (this.connectionCount.get() < Integer.parseInt(this.config.getMaxSize())) { + //若没有达到最大值则直接创建有一个新连接 + + Optional opt = this.createConnection(); + + //判断连接是否创建成功 + if (opt.isPresent()) { + + //若创建成功则直接将连接加入使用中列表,并把连接计数器的值+1 + connection = opt.get(); + this.activeConnectionsPool.add(opt.get()); + this.connectionCount.incrementAndGet(); + System.out.println(Thread.currentThread().getName() + + " - Created a new connection."); + } else { + System.out.println("[" + Thread.currentThread().getName() + "]" + + " - Failed to obtain a new connection, preparing to try again."); + } + } else { + /* + 如果连接数量已到达最大值maxSize, + 这时需要让该线程陷入等待让出cpu时间片, + 等待其它线程释放连接时再将其唤醒 + */ + + System.out.println("[" + Thread.currentThread().getName() + "]" + + " - The connection pool is full and waiting for other threads to release the connection."); + lock.wait(); + } + } + } + } + //将获取到连接时的时间戳写进connectionTime属性里 + connection.setConnectionTime(System.currentTimeMillis()); + return connection; + } + + @Override + public void releaseConnection(ConnectionProxy conn) { + //判断连接是否还可用 + if (!this.isConnectionValid(conn)) { + //如果连接一不可用,则将其从使用中列表中移除,计数器值-1 + synchronized (lock) { + this.activeConnectionsPool.remove(conn); + this.connectionCount.decrementAndGet(); + return; + } + } + + //如果连接可用,则将从使用中列表中移除并加入空闲队列 + synchronized (lock) { + this.idleConnectionsPool.add(conn); + this.activeConnectionsPool.remove(conn); + System.out.println("[" + Thread.currentThread().getName()); + //唤醒其它正在等待的线程 + lock.notifyAll(); + } + } + + @Override + public void shutdown() throws SQLException { + + //循环遍历使用中列表和空闲队列,关闭存放的连接 + System.out.println(Thread.currentThread().getName()); + for (ConnectionProxy conn : this.activeConnectionsPool) { + conn.getConnection().close(); + } + for (ConnectionProxy conn : this.idleConnectionsPool) { + conn.getConnection().close(); + } + //将健康检查的定时任务取消``````````````````````````````````````````````````` + this.timer.cancel(); + } + + + private boolean isConnectionValid(ConnectionProxy conn) { + try { + return conn != null && !conn.getConnection().isClosed(); + } catch (SQLException e) { + e.printStackTrace(); + } + return false; + } + + private Optional createConnection() throws SQLException { + + ConnectionProxy conn = null; + try { + //创建数据库连接的代理对象 + conn = new ConnectionProxy( + DriverManager. + getConnection( + this.config.getUrl(), + this.config.getUsername(), + this.config.getPassword()) + ); + } catch (Exception e) { + throw new SQLException("create connection failed", e); + } + return Optional.of(conn); + } +} + + + + diff --git a/muyu-modules/muyu-rule-engine/muyu-rule-engine-common/pom.xml b/muyu-modules/muyu-rule-engine/muyu-rule-engine-common/pom.xml index a73161f..e0ad2bc 100644 --- a/muyu-modules/muyu-rule-engine/muyu-rule-engine-common/pom.xml +++ b/muyu-modules/muyu-rule-engine/muyu-rule-engine-common/pom.xml @@ -17,9 +17,12 @@ 17 UTF-8 - - - + + + com.zx + muyu-etl-common + 3.6.3 + com.alibaba.cloud spring-cloud-starter-alibaba-nacos-discovery diff --git a/muyu-modules/muyu-rule-engine/muyu-rule-engine-common/src/main/java/rule/engine/domain/req/DataSourceAssetModelReq.java b/muyu-modules/muyu-rule-engine/muyu-rule-engine-common/src/main/java/rule/engine/domain/req/DataSourceAssetModelReq.java new file mode 100644 index 0000000..ce6a4eb --- /dev/null +++ b/muyu-modules/muyu-rule-engine/muyu-rule-engine-common/src/main/java/rule/engine/domain/req/DataSourceAssetModelReq.java @@ -0,0 +1,24 @@ +package rule.engine.domain.req; + +import com.zx.domain.DataAsset; +import com.zx.domain.DataSource; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * @ClassDescription: + * @JdkVersion: 17 + * @Author: zhangxu + * @Created: 2024/5/13 14:27 + */ +@Data +@AllArgsConstructor +@NoArgsConstructor +public class DataSourceAssetModelReq { + + private DataSource dataSource; + + private DataAsset dataAsset; + +} diff --git a/muyu-modules/muyu-system/src/main/java/com/muyu/system/service/impl/SysDeptServiceImpl.java b/muyu-modules/muyu-system/src/main/java/com/muyu/system/service/impl/SysDeptServiceImpl.java index a55301b..def06a7 100644 --- a/muyu-modules/muyu-system/src/main/java/com/muyu/system/service/impl/SysDeptServiceImpl.java +++ b/muyu-modules/muyu-system/src/main/java/com/muyu/system/service/impl/SysDeptServiceImpl.java @@ -86,6 +86,7 @@ public class SysDeptServiceImpl extends ServiceImpl impl return returnList; } + /** * 构建前端所需要下拉树结构 *