From 5c2fbb91ddb5788cb2f44e4aa204a1d3078aa87b Mon Sep 17 00:00:00 2001 From: DongZeLiang <2746733890@qq.com> Date: Tue, 26 Dec 2023 16:37:17 +0800 Subject: [PATCH] =?UTF-8?q?=E4=B8=80=E9=94=AE=E4=B8=8A=E7=BA=BF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .idea/misc.xml | 3 +- .../com/muyu/common/pool/FixedThreadPool.java | 8 +- .../muyu/common/pool/ScheduledThreadPool.java | 3 +- .../impl/VehicleUnifiedServiceImpl.java | 91 +++++++++---------- 4 files changed, 49 insertions(+), 56 deletions(-) diff --git a/.idea/misc.xml b/.idea/misc.xml index c3f3b0a..4731638 100644 --- a/.idea/misc.xml +++ b/.idea/misc.xml @@ -1,3 +1,4 @@ + @@ -7,7 +8,7 @@ - + \ No newline at end of file diff --git a/src/main/java/com/muyu/common/pool/FixedThreadPool.java b/src/main/java/com/muyu/common/pool/FixedThreadPool.java index ad1c532..acef62f 100644 --- a/src/main/java/com/muyu/common/pool/FixedThreadPool.java +++ b/src/main/java/com/muyu/common/pool/FixedThreadPool.java @@ -14,15 +14,15 @@ public class FixedThreadPool { /** * 可重用固定个数的线程池 */ - private final static ExecutorService fixedThreadPool = Executors.newFixedThreadPool(1); + private final static ExecutorService fixedThreadPool = Executors.newFixedThreadPool(15); /** * 线程池提交任务 - * @param runnable 线程 + * @param thread 线程 */ - public static Future submit(Runnable runnable){ - return fixedThreadPool.submit(runnable); + public static Future submit(Thread thread){ + return fixedThreadPool.submit(thread); } /** diff --git a/src/main/java/com/muyu/common/pool/ScheduledThreadPool.java b/src/main/java/com/muyu/common/pool/ScheduledThreadPool.java index 887adf8..ab4e674 100644 --- a/src/main/java/com/muyu/common/pool/ScheduledThreadPool.java +++ b/src/main/java/com/muyu/common/pool/ScheduledThreadPool.java @@ -15,7 +15,8 @@ public class ScheduledThreadPool { /** * 周期性线程池 CPU 数量 * 2 + 1 */ - private static final ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors() * 2 + 1); + private static final ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool( + Runtime.getRuntime().availableProcessors() * 2 + 1); public static ScheduledFuture submit (Runnable thread){ // 参数分别是: 任务, 多久后开始执行, 每隔多久执行一次(周期),时间单位 diff --git a/src/main/java/com/muyu/service/impl/VehicleUnifiedServiceImpl.java b/src/main/java/com/muyu/service/impl/VehicleUnifiedServiceImpl.java index 379dab2..b61f9c3 100644 --- a/src/main/java/com/muyu/service/impl/VehicleUnifiedServiceImpl.java +++ b/src/main/java/com/muyu/service/impl/VehicleUnifiedServiceImpl.java @@ -1,6 +1,7 @@ package com.muyu.service.impl; import com.alibaba.fastjson2.JSONArray; +import com.alibaba.fastjson2.JSONObject; import com.muyu.common.pool.FixedThreadPool; import com.muyu.domain.PositionRouteInfo; import com.muyu.domain.model.PositionModel; @@ -16,9 +17,10 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.util.List; +import java.util.Map; import java.util.Random; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; /** @@ -48,62 +50,51 @@ public class VehicleUnifiedServiceImpl implements VehicleUnifiedService { */ @Override public void unifiedOnline () { - // 获取离线车辆 - List vinList - = LocalContainer.getOfflineVehicleInstance().stream().map(VehicleInstance::getVin).toList(); - - // 获取执行多少次 执行大小 执行页码 - int vinSize = vinList.size(), executionSize = 10; - // 总执行次数 - int executionSum = vinSize/executionSize + vinSize % executionSize == 0 ? 0 : 1; - + // 获取离线车辆VIN + List vinList = LocalContainer.getOfflineVehicleInstance() + .stream() + .map(VehicleInstance::getVin) + .toList(); new Thread(() -> { - // 分页进行业务操作 - for (int page = 0; page < executionSum; page++) { - // 是否继续等待 - boolean await; - // 等待次数/最大等待次数 - int waitSize = 0, waitMaxSize = 3; - CountDownLatch countDownLatch = new CountDownLatch(vinList.size()); + int vinSize = 0, executionSize = 15; + do { + int startIndex = vinSize++ * executionSize; // 进行分页开启车辆 List executionVinList = vinList.stream() - .limit(page * executionSize) - .skip(executionSize) + .skip(startIndex) + .limit(executionSize) .toList(); - do { - try { - await = countDownLatch.await(5, TimeUnit.SECONDS); - - log.info("等待一轮训,还剩余:[{}]", countDownLatch.getCount()); - } catch (InterruptedException e) { - log.error("等待异常:{}", e.getMessage(), e); - await = true; - } - // 等待countdown或者等待轮训超过三次结束 - } while (!await || waitSize++ < waitMaxSize); - } - - taskModel.down(); - }, "一键上线").start(); - - - // 筛选出离线车辆并使用并行流进行上线操作 - /*for (String vin : vinList) { - Future submitFuture = FixedThreadPool.submit( - new Thread(() -> { + CountDownLatch countDownLatch = new CountDownLatch(executionVinList.size()); + Map startVehicleThread = new ConcurrentHashMap<>(); + executionVinList.forEach(vin -> { + Thread thread = new Thread(() -> { try { vehicleInstanceService.vehicleClientInit(vin); - taskModel.incrementSuccess(); - } catch (Exception exception) { - log.error("车辆上线异常:{}", exception.getMessage(), exception); - taskModel.incrementError(); + startVehicleThread.remove(vin); + }catch (Exception interruptedException){ + log.error(interruptedException); } - countDownLatch.countDown(); - }) - ); - }*/ - + }); + startVehicleThread.put(vin, thread); + FixedThreadPool.submit(thread); + }); + try { + boolean await = countDownLatch.await(5, TimeUnit.SECONDS); + log.info( + "开始:[{}], 结束:[{}],未上线成功:[{}], vin:[{}]", + startIndex, startIndex+executionVinList.size(), + startVehicleThread.size(), + JSONObject.toJSONString(executionVinList)); + if (!await){ + startVehicleThread.values().forEach(Thread::interrupt); + } + } catch (InterruptedException ignored) {} + if (executionVinList.size() < executionSize){ + break; + } + }while (true); + }).start(); } /** @@ -112,7 +103,7 @@ public class VehicleUnifiedServiceImpl implements VehicleUnifiedService { @Override public void unifiedOffline () { List onlineVehicleInstanceList = LocalContainer.getOnlineVehicleInstance(); - Thread taskThread = new Thread(() -> { + new Thread(() -> { try { // 筛选出在线车辆使用并行流操作先停止车辆上报动作再进行车辆离线操作 onlineVehicleInstanceList