09081017:给任务模块提供查询和添加的接口
parent
5b817cf56b
commit
ae6729c434
|
@ -13,9 +13,7 @@ import lombok.experimental.SuperBuilder;
|
|||
|
||||
/**
|
||||
* 结构对象 structure
|
||||
*
|
||||
* @author Saisai
|
||||
* @date 2024-04-22
|
||||
* @author Lenovo
|
||||
*/
|
||||
@Data
|
||||
@NoArgsConstructor
|
||||
|
|
|
@ -22,11 +22,20 @@ public class DataValueModel {
|
|||
* 数据接入的ID
|
||||
*/
|
||||
private Long basicId;
|
||||
private Long tableId;
|
||||
/**
|
||||
* 拼写的sql语句
|
||||
*/
|
||||
private String sql;
|
||||
/**
|
||||
*
|
||||
*/
|
||||
private Long one;
|
||||
/**
|
||||
*
|
||||
*/
|
||||
private Integer two;
|
||||
|
||||
private DataValueModel[][] dataValues;
|
||||
|
||||
}
|
||||
|
|
|
@ -101,7 +101,15 @@ public class DataValueController {
|
|||
// 返回数据
|
||||
return Result.success(dataValues);
|
||||
}
|
||||
|
||||
|
||||
|
||||
@PostMapping("/addTableValueByType")
|
||||
public Result addTableValueByType(@RequestBody DataValueModel dataValueModel){
|
||||
//开始执行的时间
|
||||
long begin = System.currentTimeMillis();
|
||||
int i =dataValueService.addTableValueByType(dataValueModel);
|
||||
// 结束执行的时间
|
||||
long end =System.currentTimeMillis();
|
||||
//打印出执行该方法的耗时
|
||||
System.out.println("耗时:"+(end-begin));
|
||||
return Result.success();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -32,4 +32,5 @@ public interface DataValueService extends IService<DataValue> {
|
|||
*/
|
||||
DataValue[][] findTableValueByTable(DataValueModel dataValueModel);
|
||||
|
||||
int addTableValueByType(DataValueModel dataValueModel);
|
||||
}
|
||||
|
|
|
@ -22,11 +22,9 @@ import java.util.ArrayList;
|
|||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.*;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import static com.muyu.source.pool.config.BaseConfig.SELECTALL;
|
||||
|
||||
|
@ -343,6 +341,105 @@ public class DataValueServiceImpl extends ServiceImpl<DataValueMapper, DataValue
|
|||
return rows;
|
||||
}
|
||||
|
||||
/**
|
||||
* 根据基础表ID和SQL语句查询数据
|
||||
* @param dataValueModel 数据
|
||||
* @return List<DataValue>
|
||||
*/
|
||||
@Override
|
||||
public int addTableValueByType(DataValueModel dataValueModel) {
|
||||
MysqlQuery mysqlQuery = new MysqlQuery();
|
||||
mysqlQuery.setDataSourceId(dataValueModel.getBasicId());
|
||||
DataSource dataSource = dataSourceService.getById(dataValueModel.getBasicId());
|
||||
|
||||
MysqlPool mysqlPool = new MysqlPool(dataSource);
|
||||
mysqlPool.init();
|
||||
ExecutorService executorService1 = Executors.newFixedThreadPool(8);
|
||||
AtomicInteger atomicInteger = new AtomicInteger();
|
||||
List<DataValue[][]> batches =splitData(dataValueModel.getDataValues(),2000);
|
||||
Connection conn = mysqlPool.getConn();
|
||||
|
||||
try {
|
||||
conn.setAutoCommit(false);
|
||||
for (DataValue[][] batch : batches) {
|
||||
executorService1.submit(()->{
|
||||
try {
|
||||
Statement statement = conn.createStatement();
|
||||
statement.executeQuery(dataValueModel.getSql());
|
||||
atomicInteger.addAndGet(batch.length);
|
||||
} catch (SQLException e) {
|
||||
try {
|
||||
//回滚事务
|
||||
conn.rollback();
|
||||
} catch (SQLException ex) {
|
||||
throw new RuntimeException(ex);
|
||||
}
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
});
|
||||
}
|
||||
executorService1.shutdown();
|
||||
executorService1.awaitTermination(1, TimeUnit.MINUTES);
|
||||
|
||||
conn.commit();
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
throw new RuntimeException(e);
|
||||
} catch (SQLException e) {
|
||||
throw new RuntimeException(e);
|
||||
} finally {
|
||||
mysqlPool.closeConn();
|
||||
mysqlPool.closeBaseConn();
|
||||
}
|
||||
|
||||
return atomicInteger.get();
|
||||
}
|
||||
|
||||
/**
|
||||
* 格式化数据
|
||||
* @param type 数据类型
|
||||
* @param value 值
|
||||
* @return 格式化后的值
|
||||
*/
|
||||
private String formatValue(DataType type, Object value) {
|
||||
if (type == DataType.VARCHAR || type == DataType.TEXT) {
|
||||
return "'" + value.toString().replace("'", "''") + "'";
|
||||
} else if (type == DataType.BIGINT) {
|
||||
return value.toString();
|
||||
} else if (type == DataType.INT) {
|
||||
return value.toString();
|
||||
} else if (type == DataType.DECIMAL) {
|
||||
return value.toString();
|
||||
} else if (type == DataType.DATETIME) {
|
||||
return "'" + new java.sql.Date(((java.util.Date) value).getTime()) + "'";
|
||||
} else if (type == DataType.DOUBLE) {
|
||||
return value.toString();
|
||||
} else {
|
||||
// 其他类型的处理
|
||||
return "'" + value.toString() + "'";
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 将数据拆分成多个小批量
|
||||
* @param dataValues 数据
|
||||
* @param batchSize 每小批量的大小
|
||||
* @return 小批量列表
|
||||
*/
|
||||
private List<DataValue[][]> splitData(DataValueModel[][] dataValues, int batchSize) {
|
||||
List<DataValue[][]> batches = new ArrayList<>();
|
||||
int totalSize = dataValues.length;
|
||||
int numBatches = (int) Math.ceil((double) totalSize / batchSize);
|
||||
for (int i = 0; i < numBatches; i++) {
|
||||
int start = i * batchSize;
|
||||
int end = Math.min(start + batchSize, totalSize);
|
||||
DataValue[][] batch = new DataValue[end - start][];
|
||||
System.arraycopy(dataValues, start, batch, 0, end - start);
|
||||
batches.add(batch);
|
||||
}
|
||||
return batches;
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue