feat():分离客户端

master
Saisai Liu 2024-05-12 22:43:15 +08:00
parent 0f399010c9
commit 6af3adc8fb
24 changed files with 468 additions and 69 deletions

View File

@ -79,21 +79,21 @@
</dependency> </dependency>
<!-- MuYu Common Log --> <!-- MuYu Common Log -->
<dependency> <!-- <dependency>-->
<groupId>com.muyu</groupId> <!-- <groupId>com.muyu</groupId>-->
<artifactId>muyu-common-log</artifactId> <!-- <artifactId>muyu-common-log</artifactId>-->
</dependency> <!-- </dependency>-->
<!-- MuYu Common Swagger --> <!-- MuYu Common Swagger -->
<dependency> <dependency>
<groupId>com.muyu</groupId> <groupId>com.muyu</groupId>
<artifactId>muyu-common-swagger</artifactId> <artifactId>muyu-common-swagger</artifactId>
</dependency> </dependency>
<dependency> <dependency>
<groupId>com.muyu</groupId> <groupId>com.muyu</groupId>
<artifactId>muyu-etl-remote</artifactId> <artifactId>muyu-etl-remote</artifactId>
<version>3.6.3</version> <version>3.6.3</version>
<scope>compile</scope>
</dependency> </dependency>

View File

@ -5,8 +5,8 @@ import com.muyu.common.core.constant.SecurityConstants;
import com.muyu.common.core.domain.Result; import com.muyu.common.core.domain.Result;
import com.muyu.common.core.web.page.TableDataInfo; import com.muyu.common.core.web.page.TableDataInfo;
import com.muyu.etl.domain.BasicConfigInfo; import com.muyu.etl.domain.BasicConfigInfo;
import com.muyu.etl.uitl.DruidUtilsFactory; import com.muyu.etl.util.factory.DruidUtilsFactory;
import com.muyu.etl.uitl.service.ConnectionPoolFactory; import com.muyu.etl.util.service.ConnectionPoolFactory;
import lombok.extern.log4j.Log4j2; import lombok.extern.log4j.Log4j2;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationArguments;
@ -34,20 +34,30 @@ public class AssetClientRunner implements ApplicationRunner {
@Override @Override
public void run(ApplicationArguments args) throws ServletException { public void run(ApplicationArguments args) throws ServletException {
Result<TableDataInfo<BasicConfigInfo>> result = remoteAssetService.list(new BasicConfigInfo(), SecurityConstants.INNER); BasicConfigInfo basicConfigInfoSel = new BasicConfigInfo() {{
setIsTest("1");
}};
Result<TableDataInfo<BasicConfigInfo>> result = remoteAssetService.list(basicConfigInfoSel, SecurityConstants.INNER);
log.info("初始话内容为{}", result); log.info("初始话内容为{}", result);
if (result.getData().getRows().isEmpty()) throw new ServletException("初始化调用失败,无数据"); List<BasicConfigInfo> rows = result.getData().getRows();
result.getData().getRows().stream().map(basicConfigInfo -> { if (rows.isEmpty()) throw new ServletException("初始化调用失败,无数据");
log.info(rows);
int a = 0;
for (BasicConfigInfo basicConfigInfo : rows) {
a++;
log.info("a:{}", a);
log.warn("接入信息:{}", basicConfigInfo);
DruidPooledConnection init = null; DruidPooledConnection init = null;
try { try {
log.info("初始化开始");
init = connectionPoolFactory.init(basicConfigInfo); init = connectionPoolFactory.init(basicConfigInfo);
log.info("初始化结果:{}", init); log.info("初始化结果:{}", init);
} catch (SQLException e) { } catch (Exception e) {
log.error("初始化异常:{}", e.getMessage()); log.error("初始化异常:{}", e.getMessage());
throw new RuntimeException(e); //跳过
continue;
}
} }
return init;
});
log.info("初始化完成"); log.info("初始化完成");
log.info("连接池::{}", DruidUtilsFactory.map); log.info("连接池::{}", DruidUtilsFactory.map);

View File

@ -0,0 +1,26 @@
//package com.muyu.etl;
//
//import com.muyu.common.security.annotation.EnableCustomConfig;
//import com.muyu.common.security.annotation.EnableMyFeignClients;
//import com.muyu.common.swagger.annotation.EnableCustomSwagger2;
//import com.sun.tools.javac.Main;
//import org.springframework.boot.SpringApplication;
//import org.springframework.boot.autoconfigure.SpringBootApplication;
//import org.springframework.cloud.openfeign.EnableFeignClients;
//
///**
// * @ClassName Main
// * @Description 描述
// * @Author SaiSai.Liu
// * @Date 2024/5/11 16:20
// */
//@EnableCustomConfig
//@EnableCustomSwagger2
//@EnableMyFeignClients
//@EnableFeignClients
//@SpringBootApplication
//public class MainApplication {
// public static void main(String[] args) {
// SpringApplication.run(MainApplication.class);
// }
//}

View File

@ -1,2 +1,3 @@
com.muyu.etl.AssetClientRunner com.muyu.etl.AssetClientRunner
com.muyu.etl.uitl.DruidUtilsFactory com.muyu.etl.util.factory.DruidUtilsFactory
#com.muyu.etl.util.service.ConnectionPoolFactory

View File

@ -1,6 +1,6 @@
# Tomcat # Tomcat
server: server:
port: 9215 port: 9225
# Spring # Spring
spring: spring:
@ -26,4 +26,4 @@ spring:
logging: logging:
level: level:
com.muyu.etl.mapper: DEBUG com.muyu.etl.mapper: DEBUG
com.example.springmvctest.feign.api: DEBUG com.muyu.etl.feign.api: DEBUG

View File

@ -26,5 +26,15 @@
<groupId>com.muyu</groupId> <groupId>com.muyu</groupId>
<artifactId>muyu-common-security</artifactId> <artifactId>muyu-common-security</artifactId>
</dependency> </dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>1.2.8</version>
</dependency>
</dependencies> </dependencies>
</project> </project>

View File

@ -1,4 +1,4 @@
package com.muyu.etl.uitl; package com.muyu.etl.util;
import java.io.FileInputStream; import java.io.FileInputStream;
import java.io.IOException; import java.io.IOException;

View File

@ -1,9 +1,9 @@
package com.muyu.etl.uitl; package com.muyu.etl.util.factory;
import com.alibaba.druid.pool.DruidDataSource; import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.druid.pool.DruidPooledConnection; import com.alibaba.druid.pool.DruidPooledConnection;
import com.muyu.etl.domain.BasicConfigInfo; import com.muyu.etl.domain.BasicConfigInfo;
import com.muyu.etl.uitl.service.ConnectionPoolFactory; import com.muyu.etl.util.service.ConnectionPoolFactory;
import java.sql.Connection; import java.sql.Connection;
import java.sql.SQLException; import java.sql.SQLException;
@ -15,6 +15,7 @@ public class DruidUtilsFactory implements ConnectionPoolFactory<BasicConfigInfo>
/** /**
* *
*
* @param basicConfigInfo * @param basicConfigInfo
* @return * @return
* @throws SQLException * @throws SQLException
@ -32,10 +33,18 @@ public class DruidUtilsFactory implements ConnectionPoolFactory<BasicConfigInfo>
source.setUrl("jdbc:" + databaseType + "://" + host + ":" + port + "/" + databaseName + "?" + basicConfigInfo.getConnectionParams()); source.setUrl("jdbc:" + databaseType + "://" + host + ":" + port + "/" + databaseName + "?" + basicConfigInfo.getConnectionParams());
source.setUsername(basicConfigInfo.getUsername()); source.setUsername(basicConfigInfo.getUsername());
source.setPassword(basicConfigInfo.getPassword()); source.setPassword(basicConfigInfo.getPassword());
//初始化连接量
source.setInitialSize(Math.toIntExact(basicConfigInfo.getInitLinkNum())); source.setInitialSize(Math.toIntExact(basicConfigInfo.getInitLinkNum()));
//最大连接数
source.setMaxActive(Math.toIntExact(basicConfigInfo.getMaxLinkNum())); source.setMaxActive(Math.toIntExact(basicConfigInfo.getMaxLinkNum()));
source.setMaxWaitThreadCount(Math.toIntExact(basicConfigInfo.getMaxWaitTimes())); source.setMaxWaitThreadCount(Math.toIntExact(basicConfigInfo.getMaxWaitTimes()));
//连接池中的最大最小生存时间
source.setMaxEvictableIdleTimeMillis(basicConfigInfo.getMaxWaitTime()); source.setMaxEvictableIdleTimeMillis(basicConfigInfo.getMaxWaitTime());
source.setMinEvictableIdleTimeMillis(30000);
//最大等待时间
source.setTimeBetweenEvictionRunsMillis(30000);
//进程最大等待时间
source.setMaxWait(basicConfigInfo.getMaxWaitTime());
DruidPooledConnection pool = source.getConnection(); DruidPooledConnection pool = source.getConnection();
map.put(host + ":" + port + "/" + databaseName, pool); map.put(host + ":" + port + "/" + databaseName, pool);
// 获取并返回连接池对象 // 获取并返回连接池对象
@ -44,6 +53,7 @@ public class DruidUtilsFactory implements ConnectionPoolFactory<BasicConfigInfo>
/** /**
* *
*
* @param basicConfigInfo * @param basicConfigInfo
* @return * @return
*/ */
@ -59,6 +69,7 @@ public class DruidUtilsFactory implements ConnectionPoolFactory<BasicConfigInfo>
/** /**
* *
*
* @param basicConfigInfo * @param basicConfigInfo
* @param connection * @param connection
*/ */
@ -72,5 +83,4 @@ public class DruidUtilsFactory implements ConnectionPoolFactory<BasicConfigInfo>
} }
} }

View File

@ -1,4 +1,4 @@
package com.muyu.etl.uitl.service; package com.muyu.etl.util.service;
import com.alibaba.druid.pool.DruidPooledConnection; import com.alibaba.druid.pool.DruidPooledConnection;

View File

@ -89,10 +89,24 @@
<groupId>com.muyu</groupId> <groupId>com.muyu</groupId>
<artifactId>muyu-common-swagger</artifactId> <artifactId>muyu-common-swagger</artifactId>
</dependency> </dependency>
<!-- SpringBoot Web-->
<dependency> <dependency>
<groupId>com.alibaba</groupId> <groupId>org.springframework.boot</groupId>
<artifactId>druid</artifactId> <artifactId>spring-boot-starter-web</artifactId>
<version>1.2.8</version> </dependency>
<!-- rabbitMQ -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- 消息转换器 -->
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
<version>2.9.10</version>
</dependency> </dependency>

View File

@ -45,7 +45,7 @@ public class BasicConfigInfoController extends BaseController {
*/ */
// @RequiresPermissions("etl:info:list") // @RequiresPermissions("etl:info:list")
@PostMapping("/list") @PostMapping("/list")
public Result<TableDataInfo<BasicConfigInfo>> list(BasicConfigInfo basicConfigInfo) { public Result<TableDataInfo<BasicConfigInfo>> list(@RequestBody BasicConfigInfo basicConfigInfo) {
startPage(); startPage();
List<BasicConfigInfo> list = basicConfigInfoService.selectBasicConfigInfoList(basicConfigInfo); List<BasicConfigInfo> list = basicConfigInfoService.selectBasicConfigInfoList(basicConfigInfo);
return getDataTable(list); return getDataTable(list);

View File

@ -1,8 +1,10 @@
package com.muyu.etl.service.impl; package com.muyu.etl.service.impl;
import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.muyu.common.core.utils.uuid.UUID;
import com.muyu.common.security.utils.SecurityUtils; import com.muyu.common.security.utils.SecurityUtils;
import com.muyu.etl.domain.BasicConfigInfo; import com.muyu.etl.domain.BasicConfigInfo;
import com.muyu.etl.domain.Structure; import com.muyu.etl.domain.Structure;
@ -18,6 +20,12 @@ import com.muyu.etl.service.StructureService;
import com.muyu.etl.service.TableInfoService; import com.muyu.etl.service.TableInfoService;
import lombok.SneakyThrows; import lombok.SneakyThrows;
import lombok.extern.log4j.Log4j2; import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.annotation.Transactional;
@ -50,6 +58,9 @@ public class BasicConfigInfoServiceImpl extends ServiceImpl<BasicConfigInfoMappe
@Autowired @Autowired
private TableInfoService tableInfoService; private TableInfoService tableInfoService;
@Autowired
private RabbitTemplate rabbitTemplate;
/** /**
* *
@ -147,12 +158,12 @@ public class BasicConfigInfoServiceImpl extends ServiceImpl<BasicConfigInfoMappe
String databaseName = basicConfigInfo.getDatabaseName(); String databaseName = basicConfigInfo.getDatabaseName();
try { try {
Connection conn = getConn(basicConfigInfo); Connection conn = getConn(basicConfigInfo);
basicConfigInfo.setIsTest("1");
this.update(basicConfigInfo, new LambdaUpdateWrapper<BasicConfigInfo>() {{
eq(BasicConfigInfo::getId, basicConfigInfo.getId());
}});
System.out.println("Connected to the MySQL server successfully."); System.out.println("Connected to the MySQL server successfully.");
//同步数据库信息 //同步数据库信息
basicConfigInfo.setIsTest("1");
this.update(basicConfigInfo, new LambdaUpdateWrapper<>(BasicConfigInfo.class) {{
eq(BasicConfigInfo::getId, basicConfigInfo.getId());
}});
//树级结构,库,表 //树级结构,库,表
TableInfo tableInfoInsert = TableInfo.builder() TableInfo tableInfoInsert = TableInfo.builder()
.basicId(basicConfigInfo.getId()) .basicId(basicConfigInfo.getId())
@ -216,25 +227,34 @@ public class BasicConfigInfoServiceImpl extends ServiceImpl<BasicConfigInfoMappe
} }
} }
}); });
Runnable thread = new Runnable() { // Runnable thread = new Runnable() {
@SneakyThrows // @SneakyThrows
@Override // @Override
public void run() { // public void run() {
try { // try {
//同步 // //同步
syncData(finalConn, databaseName, table); // syncData(finalConn, databaseName, table);
} catch (SQLException e) { // } catch (SQLException e) {
log.error(e.getMessage()); // log.error(e.getMessage());
throw new ServletException("连接失败"); //
} // throw new ServletException("连接失败(同步失败)");
} // }
}; // }
thread.run(); // };
// thread.run();
ps.close(); ps.close();
} }
conn.close(); conn.close();
rabbitTemplate.convertAndSend("basicInfoQueue", JSON.toJSONString(basicConfigInfo), message ->{
message.getMessageProperties().setConsumerTag(UUID.randomUUID().toString().replaceAll("-",""));
return message;
});
} catch (SQLException e) { } catch (SQLException e) {
log.error(e.getMessage()); log.error(e.getMessage());
basicConfigInfo.setIsTest("0");
this.update(basicConfigInfo, new LambdaUpdateWrapper<BasicConfigInfo>() {{
eq(BasicConfigInfo::getId, basicConfigInfo.getId());
}});
throw new ServletException("连接失败"); throw new ServletException("连接失败");
} }
return true; return true;
@ -465,4 +485,5 @@ public class BasicConfigInfoServiceImpl extends ServiceImpl<BasicConfigInfoMappe
return list; return list;
} }
} }

View File

@ -26,3 +26,20 @@ spring:
logging: logging:
level: level:
com.muyu.etl.mapper: DEBUG com.muyu.etl.mapper: DEBUG
rabbitmq:
host: 43.142.100.73
port: 5672
username: guest
password: guest
virtual-host: /
publisher-confirm-type: correlated #确认消息已发送到交换机(Exchange)
publisher-returns: true #确认消息已发送到队列(Queue)
listener:
simple:
prefetch: 1 # 每次只能获取一条,处理完成才能获取下一条
acknowledge-mode: manual # 设置消费端手动ack确认
retry:
enabled: true # 是否支持重试
template:
# 只要消息抵达Queue就会异步发送优先回调return firm
mandatory: true

View File

@ -1,18 +0,0 @@
package com.muyu;
import com.muyu.common.security.annotation.EnableCustomConfig;
import com.muyu.common.security.annotation.EnableMyFeignClients;
import com.muyu.common.swagger.annotation.EnableCustomSwagger2;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.openfeign.EnableFeignClients;
@EnableCustomConfig
@EnableCustomSwagger2
@EnableMyFeignClients
@EnableFeignClients
@SpringBootApplication
public class EngineClientApplication {
public static void main(String[] args) {
System.out.println("Hello world!");
}
}

View File

@ -0,0 +1,30 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.muyu</groupId>
<artifactId>muyu-unit</artifactId>
<version>3.6.3</version>
</parent>
<artifactId>muyu-unit-common</artifactId>
<properties>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>com.muyu</groupId>
<artifactId>muyu-etl-client</artifactId>
</dependency>
<dependency>
<groupId>com.muyu</groupId>
<artifactId>muyu-ruleEngine-client</artifactId>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,20 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.muyu</groupId>
<artifactId>muyu-unit</artifactId>
<version>3.6.3</version>
</parent>
<artifactId>muyu-unit-remote</artifactId>
<properties>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
</project>

View File

@ -0,0 +1,118 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.muyu</groupId>
<artifactId>muyu-unit</artifactId>
<version>3.6.3</version>
</parent>
<artifactId>muyu-unit-service</artifactId>
<properties>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>com.muyu</groupId>
<artifactId>muyu-etl-client</artifactId>
<version>3.6.3</version>
</dependency>
<dependency>
<groupId>com.muyu</groupId>
<artifactId>muyu-ruleEngine-client</artifactId>
<version>3.6.3</version>
</dependency>
<!-- SpringCloud Alibaba Nacos -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
<!-- SpringCloud Alibaba Nacos Config -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
</dependency>
<!-- SpringCloud Alibaba Sentinel -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>
</dependency>
<!-- SpringBoot Actuator -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<!-- Swagger UI -->
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger-ui</artifactId>
<version>${swagger.fox.version}</version>
</dependency>
<!-- Mysql Connector -->
<dependency>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
</dependency>
<!-- MuYu Common DataSource -->
<dependency>
<groupId>com.muyu</groupId>
<artifactId>muyu-common-datasource</artifactId>
</dependency>
<!-- MuYu Common DataScope -->
<dependency>
<groupId>com.muyu</groupId>
<artifactId>muyu-common-datascope</artifactId>
</dependency>
<!-- MuYu Common Log -->
<dependency>
<groupId>com.muyu</groupId>
<artifactId>muyu-common-log</artifactId>
</dependency>
<!-- MuYu Common Swagger -->
<dependency>
<groupId>com.muyu</groupId>
<artifactId>muyu-common-swagger</artifactId>
</dependency>
<!-- MuYu Common Swagger -->
<dependency>
<groupId>com.muyu</groupId>
<artifactId>muyu-common-swagger</artifactId>
</dependency>
<!-- SpringBoot Web-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- rabbitMQ -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- 消息转换器 -->
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
<version>2.9.10</version>
</dependency>
</dependencies>
</project>

View File

@ -12,8 +12,8 @@ import org.springframework.cloud.openfeign.EnableFeignClients;
@EnableMyFeignClients @EnableMyFeignClients
@EnableFeignClients @EnableFeignClients
@SpringBootApplication @SpringBootApplication
public class EtlClientApplication { public class UnitApplication {
public static void main(String[] args) { public static void main(String[] args) {
SpringApplication.run(EtlClientApplication.class); SpringApplication.run(UnitApplication.class);
} }
} }

View File

@ -0,0 +1,56 @@
package com.muyu.custom;
import com.alibaba.druid.pool.DruidPooledConnection;
import com.alibaba.fastjson2.JSON;
import com.muyu.etl.RemoteAssetService;
import com.muyu.etl.domain.BasicConfigInfo;
import com.muyu.etl.util.service.ConnectionPoolFactory;
import com.rabbitmq.client.Channel;
import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.servlet.ServletException;
import java.io.IOException;
import java.sql.SQLException;
/**
*
* @ClassName AddInitConn
* @Description
* @Author SaiSai.Liu
* @Date 2024/5/12 14:18
*/
@Component
@Log4j2
public class AddInitConn {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private ConnectionPoolFactory<BasicConfigInfo> connectionPoolFactory;
@RabbitListener(queuesToDeclare = {@Queue(name = "basicInfoQueue")})
public void myConsumer(String basicInfo, Message message, Channel channel) throws ServletException {
if (basicInfo == null){
throw new ServletException("队列内容为空");
}
BasicConfigInfo basicConfigInfo = JSON.parseObject(basicInfo, BasicConfigInfo.class);
try {
DruidPooledConnection init = connectionPoolFactory.init(basicConfigInfo);
if (init == null){
// 消息拒绝,放回队列中重新消费
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
}else {
// 消息消费确认
channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
}
} catch (SQLException | IOException e) {
log.error("初始化连接池失败,消费失败");
throw new RuntimeException(e);
}
}
}

View File

@ -0,0 +1,4 @@
com.muyu.etl.AssetClientRunner
com.muyu.etl.factory.RemoteAssetFallbackFactory
com.muyu.etl.util.factory.DruidUtilsFactory
#com.muyu.etl.util.service.ConnectionPoolFactory

View File

@ -0,0 +1,46 @@
# Tomcat
server:
port: 9215
# Spring
spring:
application:
# 应用名称
name: muyu-unit
profiles:
# 环境配置
active: dev
cloud:
nacos:
discovery:
# 服务注册地址
server-addr: 43.142.100.73:8848
config:
# 配置中心地址
server-addr: 43.142.100.73:8848
# 配置文件格式
file-extension: yml
# 共享配置
shared-configs:
- application-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension}
rabbitmq:
host: 43.142.100.73
port: 5672
username: guest
password: guest
virtual-host: /
publisher-confirm-type: correlated #确认消息已发送到交换机(Exchange)
publisher-returns: true #确认消息已发送到队列(Queue)
listener:
simple:
prefetch: 1 # 每次只能获取一条,处理完成才能获取下一条
acknowledge-mode: manual # 设置消费端手动ack确认
retry:
enabled: true # 是否支持重试
template:
# 只要消息抵达Queue就会异步发送优先回调return firm
mandatory: true
logging:
level:
com.muyu.etl.mapper: DEBUG

View File

@ -0,0 +1,26 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.muyu</groupId>
<artifactId>muyu-modules</artifactId>
<version>3.6.3</version>
</parent>
<artifactId>muyu-unit</artifactId>
<packaging>pom</packaging>
<modules>
<module>muyu-unit-common</module>
<module>muyu-unit-service</module>
<module>muyu-unit-remote</module>
</modules>
<properties>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
</project>

View File

@ -15,6 +15,7 @@
<module>muyu-file</module> <module>muyu-file</module>
<module>muyu-etl</module> <module>muyu-etl</module>
<module>muyu-ruleEngine</module> <module>muyu-ruleEngine</module>
<module>muyu-unit</module>
</modules> </modules>
<artifactId>muyu-modules</artifactId> <artifactId>muyu-modules</artifactId>

View File

@ -220,6 +220,13 @@
<version>${muyu.version}</version> <version>${muyu.version}</version>
</dependency> </dependency>
<!-- 客户端模块 -->
<dependency>
<groupId>com.muyu</groupId>
<artifactId>muyu-unit-common</artifactId>
<version>${muyu.version}</version>
</dependency>
</dependencies> </dependencies>
</dependencyManagement> </dependencyManagement>