添加线程

master
Cui YongXing 2024-09-03 22:20:16 +08:00
parent ef13f28087
commit 312663aff2
5 changed files with 228 additions and 5 deletions

View File

@ -0,0 +1,16 @@
package com.muyu.common.domian.enums;
public enum Weight{
urgency("3"),high("0"),centre("1"),low("2");
private String value;
Weight(String value){
this.value = value;
}
public String getValue(){
return value;
}
}

View File

@ -25,4 +25,6 @@ public interface DatasourceFeign {
@PostMapping("/product/addProduct")
public Result addProduct(@RequestParam("basicId") Long basicId, @RequestParam("tableId") Long tableId, @RequestBody List<List<DataValue>> listList);
@PostMapping("/dataValue/findCount")
public Result<Long> findCount(@RequestParam("basicId") Long basicId,@RequestParam("sql") String sql);
}

View File

@ -26,6 +26,12 @@ public class DatasourceFeignFactory implements FallbackFactory<DatasourceFeign>
log.info(e);
return Result.error("网络开小差......");
}
@Override
public Result<Long> findCount(Long basicId, String sql) {
log.info(e);
return Result.error("网络开小差......");
}
};
}
}

View File

@ -11,6 +11,7 @@ import com.muyu.common.domian.NodeJoint;
import com.muyu.common.domian.TaskInfo;
import com.muyu.common.domian.TaskInput;
import com.muyu.common.domian.TaskOutput;
import com.muyu.common.domian.enums.Weight;
import com.muyu.common.domian.req.TaskInfoListReq;
import com.muyu.common.domian.resp.TaskInfoResp;
import com.muyu.remote.feign.DatasourceFeign;
@ -20,6 +21,7 @@ import com.muyu.task.server.service.TaskInfoService;
import com.muyu.task.server.service.TaskInputService;
import com.muyu.task.server.service.TaskOutputService;
import lombok.extern.log4j.Log4j2;
import org.jetbrains.annotations.NotNull;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
@ -30,6 +32,8 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import static com.muyu.task.server.thread.OptimizedPrioritizedThreadPool.submitHighPriorityTask;
/**
* @author Administrator
*/
@ -153,6 +157,8 @@ public class TaskInfoServiceImpl extends ServiceImpl<TaskInfoMapper, TaskInfo> i
@Override
public String findByFieName2(Long taskId) {
TaskInfo taskInfo = taskInfoMapper.selectById(taskId);
String weight = taskInfo.getWeight();
QueryWrapper<TaskInput> wrapper = new QueryWrapper<>();
wrapper.eq("task_id", taskId);
HashSet<Long> longs = new HashSet<>();
@ -170,8 +176,8 @@ public class TaskInfoServiceImpl extends ServiceImpl<TaskInfoMapper, TaskInfo> i
String[] tableAsFieId = list.get(i).getTableAsFieId().split(",");
String[] fieIdAsEngineId = list.get(i).getFieIdAsEngineId().split(",");
for (int j = 0; j < tableFieId.length; j++) {
hashMap.put(tableFieId[j],fieIdAsEngineId[j]);
tableNameMap.put(tableFieId[j],tableAsFieId[j]);
hashMap.put(tableFieId[j], fieIdAsEngineId[j]);
tableNameMap.put(tableFieId[j], tableAsFieId[j]);
}
}
@ -203,11 +209,11 @@ public class TaskInfoServiceImpl extends ServiceImpl<TaskInfoMapper, TaskInfo> i
+ " = " + nodeJoint.getTwoFie();
}
}else {
} else {
TaskInput taskInput = taskInputService.getOne(wrapper);
String tableName = taskInput.getTableName();
String tableAsName = taskInput.getTableAsName();
joint=" "+ tableName+" "+tableAsName;
joint = " " + tableName + " " + tableAsName;
}
Long basicId = taskInputService.selectByBasicId(taskId);
@ -225,7 +231,31 @@ public class TaskInfoServiceImpl extends ServiceImpl<TaskInfoMapper, TaskInfo> i
fieName += "," + tableNameMap.get(lastFieName[i]);
}
fieName = fieName.substring(1);
String sql = " SELECT " + fieName + " FROM "+ joint;
String sql = " SELECT count(1) FROM " + joint;
Result<Long> count = datasourceFeign.findCount(basicId, sql);
System.out.println("======="+count+"=======");
if (Weight.high.equals(weight)){
submitHighPriorityTask(()->{
});
}
if (Weight.centre.equals(weight)){
}
if (Weight.low.equals(weight)){
}
if (Weight.urgency.equals(weight)){
}
//return getString(fieName,joint,basicId,newBasicId,tableId,map);
return null;
}
@NotNull
private String getString(String fieName,String joint,Long basicId,Long newBasicId,Long tableId,HashMap<String, String> map ) {
String sql = " SELECT " + fieName + " FROM " + joint;
System.out.println(sql);
Result<List<List<DataValue>>> tableValueResult = datasourceFeign.findTableValue(basicId, sql);
List<List<DataValue>> tableValue = tableValueResult.getData();

View File

@ -0,0 +1,169 @@
package com.muyu.task.server.thread;
import java.util.concurrent.*;
public class OptimizedPrioritizedThreadPool {
private static final ExecutorService executor;
private static final Semaphore highPrioritySemaphore;
private static final Semaphore mediumPrioritySemaphore;
private static final Semaphore lowPrioritySemaphore;
private static final Semaphore emergencySemaphore;
private static final int totalThreads = 30; // 固定线程池大小
private static final int defaultHighThreads = 16;
private static final int defaultMediumThreads = 9;
private static final int defaultLowThreads = 5;
private static final int emergencyThreads = 14;
private static final int emergencyHighThreads = 9;
private static final int emergencyMediumThreads = 5;
private static final int emergencyLowThreads = 2;
private static volatile boolean inEmergencyMode = false;
static {
if (defaultHighThreads + defaultMediumThreads + defaultLowThreads > totalThreads) {
throw new IllegalArgumentException("Default priority threads exceed total threads.");
}
if (emergencyThreads + emergencyHighThreads + emergencyMediumThreads + emergencyLowThreads > totalThreads) {
throw new IllegalArgumentException("Emergency priority threads exceed total threads.");
}
// 创建固定大小的线程池
executor = new ThreadPoolExecutor(
totalThreads, totalThreads,
60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>() // 使用无界的阻塞队列
);
highPrioritySemaphore = new Semaphore(defaultHighThreads);
mediumPrioritySemaphore = new Semaphore(defaultMediumThreads);
lowPrioritySemaphore = new Semaphore(defaultLowThreads);
emergencySemaphore = new Semaphore(0);
}
public static void submitEmergencyTask(Runnable task) {
// 调整线程分配以适应紧急任务
adjustForEmergency();
try {
emergencySemaphore.acquire();
executor.submit(() -> {
try {
task.run();
} finally {
emergencySemaphore.release();
// 任务完成后恢复默认线程分配
restoreDefaults();
}
});
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
// 日志记录中断信息
System.err.println("Interrupted while waiting to execute emergency task.");
}
}
public static void submitHighPriorityTask(Runnable task) {
try {
highPrioritySemaphore.acquire();
executor.submit(() -> {
try {
task.run();
} finally {
highPrioritySemaphore.release();
}
});
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
// 日志记录中断信息
System.err.println("Interrupted while waiting to execute high priority task.");
}
}
public static void submitMediumPriorityTask(Runnable task) {
try {
mediumPrioritySemaphore.acquire();
executor.submit(() -> {
try {
task.run();
} finally {
mediumPrioritySemaphore.release();
}
});
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
// 日志记录中断信息
System.err.println("Interrupted while waiting to execute medium priority task.");
}
}
public static void submitLowPriorityTask(Runnable task) {
try {
lowPrioritySemaphore.acquire();
executor.submit(() -> {
try {
task.run();
} finally {
lowPrioritySemaphore.release();
}
});
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
// 日志记录中断信息
System.err.println("Interrupted while waiting to execute low priority task.");
}
}
private static synchronized void adjustForEmergency() {
// 仅在未处于紧急模式下调整线程分配
if (!inEmergencyMode) {
inEmergencyMode = true;
// 释放一些高、中、低优先级的线程,以便为紧急任务腾出空间
highPrioritySemaphore.release(defaultHighThreads - emergencyHighThreads);
mediumPrioritySemaphore.release(defaultMediumThreads - emergencyMediumThreads);
lowPrioritySemaphore.release(defaultLowThreads - emergencyLowThreads);
// 为紧急任务分配线程
emergencySemaphore.release(emergencyThreads);
}
}
private static synchronized void restoreDefaults() {
// 检查是否处于紧急模式
if (inEmergencyMode) {
// 清空紧急任务的许可
emergencySemaphore.drainPermits();
// 重新分配高、中、低优先级的线程
try {
highPrioritySemaphore.acquire(emergencyHighThreads);
mediumPrioritySemaphore.acquire(emergencyMediumThreads);
lowPrioritySemaphore.acquire(emergencyLowThreads);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
highPrioritySemaphore.release(defaultHighThreads);
mediumPrioritySemaphore.release(defaultMediumThreads);
lowPrioritySemaphore.release(defaultLowThreads);
// 清空紧急任务的许可
emergencySemaphore.release(-emergencyThreads);
// 退出紧急模式
inEmergencyMode = false;
}
}
public static void shutdown() throws InterruptedException {
executor.shutdown();
if (!executor.awaitTermination(1, TimeUnit.HOURS)) {
executor.shutdownNow();
if (!executor.awaitTermination(1, TimeUnit.MINUTES)) {
System.err.println("Pool did not terminate");
}
}
}
}