feat(): 增加异步同步

dev
chao 2024-04-26 10:36:04 +08:00
parent 48346981dc
commit 8a897e1066
2 changed files with 13 additions and 4 deletions

View File

@ -5,6 +5,7 @@ import com.etl.common.security.annotation.EnableMyFeignClients;
import com.etl.common.swagger.annotation.EnableCustomSwagger2;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableAsync;
/**
*
@ -15,6 +16,7 @@ import org.springframework.boot.autoconfigure.SpringBootApplication;
@EnableCustomSwagger2
@EnableMyFeignClients
@SpringBootApplication
@EnableAsync
public class ETLDataSourceApplication {
public static void main (String[] args) {
SpringApplication.run(ETLDataSourceApplication.class, args);

View File

@ -19,6 +19,7 @@ import com.etl.data.type.domain.DataType;
import com.etl.data.type.service.IDataTypeService;
import lombok.extern.log4j.Log4j2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import redis.clients.jedis.Jedis;
@ -370,6 +371,7 @@ public class DataSourceServiceImpl extends ServiceImpl<DataSourceMapper, DataSou
* @param dataSource
* @param dataType
*/
@Async
public Map<String, Object> jdbcSync(AssetStructure assetStructure, DataSource dataSource, DataType dataType) {
if (dataType.getDriverManager() != null && dataType.getJdbcPre() != null) {
if ("mysql".equals(dataType.getDataType())) {
@ -481,6 +483,7 @@ public class DataSourceServiceImpl extends ServiceImpl<DataSourceMapper, DataSou
* @param assetStructureTableList
* @param conn
*/
@Async
public void assetTableDetailsSync(List<AssetStructureTable> assetStructureTableList, Connection conn) {
List<AssetTableDetails> assetTableDetails = new ArrayList<>();
try {
@ -514,7 +517,8 @@ public class DataSourceServiceImpl extends ServiceImpl<DataSourceMapper, DataSou
* @return
* @throws SQLException
*/
private ResultSet getColumns(DatabaseMetaData metaData, Connection conn, AssetStructureTable assetStructureTable) throws SQLException {
@Async
public ResultSet getColumns(DatabaseMetaData metaData, Connection conn, AssetStructureTable assetStructureTable) throws SQLException {
return metaData.getColumns(null, null, assetStructureTable.getTableName(), null);
}
@ -530,7 +534,8 @@ public class DataSourceServiceImpl extends ServiceImpl<DataSourceMapper, DataSou
* @throws IllegalAccessException
* @throws InstantiationException
*/
private AssetTableDetails processRow(ResultSet rs, DatabaseMetaData metaData, Connection conn, AssetStructureTable assetStructureTable) throws SQLException, ClassNotFoundException, IllegalAccessException, InstantiationException {
@Async
public AssetTableDetails processRow(ResultSet rs, DatabaseMetaData metaData, Connection conn, AssetStructureTable assetStructureTable) throws SQLException, ClassNotFoundException, IllegalAccessException, InstantiationException {
String columnName = rs.getString("COLUMN_NAME");
String remarks = rs.getString("REMARKS");
boolean isPrimaryKey = isColumnPrimaryKey(metaData, assetStructureTable.getTableName(), columnName);
@ -570,7 +575,8 @@ public class DataSourceServiceImpl extends ServiceImpl<DataSourceMapper, DataSou
* @return
* @throws SQLException
*/
private boolean isColumnPrimaryKey(DatabaseMetaData metaData, String tableName, String columnName) throws SQLException {
@Async
public boolean isColumnPrimaryKey(DatabaseMetaData metaData, String tableName, String columnName) throws SQLException {
try (ResultSet rs = metaData.getPrimaryKeys(null, null, tableName)) {
while (rs.next()) {
String primaryKeyColumnName = rs.getString("COLUMN_NAME");
@ -594,7 +600,8 @@ public class DataSourceServiceImpl extends ServiceImpl<DataSourceMapper, DataSou
* @throws IllegalAccessException
* @throws InstantiationException
*/
private String getMappingType(Connection conn, String dataDetailsSQL, String columnName) throws SQLException, ClassNotFoundException, IllegalAccessException, InstantiationException {
@Async
public String getMappingType(Connection conn, String dataDetailsSQL, String columnName) throws SQLException, ClassNotFoundException, IllegalAccessException, InstantiationException {
try (
PreparedStatement st = conn.prepareStatement(dataDetailsSQL);
ResultSet rs = st.executeQuery()) {