fix:() 优化任务调动方法

master
Yueng 2024-09-10 17:43:42 +08:00
parent 9d7cea8f8e
commit 86c56e8348
11 changed files with 224 additions and 44 deletions

View File

@ -5,13 +5,11 @@ import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import com.muyu.common.core.web.domain.BaseEntity;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import java.util.Date;
import java.util.function.Supplier;
/**
* @Author
@ -26,7 +24,7 @@ import java.util.function.Supplier;
@NoArgsConstructor
@AllArgsConstructor
//核心数据库
public class DataSourceList extends BaseEntity {
public class ProductData extends BaseEntity {
/**
* /

View File

@ -2,7 +2,7 @@ package com.muyu.server.controller;
import com.muyu.common.core.domain.Result;
import com.muyu.common.domain.CoreDataList;
import com.muyu.common.domain.DataSourceList;
import com.muyu.common.domain.ProductData;
import com.muyu.server.service.CoreDataListService;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
@ -33,15 +33,15 @@ public class CoreDataListController {
/**
*
* @param dataSourceList
* @param productData
* @return
*/
@PostMapping("/selectData")
@Operation(summary = "查询数据",description = "查询数据")
public Result<List<CoreDataList>> extractData(
@Validated @RequestBody DataSourceList dataSourceList
@Validated @RequestBody ProductData productData
){
return Result.success(coreDataListService.extractData(dataSourceList));
return Result.success(coreDataListService.extractData(productData));
}

View File

@ -4,6 +4,7 @@ import com.muyu.common.core.domain.Result;
import com.muyu.common.domain.*;
import com.muyu.server.service.DataLinkService;
import lombok.AllArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.springframework.web.bind.annotation.*;
import java.util.List;
@ -18,6 +19,7 @@ import java.util.List;
@RestController
@AllArgsConstructor
@RequestMapping("/dataLink")
@Log4j2
public class DataLinkController {
private final DataLinkService dataLinkService;
@ -34,6 +36,28 @@ public class DataLinkController {
return Result.success(longs);
}
/**
* count
* @param basicDataName
* @param tableName sql
* @param listList sql
* @return
*/
@PostMapping("/addProduct")
public Result addProduct(@RequestParam("basicDataName") String basicDataName, @RequestParam("tableName") String tableName, @RequestBody DataValue[][] listList){
long begin = System.currentTimeMillis();
int i = dataLinkService.addProduct(basicDataName,tableName,listList);
long end = System.currentTimeMillis();
long allTime = end - begin;
log.info("添加到产品数据库耗时:"+allTime);
return Result.success(i);
}
/**
* dashuju
* @param databaseName

View File

@ -3,12 +3,10 @@ package com.muyu.server.controller;
import com.muyu.common.core.domain.Result;
import com.muyu.common.domain.*;
import com.muyu.server.service.DataSourceService;
import com.muyu.server.service.TableFieldService;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.AllArgsConstructor;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import org.springframework.web.bind.annotation.*;
import java.util.List;
@ -36,7 +34,7 @@ public class DataSourceController {
*/
@PostMapping("/extractData")
@Operation(summary = "抽取数据",description = "从数据源中抽取数据")
public Result<List<DataSourceList>> extractData(@RequestBody Connect connect){
public Result<List<ProductData>> extractData(@RequestBody Connect connect){
return Result.success(dataSourceService.extractData(connect),"您的数据已成功获取");
}

View File

@ -1,7 +1,7 @@
package com.muyu.server.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.muyu.common.domain.DataSourceList;
import com.muyu.common.domain.ProductData;
import org.apache.ibatis.annotations.Mapper;
/**
@ -12,6 +12,6 @@ import org.apache.ibatis.annotations.Mapper;
* @Date2024/8/20 18:56
*/
@Mapper
public interface DataSourceMapper extends BaseMapper<DataSourceList> {
public interface DataSourceMapper extends BaseMapper<ProductData> {
}

View File

@ -3,7 +3,7 @@ package com.muyu.server.service;
import com.baomidou.mybatisplus.extension.service.IService;
import com.muyu.common.domain.CoreDataList;
import com.muyu.common.domain.DataSourceList;
import com.muyu.common.domain.ProductData;
import java.util.List;
@ -18,8 +18,8 @@ public interface CoreDataListService extends IService<CoreDataList> {
/**
*
* @param dataSourceList
* @param productData
* @return
*/
List<CoreDataList> extractData(DataSourceList dataSourceList);
List<CoreDataList> extractData(ProductData productData);
}

View File

@ -58,4 +58,13 @@ public interface DataLinkService {
* @return
*/
List<TableFie> findSelectTableFieShow(String dataTableName);
/**
* count
* @param basicDataName
* @param tableName sql
* @param listList sql
* @return
*/
int addProduct(String basicDataName, String tableName, DataValue[]... listList);
}

View File

@ -13,14 +13,14 @@ import java.util.List;
* @nameDataSourceService
* @Date2024/8/20 18:56
*/
public interface DataSourceService extends IService<DataSourceList> {
public interface DataSourceService extends IService<ProductData> {
/**
*
* @param connect
* @return
*/
List<DataSourceList> extractData(Connect connect);
List<ProductData> extractData(Connect connect);
/**
*

View File

@ -3,7 +3,7 @@ package com.muyu.server.service.impl;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.muyu.common.domain.CoreDataList;
import com.muyu.common.domain.DataSourceList;
import com.muyu.common.domain.ProductData;
import com.muyu.server.mapper.CoreDataListMapper;
import com.muyu.server.service.CoreDataListService;
import com.muyu.server.service.DataService;
@ -30,20 +30,20 @@ public class CoreDataListServiceImpl
/**
*
* @param dataSourceList
* @param productData
* @return
*/
@Override
public List<CoreDataList> extractData(DataSourceList dataSourceList) {
public List<CoreDataList> extractData(ProductData productData) {
LambdaQueryWrapper<CoreDataList> queryWrapper = new LambdaQueryWrapper<>();
queryWrapper.like(
StringUtils.isNotEmpty(dataSourceList.getName()),
CoreDataList::getName, dataSourceList.getName()
StringUtils.isNotEmpty(productData.getName()),
CoreDataList::getName, productData.getName()
);
queryWrapper.like(
StringUtils.isNotEmpty(dataSourceList.getAddress()),
CoreDataList::getAddress, dataSourceList.getAddress()
StringUtils.isNotEmpty(productData.getAddress()),
CoreDataList::getAddress, productData.getAddress()
);
List<CoreDataList> dataSourceListList = this.list(queryWrapper);
return dataSourceListList.stream()

View File

@ -3,29 +3,31 @@ package com.muyu.server.service.impl;
import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.druid.pool.DruidPooledConnection;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.muyu.common.domain.DataName;
import com.muyu.common.domain.DataValue;
import com.muyu.common.domain.TableFie;
import com.muyu.common.domain.TableNames;
import com.muyu.common.domain.*;
import com.muyu.common.domain.data.base.BaseQueryHandler;
import com.muyu.common.domain.enums.DataType;
import com.muyu.server.mapper.DataRunNameMapper;
import com.muyu.common.domain.req.ConnectReq;
import com.muyu.common.log.annotation.Log;
import com.muyu.common.log.enums.BusinessType;
import com.muyu.server.mysql.MySqlDataSource;
import com.muyu.server.mysql.MySqlQuery;
import com.muyu.server.service.*;
import com.muyu.server.util.JdbcHelper;
import lombok.AllArgsConstructor;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import lombok.extern.log4j.Log4j2;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import java.sql.*;
import java.util.AbstractList;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.Date;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
*@Authoryang
@ -45,6 +47,8 @@ public class DataLinkServiceImpl implements DataLinkService {
private DataRunNameService dataRunNameService;
@Autowired
private TableFieldService tableFieldService;
@Autowired
private ConnectService connectService;
/**
* count
* @param dataName
@ -127,6 +131,156 @@ public class DataLinkServiceImpl implements DataLinkService {
return tableFieList;
}
@Override
@Log(title = "添加产品数据库", businessType = BusinessType.INSERT)
public int addProduct(String databaseName, String tableName, DataValue[]... listList) {
Connect connect = connectService.selectData(databaseName);
ConnectReq connectReq = new ConnectReq();
HikariConfig hikariConfig = new HikariConfig();
hikariConfig.setPoolName("HikariCP 连接池");
hikariConfig.setDriverClassName(connectReq.getDRIVER());
// 修正这里
hikariConfig.setJdbcUrl(connectReq.getUrl(connect));
hikariConfig.setUsername(connectReq.getUSER());
hikariConfig.setPassword(connectReq.getPWD());
hikariConfig.setMinimumIdle(2);
hikariConfig.setMaximumPoolSize(10);
HikariDataSource hikariDataSource = new HikariDataSource(hikariConfig);
ExecutorService executorService = Executors.newFixedThreadPool(6);
AtomicInteger addCount = new AtomicInteger();
// 分割数据为较小的批次
List<DataValue[][]> batches = splitData(listList, 5000);
try (Connection conn = hikariDataSource.getConnection()) {
conn.setAutoCommit(false); // 开启事务
for (DataValue[][] batch : batches) {
executorService.submit(() -> {
try (Statement stmt = conn.createStatement()) {
String sql = buildBatchInsertSQL(tableName, batch);
stmt.executeUpdate(sql);
addCount.addAndGet(batch.length);
} catch (SQLException e) {
log.error("SQLException异常发生", e);
try {
conn.rollback(); // 回滚事务
} catch (SQLException ex) {
log.error("回滚事务失败", ex);
throw new RuntimeException(ex);
}
throw new RuntimeException(e);
} catch (Exception e) {
log.error("其他异常发生", e);
try {
conn.rollback(); // 回滚事务
} catch (SQLException ex) {
log.error("回滚事务失败", ex);
throw new RuntimeException(ex);
}
throw new RuntimeException(e);
}
});
}
executorService.shutdown();
if (!executorService.awaitTermination(1, TimeUnit.HOURS)) {
log.warn("Executor service did not terminate within the timeout.");
executorService.shutdownNow();
}
conn.commit(); // 提交事务
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
} catch (SQLException e) {
throw new RuntimeException(e);
} finally {
close(hikariDataSource); // 关闭数据源
}
return addCount.get();
}
private static void close(HikariDataSource dataSource) {
if (dataSource != null) {
dataSource.close();
}
}
private List<DataValue[][]> splitData(DataValue[][] listList, int batchSize) {
List<DataValue[][]> batches = new ArrayList<>();
int totalRows = listList.length;
int totalBatches = (int) Math.ceil((double) totalRows / batchSize);
for (int i = 0; i < totalBatches; i++) {
int start = i * batchSize;
int end = Math.min(start + batchSize, totalRows);
DataValue[][] batch = Arrays.copyOfRange(listList, start, end);
batches.add(batch);
}
return batches;
}
private String buildBatchInsertSQL(String tableName, DataValue[][] batch) {
StringBuilder columns = new StringBuilder("(");
StringBuilder values = new StringBuilder("VALUES ");
// 构建字段名
for (DataValue dataValue : batch[0]) {
String key = dataValue.getKey();
columns.append(key).append(", ");
}
// 删除最后一个逗号和空格
columns.delete(columns.length() - 2, columns.length());
// 构建值部分
for (DataValue[] dataValueList : batch) {
values.append("(");
for (DataValue dataValue : dataValueList) {
Object value = dataValue.getValue();
values.append(formatValue(dataValue.getType(), value)).append(", ");
}
values.delete(values.length() - 2, values.length());
values.append("), ");
}
// 删除最后一个逗号
values.delete(values.length() - 2, values.length());
// 完成 SQL 插入语句
String sql = "INSERT INTO " + tableName + " " + columns.toString() + ") " + values.toString();
return sql;
}
private String formatValue(DataType type, Object value) {
if (value == null) {
// 根据业务需求处理 null 值
return "NULL"; // 或者其他默认值
}
if (type == DataType.VARCHAR || type == DataType.TEXT) {
return "'" + value.toString().replace("'", "''") + "'";
} else if (type == DataType.BIGINT) {
return value.toString();
} else if (type == DataType.INT) {
return value.toString();
} else if (type == DataType.DECIMAL) {
return value.toString();
} else if (type == DataType.DATETIME) {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
return "'" + sdf.format((Date) value) + "'";
} else if (type == DataType.DOUBLE) {
return value.toString();
} else {
return "'" + value.toString() + "'";
}
}
public List<TableFie> tableStructureShow(TableNames tableNames) {

View File

@ -6,11 +6,8 @@ import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.muyu.common.core.utils.StringUtils;
import com.muyu.common.domain.*;
import com.muyu.server.mapper.DataSourceMapper;
import com.muyu.server.service.DataRunNameService;
import com.muyu.server.service.DataSourceService;
import com.muyu.server.service.TableFieldService;
import com.muyu.server.util.JdbcHelper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.sql.*;
@ -29,7 +26,7 @@ import java.util.concurrent.Executors;
*/
@Service
public class DataSourceServiceImpl
extends ServiceImpl<DataSourceMapper, DataSourceList>
extends ServiceImpl<DataSourceMapper, ProductData>
implements DataSourceService {
/**
@ -38,9 +35,9 @@ public class DataSourceServiceImpl
* @return
*/
@Override
public List<DataSourceList> extractData(Connect connect) {
public List<ProductData> extractData(Connect connect) {
ExecutorService executorService = Executors.newFixedThreadPool(100);
ArrayList<DataSourceList> dataSourceLists = new ArrayList<>();
ArrayList<ProductData> productData = new ArrayList<>();
int size = 5000;
@ -90,10 +87,10 @@ public class DataSourceServiceImpl
String idCard = (rs.getString("idCard"));
String email = (rs.getString("email"));
String credit = (rs.getString("credit"));
DataSourceList dataSourceList = new DataSourceList(
ProductData productData = new ProductData(
id,name,gender,birthday,address,mibile,idCard,email,credit
);
dataSourceLists.add(dataSourceList);
productData.add(productData);
}
if (!rs.next()){
break;
@ -109,7 +106,7 @@ public class DataSourceServiceImpl
e.printStackTrace();
}
});
return dataSourceLists;
return productData;
}
/**