feat:新增mq更新引擎版本内容和数据源

dev
gtl 2024-05-13 16:24:09 +08:00
parent dc7bb6a28f
commit 7e8eb968c0
44 changed files with 912 additions and 41 deletions

View File

@ -14,11 +14,11 @@ spring:
nacos:
discovery:
# 服务注册地址
server-addr: 47.98.98.250:8848
server-addr: 139.224.220.40:8848
namespace: 143f1a53-e544-4782-8667-877c532e2c66
config:
# 配置中心地址
server-addr: 47.98.98.250:8848
server-addr: 139.224.220.40:8848
namespace: 143f1a53-e544-4782-8667-877c532e2c66
# 配置文件格式
file-extension: yml

View File

@ -14,11 +14,11 @@ spring:
nacos:
discovery:
# 服务注册地址
server-addr: 47.98.98.250:8848
server-addr: 139.224.220.40:8848
namespace: 143f1a53-e544-4782-8667-877c532e2c66
config:
# 配置中心地址
server-addr: 47.98.98.250:8848
server-addr: 139.224.220.40:8848
namespace: 143f1a53-e544-4782-8667-877c532e2c66
# 配置文件格式
file-extension: yml

View File

@ -31,5 +31,7 @@
<version>1.2.20</version>
<scope>compile</scope>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,13 @@
package com.ruoyi.dataAsset.constant;
/**
*
* @ClassName QueueNameConstants
* @Author
*/
public class QueueNameConstants {
//发送短消息队列名称
public static final String DATASOURCE_MESSAGE = "send_datasource_message";
}

View File

@ -99,6 +99,13 @@
<artifactId>mssql-jdbc</artifactId>
<version>9.4.0.jre8</version>
</dependency>
<!-- rabbitMQ -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.6.2</version>
</dependency>
</dependencies>
<build>

View File

@ -0,0 +1,51 @@
package com.ruoyi.dataAsset.config;
import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
/**
* broker
* @ClassName: ConfirmFallbackConfig
* @Author:
* @Date: 2024/1/15
*/
@Log4j2
@Component
public class ConfirmFallbackConfig implements RabbitTemplate.ConfirmCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* ConfirmFallbackConfig rabbitTemplate
*/
@PostConstruct
public void init() {
this.rabbitTemplate.setConfirmCallback(this);
}
/**
* broker
* @param correlationData correlation data for the callback.
* @param ack true for ack, false for nack
* @param cause An optional cause, for nack, when available, otherwise null.
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
// 消息发送成功
log.info("消息发送成功");
} else {
// 消息发送是吧
log.info("消息发送失败,错误原因:{}", cause);
// 获取到发送失败消息的内容
}
}
}

View File

@ -0,0 +1,53 @@
package com.ruoyi.dataAsset.config;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* RabbitAdminRabbitMQJavaRabbitMQRabbitMQ
*/
@Configuration
public class RabbitAdminConfig {
@Value("${spring.rabbitmq.host}")
private String host;
@Value("${spring.rabbitmq.username}")
private String username;
@Value("${spring.rabbitmq.password}")
private String password;
@Value("${spring.rabbitmq.virtualhost}")
private String virtualhost;
/**
* RabbitMQ
* @return
*/
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setAddresses(host);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost(virtualhost);
// 配置发送确认回调时次配置必须配置否则即使在RabbitTemplate配置了ConfirmCallback也不会生效
connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
connectionFactory.setPublisherReturns(true);
return connectionFactory;
}
/**
* RabbitAdmin
* @param connectionFactory
* @return
*/
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
rabbitAdmin.setAutoStartup(true);
return rabbitAdmin;
}
}

View File

@ -0,0 +1,16 @@
package com.ruoyi.dataAsset.config;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitmqConfig {
// 消息转换配置
@Bean
public MessageConverter jsonMessageConverter(){
return new Jackson2JsonMessageConverter();
}
}

View File

@ -0,0 +1,44 @@
package com.ruoyi.dataAsset.config;
import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
/**
*
* @ClassName: ConfirmFallbackConfig
* @Author:
* @Date: 2024/1/15
*/
@Log4j2
@Component
public class ReturnsFallbackConfig implements RabbitTemplate.ReturnsCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* rabbitTemplate 使 ReturnsFallbackConfig
*/
@PostConstruct
public void init() {
this.rabbitTemplate.setReturnsCallback(this);
}
/**
*
* @param returned the returned message and metadata.
*/
@Override
public void returnedMessage(ReturnedMessage returned) {
log.info("消息内容:{},被交换机:{}退回,退回的原因:{}",
returned.getMessage().toString(), returned.getExchange(), returned.getReplyText());
// TODO 将发送到队列失败的消息 存入到 mysql | redis 后续可以进行补救 或者 先关处理
}
}

View File

@ -0,0 +1,26 @@
package com.ruoyi.dataAsset.queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.UUID;
/**
* @ClassName NormalQueue
* @Description
* @Author
* @Date 2024/1/16 18:38
*/
@Component
public class NormalQueue {
@Autowired
private RabbitTemplate rabbitTemplate;
public <T> void sendUUIDMsg(String queueName,T msg){
rabbitTemplate.convertAndSend(queueName,msg,message -> {
message.getMessageProperties().setMessageId(UUID.randomUUID().toString());
return message;
});
}
}

View File

@ -11,9 +11,11 @@ import com.ruoyi.common.core.utils.ObjUtils;
import com.ruoyi.common.core.utils.StringUtils;
import com.ruoyi.common.security.utils.SecurityUtils;
import com.ruoyi.dataAsset.config.DruidDataSourceFactory;
import com.ruoyi.dataAsset.constant.QueueNameConstants;
import com.ruoyi.dataAsset.domain.AssetModelData;
import com.ruoyi.dataAsset.domain.ColumnInfo;
import com.ruoyi.dataAsset.domain.TableInfo;
import com.ruoyi.dataAsset.queue.NormalQueue;
import com.ruoyi.dataAsset.service.AssetModelDataService;
import com.ruoyi.dataAsset.service.ColumnInfoService;
import com.ruoyi.dataAsset.service.TableInfoService;
@ -50,6 +52,9 @@ public class DataSourceServiceImpl extends ServiceImpl<DataSourceMapper, DataSou
@Autowired
private DruidDataSourceFactory druidDataSourceFactory;
@Autowired
private NormalQueue normalQueue;
/**
*
*
@ -108,6 +113,10 @@ public class DataSourceServiceImpl extends ServiceImpl<DataSourceMapper, DataSou
throw new RuntimeException(e);
}
}
//异步更新数据源
CompletableFuture.runAsync(()->{
this.renewalDataSource(dataSource);
});
//如果为不可用修改为可用
if(dataSource.getStatus().equals("N")){
dataSource.setStatus("Y");
@ -308,4 +317,28 @@ public class DataSourceServiceImpl extends ServiceImpl<DataSourceMapper, DataSou
throw new RuntimeException(e);
}
}
/**
*
* @param dataSource
*/
public void renewalDataSource(DataSource dataSource){
// 发送消息
normalQueue.sendUUIDMsg(QueueNameConstants.DATASOURCE_MESSAGE,dataSource);
DruidDataSource druidDataSource = DataSourceUtil.createDataSource(dataSource);
// 新增或替换数据源
Long id = dataSource.getId();
if(druidDataSourceFactory.getMap().containsKey(id)){
// 获取旧的
DruidDataSource source = druidDataSourceFactory.get(id);
// 替换
druidDataSourceFactory.put(id,druidDataSource);
// 关闭旧的
source.close();
}else {
//新增
druidDataSourceFactory.put(id, druidDataSource);
}
}
}

View File

@ -14,11 +14,11 @@ spring:
nacos:
discovery:
# 服务注册地址
server-addr: 47.98.98.250:8848
server-addr: 139.224.220.40:8848
namespace: 143f1a53-e544-4782-8667-877c532e2c66
config:
# 配置中心地址
server-addr: 47.98.98.250:8848
server-addr: 139.224.220.40:8848
namespace: 143f1a53-e544-4782-8667-877c532e2c66
# 配置文件格式
file-extension: yml

View File

@ -24,6 +24,13 @@
<artifactId>ruoyi-data_transform-common</artifactId>
<version>3.6.3</version>
</dependency>
<!-- rabbitMQ -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.6.2</version>
</dependency>
</dependencies>
<build>

View File

@ -0,0 +1,51 @@
package com.ruoyi.dataTransform.config;
import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
/**
* broker
* @ClassName: ConfirmFallbackConfig
* @Author:
* @Date: 2024/1/15
*/
@Log4j2
@Component
public class ConfirmFallbackConfig implements RabbitTemplate.ConfirmCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* ConfirmFallbackConfig rabbitTemplate
*/
@PostConstruct
public void init() {
this.rabbitTemplate.setConfirmCallback(this);
}
/**
* broker
* @param correlationData correlation data for the callback.
* @param ack true for ack, false for nack
* @param cause An optional cause, for nack, when available, otherwise null.
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
// 消息发送成功
log.info("消息发送成功");
} else {
// 消息发送是吧
log.info("消息发送失败,错误原因:{}", cause);
// 获取到发送失败消息的内容
}
}
}

View File

@ -0,0 +1,53 @@
package com.ruoyi.dataTransform.config;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* RabbitAdminRabbitMQJavaRabbitMQRabbitMQ
*/
@Configuration
public class RabbitAdminConfig {
@Value("${spring.rabbitmq.host}")
private String host;
@Value("${spring.rabbitmq.username}")
private String username;
@Value("${spring.rabbitmq.password}")
private String password;
@Value("${spring.rabbitmq.virtualhost}")
private String virtualhost;
/**
* RabbitMQ
* @return
*/
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setAddresses(host);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost(virtualhost);
// 配置发送确认回调时次配置必须配置否则即使在RabbitTemplate配置了ConfirmCallback也不会生效
connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
connectionFactory.setPublisherReturns(true);
return connectionFactory;
}
/**
* RabbitAdmin
* @param connectionFactory
* @return
*/
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
rabbitAdmin.setAutoStartup(true);
return rabbitAdmin;
}
}

View File

@ -0,0 +1,16 @@
package com.ruoyi.dataTransform.config;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitmqConfig {
// 消息转换配置
@Bean
public MessageConverter jsonMessageConverter(){
return new Jackson2JsonMessageConverter();
}
}

View File

@ -0,0 +1,44 @@
package com.ruoyi.dataTransform.config;
import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
/**
*
* @ClassName: ConfirmFallbackConfig
* @Author:
* @Date: 2024/1/15
*/
@Log4j2
@Component
public class ReturnsFallbackConfig implements RabbitTemplate.ReturnsCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* rabbitTemplate 使 ReturnsFallbackConfig
*/
@PostConstruct
public void init() {
this.rabbitTemplate.setReturnsCallback(this);
}
/**
*
* @param returned the returned message and metadata.
*/
@Override
public void returnedMessage(ReturnedMessage returned) {
log.info("消息内容:{},被交换机:{}退回,退回的原因:{}",
returned.getMessage().toString(), returned.getExchange(), returned.getReplyText());
// TODO 将发送到队列失败的消息 存入到 mysql | redis 后续可以进行补救 或者 先关处理
}
}

View File

@ -0,0 +1,84 @@
package com.ruoyi.dataTransform.consumer;
import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.fastjson2.JSON;
import com.rabbitmq.client.Channel;
import com.ruoyi.dataAsset.config.DruidDataSourceFactory;
import com.ruoyi.dataAsset.constant.QueueNameConstants;
import com.ruoyi.dataAsset.domain.DataSource;
import com.ruoyi.dataAsset.util.DataSourceUtil;
import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
*
* @ClassName: DataSourceConsumer
* @Author:
* @Date: 2024/5/13
*/
@Log4j2
@Component
public class DataSourceConsumer {
@Autowired
private RedisTemplate<String, String> redisTemplate;
@Autowired
private DruidDataSourceFactory druidDataSourceFactory;
@Autowired
private MessageConverter messageConverter;
/**
*
*/
@RabbitListener(queuesToDeclare = {@Queue(name = QueueNameConstants.DATASOURCE_MESSAGE)})
public void smsConsumer(Message message, Channel channel) {
DataSource dataSource = (DataSource) messageConverter.fromMessage(message);
log.info("数据源消费者接收到消息,消息内容:{}", JSON.toJSONString(dataSource));
// 获取消息的id
String messageId = message.getMessageProperties().getMessageId();
try {
Long count = redisTemplate.opsForSet().add(QueueNameConstants.DATASOURCE_MESSAGE, messageId);
if (count > 0) {
// 正常消费消息
DruidDataSource druidDataSource = DataSourceUtil.createDataSource(dataSource);
// 新增或替换数据源
Long id = dataSource.getId();
if(druidDataSourceFactory.getMap().containsKey(id)){
// 获取旧的
DruidDataSource source = druidDataSourceFactory.get(id);
// 替换
druidDataSourceFactory.put(id,druidDataSource);
// 关闭旧的
source.close();
}else {
//新增
druidDataSourceFactory.put(id, druidDataSource);
}
// 消费成功 手动确认
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
log.info("数据源消费者接收到消息,消息内容:{},消费消息成功!", JSON.toJSONString(dataSource));
}
} catch (Exception ex) {
log.error("数据源消费者接收到消息,消息内容:{},消费消息失败,错误信息:{}", JSON.toJSONString(dataSource), ex.getMessage());
// 删除 redis中 添加的消息的id
redisTemplate.opsForSet().remove(QueueNameConstants.DATASOURCE_MESSAGE, messageId);
// 回退消息
try {
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
} catch (IOException e) {
log.error("数据源消费者接收到消息,消息内容:{},消费回退失败,错误信息:{}", JSON.toJSONString(dataSource), e.getMessage());
}
}
}
}

View File

@ -0,0 +1,73 @@
package com.ruoyi.dataTransform.consumer;
import com.alibaba.fastjson2.JSON;
import com.rabbitmq.client.Channel;
import com.ruoyi.ruleEngine.client.config.RuleEngineVersionFactory;
import com.ruoyi.ruleEngine.client.util.RuleEngineUtil;
import com.ruoyi.ruleEngine.constant.QueueNameConstants;
import com.ruoyi.ruleEngine.domain.EngineVersion;
import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.Map;
/**
*
* @ClassName: EngineVersionConsumer
* @Author:
* @Date: 2024/5/13
*/
@Log4j2
@Component
public class EngineVersionConsumer {
@Autowired
private RedisTemplate<String, String> redisTemplate;
@Autowired
private RuleEngineVersionFactory ruleEngineVersionFactory;
@Autowired
private MessageConverter messageConverter;
/**
*
*/
@RabbitListener(queuesToDeclare = {@Queue(name = QueueNameConstants.VERSION_MESSAGE)})
public void smsConsumer(Message message, Channel channel) {
EngineVersion engineVersion = (EngineVersion) messageConverter.fromMessage(message);
log.info("引擎版本消费者接收到消息,消息内容:{}", JSON.toJSONString(engineVersion));
// 获取消息的id
String messageId = message.getMessageProperties().getMessageId();
try {
Long count = redisTemplate.opsForSet().add(QueueNameConstants.VERSION_MESSAGE, messageId);
if (count > 0) {
// 正常消费消息
// 编译
Map<String, byte[]> bytecode = RuleEngineUtil.compileVersion(engineVersion);
// 新增或替换规则内容
ruleEngineVersionFactory.put(engineVersion.getId(),bytecode);
// 消费成功 手动确认
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
log.info("引擎版本消费者接收到消息,消息内容:{},消费消息成功!", JSON.toJSONString(engineVersion));
}
} catch (Exception ex) {
log.error("引擎版本消费者接收到消息,消息内容:{},消费消息失败,错误信息:{}", JSON.toJSONString(engineVersion), ex.getMessage());
// 删除 redis中 添加的消息的id
redisTemplate.opsForSet().remove(QueueNameConstants.VERSION_MESSAGE, messageId);
// 回退消息
try {
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
} catch (IOException e) {
log.error("引擎版本消费者接收到消息,消息内容:{},消费回退失败,错误信息:{}", JSON.toJSONString(engineVersion), e.getMessage());
}
}
}
}

View File

@ -18,6 +18,7 @@ import com.ruoyi.ruleEngine.remote.RemoteRuleEngineService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
@ -77,7 +78,8 @@ public class EngineOperationServiceImpl implements EngineOperationService {
// 调用execution方法
Method mainMethod = clazz.getDeclaredMethod("execution");
mainMethod.invoke(clazz.newInstance());
} catch (Exception e) {
} catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException | InvocationTargetException |
InstantiationException e) {
log.info("测试失败,{}",e.getMessage());
//修改测试状态
remoteRuleEngineService.edit(testDataReq.getVersionId(),"0");
@ -86,11 +88,27 @@ public class EngineOperationServiceImpl implements EngineOperationService {
//修改测试状态
remoteRuleEngineService.edit(testDataReq.getVersionId(),"1");
// 处理后的测试数据
return switch (testDataReq.getScope()) {
case 2 -> DataSetContextHolder.get().getDataSetModel();
case 3 -> RecordContextHolder.get().getRecordModel();
case 4 -> DataModelContextHolder.get().getDataModel();
default -> null;
};
switch (testDataReq.getScope()) {
case 2 -> {
//获取处理完的数据
DataSetModel dataSetModel = DataSetContextHolder.get().getDataSetModel();
//删除线程变量
DataSetContextHolder.remove();
return dataSetModel;
}
case 3 -> {
RecordModel recordModel = RecordContextHolder.get().getRecordModel();
RecordContextHolder.remove();
return recordModel;
}
case 4 -> {
DataModel dataModel = DataModelContextHolder.get().getDataModel();
DataModelContextHolder.remove();
return dataModel;
}
default -> {
return null;
}
}
}
}

View File

@ -4,6 +4,8 @@ server:
# Spring
spring:
main:
allow-bean-definition-overriding: true
application:
# 应用名称
name: ruoyi-data-transform
@ -14,11 +16,11 @@ spring:
nacos:
discovery:
# 服务注册地址
server-addr: 47.98.98.250:8848
server-addr: 139.224.220.40:8848
namespace: 143f1a53-e544-4782-8667-877c532e2c66
config:
# 配置中心地址
server-addr: 47.98.98.250:8848
server-addr: 139.224.220.40:8848
namespace: 143f1a53-e544-4782-8667-877c532e2c66
# 配置文件格式
file-extension: yml

View File

@ -14,11 +14,11 @@ spring:
nacos:
discovery:
# 服务注册地址
server-addr: 47.98.98.250:8848
server-addr: 139.224.220.40:8848
namespace: 143f1a53-e544-4782-8667-877c532e2c66
config:
# 配置中心地址
server-addr: 47.98.98.250:8848
server-addr: 139.224.220.40:8848
namespace: 143f1a53-e544-4782-8667-877c532e2c66
# 配置文件格式
file-extension: yml

View File

@ -14,11 +14,11 @@ spring:
nacos:
discovery:
# 服务注册地址
server-addr: 47.98.98.250:8848
server-addr: 139.224.220.40:8848
namespace: 143f1a53-e544-4782-8667-877c532e2c66
config:
# 配置中心地址
server-addr: 47.98.98.250:8848
server-addr: 139.224.220.40:8848
namespace: 143f1a53-e544-4782-8667-877c532e2c66
# 配置文件格式
file-extension: yml

View File

@ -14,11 +14,11 @@ spring:
nacos:
discovery:
# 服务注册地址
server-addr: 47.98.98.250:8848
server-addr: 139.224.220.40:8848
namespace: 143f1a53-e544-4782-8667-877c532e2c66
config:
# 配置中心地址
server-addr: 47.98.98.250:8848
server-addr: 139.224.220.40:8848
namespace: 143f1a53-e544-4782-8667-877c532e2c66
# 配置文件格式
file-extension: yml

View File

@ -1,6 +1,7 @@
package com.ruoyi.ruleEngine.client.config;
import com.ruoyi.common.core.domain.Result;
import com.ruoyi.ruleEngine.client.util.RuleEngineUtil;
import com.ruoyi.ruleEngine.constant.RuleOperationConstants;
import com.ruoyi.ruleEngine.domain.EngineVersion;
import com.ruoyi.ruleEngine.domain.req.EngineVersionQueryReq;
@ -33,10 +34,8 @@ public class RuleEngineApplicationRunner implements ApplicationRunner {
Result<List<EngineVersion>> result = remoteRuleEngineService.list(new EngineVersionQueryReq());
if(Result.isSuccess(result)){
result.getData().forEach(engineVersion -> {
// 获取版本内容
String content = engineVersion.getRuleContent().replaceAll("\r\n", "");
// 对source进行编译生成class文件存放在Map中这里用bytecode接收
Map<String, byte[]> bytecode = DynamicLoader.compile(engineVersion.getVersionClass() + RuleOperationConstants.FILE_SUFFIX,content );
// 编译规则内容
Map<String, byte[]> bytecode = RuleEngineUtil.compileVersion(engineVersion);
// 存入工厂
ruleEngineVersionFactory.put(engineVersion.getId(), bytecode);
});

View File

@ -1,7 +1,7 @@
package com.ruoyi.ruleEngine.client.context;
import com.ruoyi.ruleEngine.client.model.process.DataModelProcessModel;
import com.ruoyi.ruleEngine.client.util.ScopeContextHolderUtil;
import com.ruoyi.ruleEngine.client.util.RuleEngineUtil;
import lombok.Data;
import lombok.experimental.SuperBuilder;
@ -35,7 +35,7 @@ public class DataModelContextHolder{
}
public static void set(Connection connection,String sql){
ScopeContextHolderUtil.setDataSetContextHolder(connection,sql);
RuleEngineUtil.setDataSetContextHolder(connection,sql);
}
public static void remove() {

View File

@ -1,10 +1,10 @@
package com.ruoyi.ruleEngine.client.engine.action;
/**
*
* @ClassName ActionDiscard
*
* @ClassName ActionIgnore
* @Author:
* @Date: 2024/5/6 13:48
*/
public class ActionDiscard extends RuntimeException{
public class ActionIgnore extends RuntimeException{
}

View File

@ -0,0 +1,10 @@
package com.ruoyi.ruleEngine.client.engine.action;
/**
*
* @ClassName ActionRecords
* @Author
* @Date 2024/5/12 22:31
*/
public class ActionRecords extends RuntimeException{
}

View File

@ -0,0 +1,10 @@
package com.ruoyi.ruleEngine.client.engine.action;
/**
*
* @ClassName ActionRemove
* @Author:
* @Date: 2024/5/6 13:48
*/
public class ActionRemove extends RuntimeException{
}

View File

@ -1,22 +1,26 @@
package com.ruoyi.ruleEngine.client.util;
import com.ruoyi.ruleEngine.client.context.DataSetContextHolder;
import com.ruoyi.ruleEngine.client.dynamicLoad.DynamicLoader;
import com.ruoyi.ruleEngine.client.model.DataModel;
import com.ruoyi.ruleEngine.client.model.DataSetModel;
import com.ruoyi.ruleEngine.client.model.RecordModel;
import com.ruoyi.ruleEngine.client.model.process.DataSetProcessModel;
import com.ruoyi.ruleEngine.constant.RuleOperationConstants;
import com.ruoyi.ruleEngine.domain.EngineVersion;
import java.sql.*;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
*
* @ClassName ScopeContextHolderUtil
*
* @ClassName RuleEngineUtil
* @Author
* @Date 2024/5/6 20:51
*/
public class ScopeContextHolderUtil {
public class RuleEngineUtil {
public static void setDataSetContextHolder(Connection connection,String sql){
List<RecordModel> recordModels=new ArrayList<>();
@ -52,4 +56,10 @@ public class ScopeContextHolderUtil {
DataSetContextHolder.set(dataSetProcessModel);
}
public static Map<String, byte[]> compileVersion(EngineVersion engineVersion){
// 获取版本内容
String content = engineVersion.getRuleContent().replaceAll("\r\n", "");
// 对source进行编译生成class文件存放在Map中这里用bytecode接收
return DynamicLoader.compile(engineVersion.getVersionClass() + RuleOperationConstants.FILE_SUFFIX,content );
}
}

View File

@ -0,0 +1,13 @@
package com.ruoyi.ruleEngine.constant;
/**
*
* @ClassName QueueNameConstants
* @Author
*/
public class QueueNameConstants {
//发送短消息队列名称
public static final String VERSION_MESSAGE = "send_version_message";
}

View File

@ -91,6 +91,13 @@
<artifactId>mssql-jdbc</artifactId>
<version>9.4.0.jre8</version>
</dependency>
<!-- rabbitMQ -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.6.2</version>
</dependency>
</dependencies>
<build>

View File

@ -0,0 +1,51 @@
package com.ruoyi.ruleEngine.config;
import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
/**
* broker
* @ClassName: ConfirmFallbackConfig
* @Author:
* @Date: 2024/1/15
*/
@Log4j2
@Component
public class ConfirmFallbackConfig implements RabbitTemplate.ConfirmCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* ConfirmFallbackConfig rabbitTemplate
*/
@PostConstruct
public void init() {
this.rabbitTemplate.setConfirmCallback(this);
}
/**
* broker
* @param correlationData correlation data for the callback.
* @param ack true for ack, false for nack
* @param cause An optional cause, for nack, when available, otherwise null.
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
// 消息发送成功
log.info("消息发送成功");
} else {
// 消息发送是吧
log.info("消息发送失败,错误原因:{}", cause);
// 获取到发送失败消息的内容
}
}
}

View File

@ -0,0 +1,53 @@
package com.ruoyi.ruleEngine.config;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* RabbitAdminRabbitMQJavaRabbitMQRabbitMQ
*/
@Configuration
public class RabbitAdminConfig {
@Value("${spring.rabbitmq.host}")
private String host;
@Value("${spring.rabbitmq.username}")
private String username;
@Value("${spring.rabbitmq.password}")
private String password;
@Value("${spring.rabbitmq.virtualhost}")
private String virtualhost;
/**
* RabbitMQ
* @return
*/
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setAddresses(host);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost(virtualhost);
// 配置发送确认回调时次配置必须配置否则即使在RabbitTemplate配置了ConfirmCallback也不会生效
connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
connectionFactory.setPublisherReturns(true);
return connectionFactory;
}
/**
* RabbitAdmin
* @param connectionFactory
* @return
*/
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
rabbitAdmin.setAutoStartup(true);
return rabbitAdmin;
}
}

View File

@ -0,0 +1,16 @@
package com.ruoyi.ruleEngine.config;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitmqConfig {
// 消息转换配置
@Bean
public MessageConverter jsonMessageConverter(){
return new Jackson2JsonMessageConverter();
}
}

View File

@ -0,0 +1,44 @@
package com.ruoyi.ruleEngine.config;
import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
/**
*
* @ClassName: ConfirmFallbackConfig
* @Author:
* @Date: 2024/1/15
*/
@Log4j2
@Component
public class ReturnsFallbackConfig implements RabbitTemplate.ReturnsCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* rabbitTemplate 使 ReturnsFallbackConfig
*/
@PostConstruct
public void init() {
this.rabbitTemplate.setReturnsCallback(this);
}
/**
*
* @param returned the returned message and metadata.
*/
@Override
public void returnedMessage(ReturnedMessage returned) {
log.info("消息内容:{},被交换机:{}退回,退回的原因:{}",
returned.getMessage().toString(), returned.getExchange(), returned.getReplyText());
// TODO 将发送到队列失败的消息 存入到 mysql | redis 后续可以进行补救 或者 先关处理
}
}

View File

@ -73,7 +73,7 @@ public class EngineVersionController extends BaseController {
@PutMapping("/edit/{id}")
@ApiOperation("修改引擎规则版本")
public Result<String> edit(@PathVariable Long id, @RequestBody EngineVersionEditReq engineConfigEditReq) {
return toAjax(engineVersionService.updateById(EngineVersion.editBuild(id,engineConfigEditReq, SecurityUtils::getUsername)));
return toAjax(engineVersionService.update(EngineVersion.editBuild(id,engineConfigEditReq, SecurityUtils::getUsername)));
}
/**

View File

@ -0,0 +1,26 @@
package com.ruoyi.ruleEngine.queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.UUID;
/**
* @ClassName NormalQueue
* @Description
* @Author
* @Date 2024/1/16 18:38
*/
@Component
public class NormalQueue {
@Autowired
private RabbitTemplate rabbitTemplate;
public <T> void sendUUIDMsg(String queueName,T msg){
rabbitTemplate.convertAndSend(queueName,msg,message -> {
message.getMessageProperties().setMessageId(UUID.randomUUID().toString());
return message;
});
}
}

View File

@ -36,4 +36,11 @@ public interface EngineVersionService extends IService<EngineVersion> {
* @return
*/
boolean update(Long id, String testStatus);
/**
*
* @param engineVersion
* @return
*/
boolean update(EngineVersion engineVersion);
}

View File

@ -5,17 +5,20 @@ import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.ruoyi.common.core.utils.ObjUtils;
import com.ruoyi.ruleEngine.constant.EngineVersionConstants;
import com.ruoyi.ruleEngine.constant.QueueNameConstants;
import com.ruoyi.ruleEngine.domain.EngineVersion;
import com.ruoyi.ruleEngine.domain.req.VersionClassCreateReq;
import com.ruoyi.ruleEngine.domain.resp.EngineConfigScopeResp;
import com.ruoyi.ruleEngine.domain.resp.VersionClassCreateResp;
import com.ruoyi.ruleEngine.mapper.EngineVersionMapper;
import com.ruoyi.ruleEngine.queue.NormalQueue;
import com.ruoyi.ruleEngine.service.EngineConfigService;
import com.ruoyi.ruleEngine.service.EngineVersionService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.List;
import java.util.concurrent.CompletableFuture;
/**
* Service
@ -27,6 +30,9 @@ import java.util.List;
@Service
public class EngineVersionServiceImpl extends ServiceImpl<EngineVersionMapper, EngineVersion> implements EngineVersionService {
@Autowired
private NormalQueue normalQueue;
@Autowired
private EngineConfigService engineConfigService;
@ -91,5 +97,31 @@ public class EngineVersionServiceImpl extends ServiceImpl<EngineVersionMapper, E
.set(EngineVersion::getTestStatus,testStatus));
}
/**
*
* @param engineVersion
* @return
*/
@Override
public boolean update(EngineVersion engineVersion) {
//修改
boolean removed = this.updateById(engineVersion);
//异步更新
CompletableFuture.runAsync(()->{
this.renewalEngineVersion(engineVersion.getId());
});
return removed;
}
/**
* class
* @param engineVersionId
*/
public void renewalEngineVersion(Long engineVersionId){
//查询
EngineVersion engineVersion = this.getById(engineVersionId);
//发送消息
normalQueue.sendUUIDMsg(QueueNameConstants.VERSION_MESSAGE,engineVersion);
}
}

View File

@ -14,11 +14,11 @@ spring:
nacos:
discovery:
# 服务注册地址
server-addr: 47.98.98.250:8848
server-addr: 139.224.220.40:8848
namespace: 143f1a53-e544-4782-8667-877c532e2c66
config:
# 配置中心地址
server-addr: 47.98.98.250:8848
server-addr: 139.224.220.40:8848
namespace: 143f1a53-e544-4782-8667-877c532e2c66
# 配置文件格式
file-extension: yml

View File

@ -1,7 +1,7 @@
package com.ruoyi.ruleEngine.client.context;
import com.ruoyi.ruleEngine.client.model.process.DataModelProcessModel;
import com.ruoyi.ruleEngine.client.util.ScopeContextHolderUtil;
import com.ruoyi.ruleEngine.client.util.RuleEngineUtil;
import lombok.Data;
import lombok.experimental.SuperBuilder;

View File

@ -14,11 +14,11 @@ spring:
nacos:
discovery:
# 服务注册地址
server-addr: 47.98.98.250:8848
server-addr: 139.224.220.40:8848
namespace: 143f1a53-e544-4782-8667-877c532e2c66
config:
# 配置中心地址
server-addr: 47.98.98.250:8848
server-addr: 139.224.220.40:8848
namespace: 143f1a53-e544-4782-8667-877c532e2c66
# 配置文件格式
file-extension: yml

View File

@ -14,11 +14,11 @@ spring:
nacos:
discovery:
# 服务注册地址
server-addr: 47.98.98.250:8848
server-addr: 139.224.220.40:8848
namespace: 143f1a53-e544-4782-8667-877c532e2c66
config:
# 配置中心地址
server-addr: 47.98.98.250:8848
server-addr: 139.224.220.40:8848
namespace: 143f1a53-e544-4782-8667-877c532e2c66
# 配置文件格式
file-extension: yml