添加分页

master
Cui YongXing 2024-09-04 15:48:15 +08:00
parent fff5315cd3
commit 31c589da46
2 changed files with 150 additions and 6 deletions

View File

@ -221,6 +221,9 @@ public class TaskInfoServiceImpl extends ServiceImpl<TaskInfoMapper, TaskInfo> i
QueryWrapper<TaskOutput> queryWrapper = new QueryWrapper<>();
queryWrapper.eq("task_id", taskId);
TaskOutput one = taskOutputService.getOne(queryWrapper);
if (one==null){
return "没有选择输出";
}
Long tableId = one.getTableId();
Long newBasicId = one.getBasicId();
HashMap<String, String> map = new HashMap<>();
@ -235,14 +238,11 @@ public class TaskInfoServiceImpl extends ServiceImpl<TaskInfoMapper, TaskInfo> i
String sql = " SELECT count(1) FROM " + joint;
Result<Long> countResult = datasourceFeign.findCount(basicId, sql);
Long data = countResult.getData();
System.out.println("======="+data+"=======");
String finalFieName = fieName;
String finalJoint = joint;
System.out.println(weight);
System.out.println(Weight.centre.getValue());
long count = data/10000==0?1:data/10000;
if (Weight.high.getValue().equals(weight)){
log.info("执行高级任务");
for (long i = 1; i <= count; i++) {
long pageNum = (i - 1) * 10000;
submitHighPriorityTask(()->{
@ -251,6 +251,7 @@ public class TaskInfoServiceImpl extends ServiceImpl<TaskInfoMapper, TaskInfo> i
}
}
if (Weight.centre.getValue().equals(weight)){
log.info("执行中级任务");
for (long i = 1; i <= count; i++) {
long pageNum = (i - 1) * 10000;
System.out.println(pageNum);
@ -260,6 +261,7 @@ public class TaskInfoServiceImpl extends ServiceImpl<TaskInfoMapper, TaskInfo> i
}
}
if (Weight.low.getValue().equals(weight)){
log.info("执行低级任务");
for (long i = 1; i <= count; i++) {
long pageNum = (i - 1) * 10000;
submitLowPriorityTask(()->{
@ -268,6 +270,7 @@ public class TaskInfoServiceImpl extends ServiceImpl<TaskInfoMapper, TaskInfo> i
}
}
if (Weight.urgency.getValue().equals(weight)){
log.info("执行紧急任务");
for (long i = 1; i <= count; i++) {
long pageNum = (i - 1) * 10000;
submitEmergencyTask(()->{
@ -281,8 +284,7 @@ public class TaskInfoServiceImpl extends ServiceImpl<TaskInfoMapper, TaskInfo> i
@NotNull
private void getString(Long pageNum,String fieName,String joint,Long basicId,Long newBasicId,Long tableId,HashMap<String, String> map ) {
String sql = " SELECT " + fieName + " FROM " + joint;
System.out.println(sql);
String sql = " SELECT " + fieName + " FROM " + joint +" limit "+pageNum +",10000 ";
Result<List<List<DataValue>>> tableValueResult = datasourceFeign.findTableValue(basicId, sql);
List<List<DataValue>> tableValue = tableValueResult.getData();
for (List<DataValue> dataValues : tableValue) {

View File

@ -0,0 +1,142 @@
package com.muyu.task.server.thread;
import java.util.concurrent.*;
public class PrioritizedThreadPool {
private static final ExecutorService executor;
private static final Semaphore highSemaphore;
private static final Semaphore lowSemaphore;
private static final Semaphore centerSemaphore;
private static final Semaphore emergencySemaphore;
private static final int totalThreads = 30;
private static final int highThreads = 16;
private static final int centerThreads = 9;
private static final int lowThreads = 5;
private static final int emergencyTaskThreads = 14;
private static final int emergencyHighThreads = 9;
private static final int emergencyCenterThreads = 5;
private static final int emergencyLowThreads = 2;
private static volatile boolean inEmergencyMode = false;
static {
if (highThreads + centerThreads + lowThreads > totalThreads) {
throw new IllegalArgumentException("总和大于线程数");
}
if (emergencyCenterThreads + emergencyHighThreads + emergencyLowThreads > totalThreads) {
throw new IllegalArgumentException("总和大于线程数");
}
executor =new ThreadPoolExecutor(totalThreads, totalThreads,60L,TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>());
highSemaphore = new Semaphore(highThreads);
centerSemaphore = new Semaphore(centerThreads);
lowSemaphore = new Semaphore(lowThreads);
emergencySemaphore = new Semaphore(0);
}
public static void submit(Runnable task) {
adjustForEmergency();
try {
emergencySemaphore.acquire();
executor.submit(()->{
try {
task.run();
}finally {
emergencySemaphore.release();
restoreDefaults();
}
});
}catch (InterruptedException e){
Thread.currentThread().interrupt();
}
}
private static void subHighTask(Runnable task){
try {
highSemaphore.acquire();
executor.submit(()->{
try {
task.run();
}finally {
highSemaphore.release();
}
});
}catch (InterruptedException e){
Thread.currentThread().interrupt();
}
}
private static void subcenterTask(Runnable task){
try {
centerSemaphore.acquire();
executor.submit(()->{
try {
task.run();
}finally {
centerSemaphore.release();
}
});
}catch (InterruptedException e){
Thread.currentThread().interrupt();
}
}
private static void subLowTask(Runnable task){
try {
lowSemaphore.acquire();
executor.submit(()->{
try {
task.run();
}finally {
lowSemaphore.release();
}
});
}catch (InterruptedException e){
Thread.currentThread().interrupt();
}
}
private static synchronized void adjustForEmergency(){
if (!inEmergencyMode) {
inEmergencyMode = true;
highSemaphore.release(highThreads-emergencyHighThreads);
centerSemaphore.release(centerThreads-emergencyCenterThreads);
lowSemaphore.release(lowThreads-emergencyLowThreads);
emergencySemaphore.release(emergencyTaskThreads);
}
}
private static synchronized void restoreDefaults(){
if (inEmergencyMode){
emergencySemaphore.drainPermits();
try {
highSemaphore.acquire(emergencyHighThreads);
centerSemaphore.acquire(emergencyCenterThreads);
lowSemaphore.acquire(emergencyLowThreads);
}catch (InterruptedException e){
throw new RuntimeException(e);
}
}
highSemaphore.release(highThreads);
centerSemaphore.release(centerThreads);
lowSemaphore.release(lowThreads);
emergencySemaphore.release(-emergencyTaskThreads);
inEmergencyMode = false;
}
public static void shutdown() throws InterruptedException {
executor.shutdown();
if (!executor.awaitTermination(1,TimeUnit.HOURS)){
executor.shutdownNow();
if (!executor.awaitTermination(1,TimeUnit.MINUTES)){
System.err.println("Executor did not terminate");
}
}
}
}