feat:新增删除 引擎版本/数据源 同步 数据库连接池配置/引擎版本配置

dev
gtl 2024-05-17 11:56:12 +08:00
parent fe1831a58c
commit c783f979d8
21 changed files with 279 additions and 87 deletions

View File

@ -62,7 +62,27 @@ public class ConnectionPoolService {
// 获取数据源
DruidDataSource druidDataSource = connectionPoolContextHolder.get(id);
// 关闭数据源
try {
druidDataSource.close();
} catch (Exception e) {
log.error("关闭数据源key:{}失败",id);
}
}
}
/**
*
* @param id
*/
public void remove(Long id){
// 判断键是否存在
if(hasKey(id)){
// 关闭
this.closeDataSource(id);
// 删除
connectionPoolContextHolder.remove(id);
}else {
log.error("数据库连接池中不存在key:[{}]",id);
}
}

View File

@ -0,0 +1,21 @@
package com.ruoyi.dataAsset.constant;
/**
*
* @ClassName QueueConstants
* @Author
*/
public class QueueConstants {
//新增队列
public static final String INSERT_QUEUE="insert";
//删除队列
public static final String DELETE_QUEUE="delete";
//新增消息队列名称
public static final String DATASOURCE_INSERT_MESSAGE = "datasource_insert_message";
//删除消息队列名称
public static final String DATASOURCE_DELETE_MESSAGE = "datasource_delete_message";
}

View File

@ -1,13 +0,0 @@
package com.ruoyi.dataAsset.constant;
/**
*
* @ClassName QueueNameConstants
* @Author
*/
public class QueueNameConstants {
//发送短消息队列名称
public static final String DATASOURCE_MESSAGE = "send_datasource_message";
}

View File

@ -131,6 +131,6 @@ public class DataSourceController extends BaseController {
@ApiOperation("删除数据接入")
@ApiImplicitParam(name = "id", value = "id", required = true, dataType = "Long", paramType = "path", dataTypeClass = String.class, example = "1,2,3,4")
public Result<String> remove(@PathVariable List<Long> ids) {
return toAjax(dataSourceService.removeBatchByIds(ids));
return toAjax(dataSourceService.removeBatch(ids));
}
}

View File

@ -32,4 +32,11 @@ public interface DataSourceService extends IService<DataSource> {
* @param dataSourceId
*/
void SynchronousDataStructure(Long dataSourceId);
/**
*
* @param ids
* @return
*/
boolean removeBatch(List<Long> ids);
}

View File

@ -10,7 +10,7 @@ import com.ruoyi.common.core.exception.ServiceException;
import com.ruoyi.common.core.utils.ObjUtils;
import com.ruoyi.common.core.utils.StringUtils;
import com.ruoyi.common.security.utils.SecurityUtils;
import com.ruoyi.dataAsset.constant.QueueNameConstants;
import com.ruoyi.dataAsset.constant.QueueConstants;
import com.ruoyi.dataAsset.domain.AssetModelData;
import com.ruoyi.dataAsset.domain.ColumnInfo;
import com.ruoyi.dataAsset.domain.TableInfo;
@ -111,7 +111,7 @@ public class DataSourceServiceImpl extends ServiceImpl<DataSourceMapper, DataSou
}
//异步更新数据源
CompletableFuture.runAsync(()->{
this.renewalDataSource(dataSource);
this.renewalDataSource(dataSource,QueueConstants.INSERT_QUEUE);
});
//如果为不可用修改为可用
if(dataSource.getStatus().equals("N")){
@ -122,6 +122,36 @@ public class DataSourceServiceImpl extends ServiceImpl<DataSourceMapper, DataSou
}
}
/**
*
* @param ids
* @return
*/
@Override
@Transactional
public boolean removeBatch(List<Long> ids) {
// 处理相关数据
ids.forEach(id->{
// 查找相关表信息
tableInfoService.list(new LambdaQueryWrapper<TableInfo>()
.eq(TableInfo::getDataSourceId,id))
.forEach(tableInfo->{
// 删除
this.removeRelevantData(tableInfo.getName(),id);
});
CompletableFuture.runAsync(()->{
DataSource dataSource = this.getById(id);
// 如果是有效数据源
if(dataSource.getStatus().equals("Y")){
// 刷新
this.renewalDataSource(dataSource,QueueConstants.DELETE_QUEUE);
}
});
});
// 删除数据源
return this.removeBatchByIds(ids);
}
/**
*
* @param dataSourceId
@ -183,16 +213,8 @@ public class DataSourceServiceImpl extends ServiceImpl<DataSourceMapper, DataSou
}
resultSet.close();
TableInfo tableInfo = TableInfo.builder().name(tableName).comment(comment).recordsTotal(recordsTotal).dataSourceId(dataSource.getId()).build();
//异步添加表数据
//可能同步过,先删除
TableInfo one = tableInfoService.getOne(new LambdaQueryWrapper<TableInfo>()
.eq(TableInfo::getName, tableName)
.eq(TableInfo::getDataSourceId, dataSource.getId()));
if(Objects.nonNull(one)){
tableInfoService.removeById(one.getId());
columnInfoService.remove(new LambdaQueryWrapper<ColumnInfo>()
.eq(ColumnInfo::getTableId,one.getId()));
}
this.removeRelevantData(tableName, dataSource.getId());
//添加
tableInfoService.save(tableInfo);
//异步存储表数据
@ -313,20 +335,48 @@ public class DataSourceServiceImpl extends ServiceImpl<DataSourceMapper, DataSou
}
}
/**
*
* @param tableName
* @param dataSourceId
*/
public void removeRelevantData(String tableName,Long dataSourceId){
// 查询表信息
TableInfo one = tableInfoService.getOne(new LambdaQueryWrapper<TableInfo>()
.eq(TableInfo::getName, tableName)
.eq(TableInfo::getDataSourceId, dataSourceId));
// 存在
if(Objects.nonNull(one)){
// 删除表
tableInfoService.removeById(one.getId());
// 删除字段
columnInfoService.remove(new LambdaQueryWrapper<ColumnInfo>()
.eq(ColumnInfo::getTableId,one.getId()));
}
}
/**
*
* @param dataSource
*/
public void renewalDataSource(DataSource dataSource){
// 发送消息
normalQueue.sendUUIDMsg(QueueNameConstants.DATASOURCE_MESSAGE,dataSource);
public void renewalDataSource(DataSource dataSource,String type){
String queueName=null;
Long id = dataSource.getId();
// 如果已经存在
if(connectionPoolService.hasKey(id)){
// 关闭旧的
connectionPoolService.closeDataSource(id);
}
if(QueueConstants.INSERT_QUEUE.equals(type)){
queueName= QueueConstants.DATASOURCE_INSERT_MESSAGE;
// 新增或替换数据源
connectionPoolService.put(dataSource);
}else{
queueName= QueueConstants.DATASOURCE_DELETE_MESSAGE;
// 删除数据源
connectionPoolService.remove(id);
}
// 发送消息
normalQueue.sendUUIDMsg(queueName,dataSource);
}
}

View File

@ -2,7 +2,7 @@ package com.ruoyi.dataTransform.consumer;
import com.alibaba.fastjson2.JSON;
import com.rabbitmq.client.Channel;
import com.ruoyi.dataAsset.constant.QueueNameConstants;
import com.ruoyi.dataAsset.constant.QueueConstants;
import com.ruoyi.dataAsset.domain.DataSource;
import com.ruoyi.dataAsset.service.ConnectionPoolService;
import lombok.extern.log4j.Log4j2;
@ -37,14 +37,14 @@ public class DataSourceConsumer {
/**
*
*/
@RabbitListener(queuesToDeclare = {@Queue(name = QueueNameConstants.DATASOURCE_MESSAGE)})
public void smsConsumer(Message message, Channel channel) {
@RabbitListener(queuesToDeclare = {@Queue(name = QueueConstants.DATASOURCE_INSERT_MESSAGE)})
public void renewalConsumer(Message message, Channel channel) {
DataSource dataSource = (DataSource) messageConverter.fromMessage(message);
log.info("数据源消费者接收到消息,消息内容:{}", JSON.toJSONString(dataSource));
log.info("数据源更新消费者接收到消息,消息内容:{}", JSON.toJSONString(dataSource));
// 获取消息的id
String messageId = message.getMessageProperties().getMessageId();
try {
Long count = redisTemplate.opsForSet().add(QueueNameConstants.DATASOURCE_MESSAGE, messageId);
Long count = redisTemplate.opsForSet().add(QueueConstants.DATASOURCE_INSERT_MESSAGE, messageId);
if (count != null&&count > 0) {
// 正常消费消息
Long id = dataSource.getId();
@ -57,19 +57,56 @@ public class DataSourceConsumer {
connectionPoolService.put(dataSource);
// 消费成功 手动确认
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
log.info("数据源消费者接收到消息,消息内容:{},消费消息成功!", JSON.toJSONString(dataSource));
log.info("数据源更新消费者接收到消息,消息内容:{},消费消息成功!", JSON.toJSONString(dataSource));
}
} catch (Exception ex) {
log.error("数据源消费者接收到消息,消息内容:{},消费消息失败,错误信息:{}", JSON.toJSONString(dataSource), ex.getMessage());
log.error("数据源更新消费者接收到消息,消息内容:{},消费消息失败,错误信息:{}", JSON.toJSONString(dataSource), ex.getMessage());
// 删除 redis中 添加的消息的id
redisTemplate.opsForSet().remove(QueueNameConstants.DATASOURCE_MESSAGE, messageId);
redisTemplate.opsForSet().remove(QueueConstants.DATASOURCE_INSERT_MESSAGE, messageId);
// 回退消息
try {
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
} catch (IOException e) {
log.error("数据源消费者接收到消息,消息内容:{},消费回退失败,错误信息:{}", JSON.toJSONString(dataSource), e.getMessage());
log.error("数据源更新消费者接收到消息,消息内容:{},消费回退失败,错误信息:{}", JSON.toJSONString(dataSource), e.getMessage());
}
}
}
/**
*
*/
@RabbitListener(queuesToDeclare = {@Queue(name = QueueConstants.DATASOURCE_DELETE_MESSAGE)})
public void deleteConsumer(Message message, Channel channel) {
DataSource dataSource = (DataSource) messageConverter.fromMessage(message);
log.info("数据源移除消费者接收到消息,消息内容:{}", JSON.toJSONString(dataSource));
// 获取消息的id
String messageId = message.getMessageProperties().getMessageId();
try {
Long count = redisTemplate.opsForSet().add(QueueConstants.DATASOURCE_DELETE_MESSAGE, messageId);
if (count != null&&count > 0) {
// 正常消费消息
Long id = dataSource.getId();
// 如果已经存在
if(connectionPoolService.hasKey(id)){
// 关闭旧的
connectionPoolService.closeDataSource(id);
}
// 移除数据源
connectionPoolService.remove(id);
// 消费成功 手动确认
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
log.info("数据源移除消费者接收到消息,消息内容:{},消费消息成功!", JSON.toJSONString(dataSource));
}
} catch (Exception ex) {
log.error("数据源移除消费者接收到消息,消息内容:{},消费消息失败,错误信息:{}", JSON.toJSONString(dataSource), ex.getMessage());
// 删除 redis中 添加的消息的id
redisTemplate.opsForSet().remove(QueueConstants.DATASOURCE_DELETE_MESSAGE, messageId);
// 回退消息
try {
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
} catch (IOException e) {
log.error("数据源移除消费者接收到消息,消息内容:{},消费回退失败,错误信息:{}", JSON.toJSONString(dataSource), e.getMessage());
}
}
}
}

View File

@ -3,7 +3,7 @@ package com.ruoyi.dataTransform.consumer;
import com.alibaba.fastjson2.JSON;
import com.rabbitmq.client.Channel;
import com.ruoyi.ruleEngine.client.service.RuleEngineService;
import com.ruoyi.ruleEngine.constant.QueueNameConstants;
import com.ruoyi.ruleEngine.constant.QueueConstants;
import com.ruoyi.ruleEngine.domain.EngineVersion;
import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.core.Message;
@ -37,33 +37,64 @@ public class EngineVersionConsumer {
/**
*
*/
@RabbitListener(queuesToDeclare = {@Queue(name = QueueNameConstants.VERSION_MESSAGE)})
public void smsConsumer(Message message, Channel channel) {
@RabbitListener(queuesToDeclare = {@Queue(name = QueueConstants.VERSION_INSERT_MESSAGE)})
public void renewalConsumer(Message message, Channel channel) {
EngineVersion engineVersion = (EngineVersion) messageConverter.fromMessage(message);
log.info("引擎版本消费者接收到消息,消息内容:{}", JSON.toJSONString(engineVersion));
log.info("引擎版本更新消费者接收到消息,消息内容:{}", JSON.toJSONString(engineVersion));
// 获取消息的id
String messageId = message.getMessageProperties().getMessageId();
try {
Long count = redisTemplate.opsForSet().add(QueueNameConstants.VERSION_MESSAGE, messageId);
Long count = redisTemplate.opsForSet().add(QueueConstants.VERSION_INSERT_MESSAGE, messageId);
if (count != null&&count > 0) {
// 正常消费消息
// 新增或替换规则内容
ruleEngineService.put(engineVersion);
// 消费成功 手动确认
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
log.info("引擎版本消费者接收到消息,消息内容:{},消费消息成功!", JSON.toJSONString(engineVersion));
log.info("引擎版本更新消费者接收到消息,消息内容:{},消费消息成功!", JSON.toJSONString(engineVersion));
}
} catch (Exception ex) {
log.error("引擎版本消费者接收到消息,消息内容:{},消费消息失败,错误信息:{}", JSON.toJSONString(engineVersion), ex.getMessage());
log.error("引擎版本更新消费者接收到消息,消息内容:{},消费消息失败,错误信息:{}", JSON.toJSONString(engineVersion), ex.getMessage());
// 删除 redis中 添加的消息的id
redisTemplate.opsForSet().remove(QueueNameConstants.VERSION_MESSAGE, messageId);
redisTemplate.opsForSet().remove(QueueConstants.VERSION_INSERT_MESSAGE, messageId);
// 回退消息
try {
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
} catch (IOException e) {
log.error("引擎版本消费者接收到消息,消息内容:{},消费回退失败,错误信息:{}", JSON.toJSONString(engineVersion), e.getMessage());
log.error("引擎版本更新消费者接收到消息,消息内容:{},消费回退失败,错误信息:{}", JSON.toJSONString(engineVersion), e.getMessage());
}
}
}
/**
*
*/
@RabbitListener(queuesToDeclare = {@Queue(name = QueueConstants.VERSION_DELETE_MESSAGE)})
public void deleteConsumer(Message message, Channel channel) {
EngineVersion engineVersion = (EngineVersion) messageConverter.fromMessage(message);
log.info("引擎版本移除消费者接收到消息,消息内容:{}", JSON.toJSONString(engineVersion));
// 获取消息的id
String messageId = message.getMessageProperties().getMessageId();
try {
Long count = redisTemplate.opsForSet().add(QueueConstants.VERSION_DELETE_MESSAGE, messageId);
if (count != null&&count > 0) {
// 正常消费消息
// 删除
ruleEngineService.remove(engineVersion.getId());
// 消费成功 手动确认
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
log.info("引擎版本移除消费者接收到消息,消息内容:{},消费消息成功!", JSON.toJSONString(engineVersion));
}
} catch (Exception ex) {
log.error("引擎版本移除消费者接收到消息,消息内容:{},消费消息失败,错误信息:{}", JSON.toJSONString(engineVersion), ex.getMessage());
// 删除 redis中 添加的消息的id
redisTemplate.opsForSet().remove(QueueConstants.VERSION_DELETE_MESSAGE, messageId);
// 回退消息
try {
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
} catch (IOException e) {
log.error("引擎版本移除消费者接收到消息,消息内容:{},消费回退失败,错误信息:{}", JSON.toJSONString(engineVersion), e.getMessage());
}
}
}
}

View File

@ -7,7 +7,6 @@ import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import lombok.AllArgsConstructor;
import lombok.experimental.SuperBuilder;
@ -40,7 +39,6 @@ import java.util.Date;
@NoArgsConstructor
@AllArgsConstructor
@TableName("${tableName}")
@EqualsAndHashCode(callSuper = true)
@ApiModel(value = "${ClassName}", description = "${functionName}")
public class ${ClassName} extends ${Entity} {
@ -94,6 +92,7 @@ public class ${ClassName} extends ${Entity} {
#end
#end
#end
.remark(${className}QueryReq.getRemark())
.build();
}
@ -114,6 +113,7 @@ public class ${ClassName} extends ${Entity} {
#end
#end
#end
.remark(${className}QueryReq.getRemark())
.createBy(createBy.get())
.createTime(new Date())
.build();
@ -137,6 +137,7 @@ public class ${ClassName} extends ${Entity} {
#end
#end
#end
.remark(${className}QueryReq.getRemark())
.updateBy(updateBy.get())
.updateTime(new Date())
.build();

View File

@ -4,7 +4,6 @@ package ${packageName}.domain.req;
import ${import};
#end
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import lombok.AllArgsConstructor;
import lombok.experimental.SuperBuilder;

View File

@ -1,6 +1,5 @@
package ${packageName}.mapper;
import java.util.List;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import ${packageName}.domain.${ClassName};

View File

@ -4,7 +4,6 @@ package ${packageName}.domain.req;
import ${import};
#end
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import lombok.AllArgsConstructor;
import lombok.experimental.SuperBuilder;

View File

@ -4,7 +4,6 @@ package ${packageName}.domain.req;
import ${import};
#end
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import lombok.AllArgsConstructor;
import lombok.experimental.SuperBuilder;

View File

@ -1,7 +1,6 @@
package ${packageName}.service.impl;
import java.util.List;
import com.ruoyi.common.core.utils.ObjUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

View File

@ -50,7 +50,7 @@ public class RuleEngineService {
if(hasKey(id)){
ruleEngineContextHolder.remove(id);
}else {
log.error("引擎上下文中不存在key:[{}]",id);
log.error("引擎上下文中不存在key:{}",id);
}
}
@ -64,7 +64,7 @@ public class RuleEngineService {
if(hasKey(id)){
map=ruleEngineContextHolder.get(id);
}else {
log.error("引擎上下文中不存在key:[{}]",id);
log.error("引擎上下文中不存在key:{}",id);
}
return map;
}

View File

@ -0,0 +1,21 @@
package com.ruoyi.ruleEngine.constant;
/**
*
* @ClassName QueueConstants
* @Author
*/
public class QueueConstants {
//新增队列
public static final String INSERT_QUEUE = "insert";
//删除队列
public static final String DELETE_QUEUE = "delete";
//新增消息队列名称
public static final String VERSION_INSERT_MESSAGE = "version_insert_message";
//删除消息队列名称
public static final String VERSION_DELETE_MESSAGE = "version_delete_message";
}

View File

@ -1,13 +0,0 @@
package com.ruoyi.ruleEngine.constant;
/**
*
* @ClassName QueueNameConstants
* @Author
*/
public class QueueNameConstants {
//发送短消息队列名称
public static final String VERSION_MESSAGE = "send_version_message";
}

View File

@ -96,7 +96,7 @@ public class EngineVersionController extends BaseController {
@ApiOperation("删除引擎规则版本")
@ApiImplicitParam(name = "id", value = "id", required = true, dataType = "Long", paramType = "path", dataTypeClass = String.class, example = "1,2,3,4")
public Result<String> remove(@PathVariable List<Long> ids) {
return toAjax(engineVersionService.removeBatchByIds(ids));
return toAjax(engineVersionService.removeBatch(ids));
}
}

View File

@ -43,4 +43,11 @@ public interface EngineVersionService extends IService<EngineVersion> {
* @return
*/
boolean update(EngineVersion engineVersion);
/**
*
* @param ids
* @return
*/
boolean removeBatch(List<Long> ids);
}

View File

@ -10,8 +10,6 @@ import org.springframework.stereotype.Service;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
@ -64,7 +62,9 @@ public class EngineConfigServiceImpl implements EngineConfigService {
String path=ConfigCodeConstants.BASE_FILE_PATH+scope;
String code = null;
try {
code = Files.readString(Paths.get(path));
ClassPathResource resource = new ClassPathResource(path);
InputStream inputStream = resource.getInputStream();
code = IOUtils.toString(inputStream, String.valueOf(StandardCharsets.UTF_8));
} catch (IOException e) {
throw new RuntimeException(e);
}

View File

@ -5,7 +5,7 @@ import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.ruoyi.common.core.utils.ObjUtils;
import com.ruoyi.ruleEngine.constant.EngineVersionConstants;
import com.ruoyi.ruleEngine.constant.QueueNameConstants;
import com.ruoyi.ruleEngine.constant.QueueConstants;
import com.ruoyi.ruleEngine.domain.EngineVersion;
import com.ruoyi.ruleEngine.domain.req.VersionClassCreateReq;
import com.ruoyi.ruleEngine.domain.resp.EngineConfigScopeResp;
@ -17,6 +17,8 @@ import com.ruoyi.ruleEngine.service.EngineVersionService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.List;
import java.util.concurrent.CompletableFuture;
@ -105,23 +107,49 @@ public class EngineVersionServiceImpl extends ServiceImpl<EngineVersionMapper, E
@Override
public boolean update(EngineVersion engineVersion) {
//修改
boolean removed = this.updateById(engineVersion);
boolean updated = this.updateById(engineVersion);
//异步更新
CompletableFuture.runAsync(()->{
this.renewalEngineVersion(engineVersion.getId());
this.renewalEngineVersion(engineVersion.getId(),QueueConstants.INSERT_QUEUE);
});
return removed;
return updated;
}
/**
*
* @param ids
* @return
*/
@Override
@Transactional
public boolean removeBatch(List<Long> ids) {
// 同步引擎版本的class编码
ids.forEach(id->{
EngineVersion engineVersion = this.getById(id);
if(engineVersion.getTestStatus().equals("1")){
this.renewalEngineVersion(id,QueueConstants.DELETE_QUEUE);
}
});
// 批量删除
return this.removeBatchByIds(ids);
}
/**
* class
* @param engineVersionId
* @param type
*/
public void renewalEngineVersion(Long engineVersionId){
public void renewalEngineVersion(Long engineVersionId,String type){
String queueName=null;
//查询
EngineVersion engineVersion = this.getById(engineVersionId);
if(QueueConstants.INSERT_QUEUE.equals(type)){
queueName=QueueConstants.VERSION_INSERT_MESSAGE;
}else {
queueName=QueueConstants.VERSION_DELETE_MESSAGE;
}
//发送消息
normalQueue.sendUUIDMsg(QueueNameConstants.VERSION_MESSAGE,engineVersion);
normalQueue.sendUUIDMsg(queueName,engineVersion);
}
}