diff --git a/src/main/java/com/muyu/common/pool/FixedThreadPool.java b/src/main/java/com/muyu/common/pool/FixedThreadPool.java index 12a3def..ad1c532 100644 --- a/src/main/java/com/muyu/common/pool/FixedThreadPool.java +++ b/src/main/java/com/muyu/common/pool/FixedThreadPool.java @@ -2,6 +2,7 @@ package com.muyu.common.pool; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; /** * @author DongZl @@ -20,8 +21,8 @@ public class FixedThreadPool { * 线程池提交任务 * @param runnable 线程 */ - public static void submit(Runnable runnable){ - fixedThreadPool.submit(runnable); + public static Future submit(Runnable runnable){ + return fixedThreadPool.submit(runnable); } /** diff --git a/src/main/java/com/muyu/service/impl/VehicleUnifiedServiceImpl.java b/src/main/java/com/muyu/service/impl/VehicleUnifiedServiceImpl.java index f5bb15d..379dab2 100644 --- a/src/main/java/com/muyu/service/impl/VehicleUnifiedServiceImpl.java +++ b/src/main/java/com/muyu/service/impl/VehicleUnifiedServiceImpl.java @@ -18,6 +18,7 @@ import org.springframework.stereotype.Service; import java.util.List; import java.util.Random; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; /** @@ -50,10 +51,45 @@ public class VehicleUnifiedServiceImpl implements VehicleUnifiedService { // 获取离线车辆 List vinList = LocalContainer.getOfflineVehicleInstance().stream().map(VehicleInstance::getVin).toList(); - CountDownLatch countDownLatch = new CountDownLatch(vinList.size()); + + // 获取执行多少次 执行大小 执行页码 + int vinSize = vinList.size(), executionSize = 10; + // 总执行次数 + int executionSum = vinSize/executionSize + vinSize % executionSize == 0 ? 0 : 1; + + new Thread(() -> { + // 分页进行业务操作 + for (int page = 0; page < executionSum; page++) { + // 是否继续等待 + boolean await; + // 等待次数/最大等待次数 + int waitSize = 0, waitMaxSize = 3; + CountDownLatch countDownLatch = new CountDownLatch(vinList.size()); + // 进行分页开启车辆 + List executionVinList = vinList.stream() + .limit(page * executionSize) + .skip(executionSize) + .toList(); + do { + try { + await = countDownLatch.await(5, TimeUnit.SECONDS); + + log.info("等待一轮训,还剩余:[{}]", countDownLatch.getCount()); + } catch (InterruptedException e) { + log.error("等待异常:{}", e.getMessage(), e); + await = true; + } + // 等待countdown或者等待轮训超过三次结束 + } while (!await || waitSize++ < waitMaxSize); + } + + taskModel.down(); + }, "一键上线").start(); + + // 筛选出离线车辆并使用并行流进行上线操作 - for (String vin : vinList) { - FixedThreadPool.submit( + /*for (String vin : vinList) { + Future submitFuture = FixedThreadPool.submit( new Thread(() -> { try { vehicleInstanceService.vehicleClientInit(vin); @@ -65,21 +101,9 @@ public class VehicleUnifiedServiceImpl implements VehicleUnifiedService { countDownLatch.countDown(); }) ); - } - new Thread(() -> { - boolean await = false; - do { - try { - await = countDownLatch.await(5, TimeUnit.SECONDS); + }*/ + - log.info("等待一轮训,还剩余:[{}]", countDownLatch.getCount()); - } catch (InterruptedException e) { - log.error("等待异常:{}", e.getMessage(), e); - await = true; - } - } while (!await); - taskModel.down(); - }, "一键上线").start(); } /** diff --git a/src/main/java/com/muyu/vehicle/VehicleInstance.java b/src/main/java/com/muyu/vehicle/VehicleInstance.java index 2f5c857..fd68be5 100644 --- a/src/main/java/com/muyu/vehicle/VehicleInstance.java +++ b/src/main/java/com/muyu/vehicle/VehicleInstance.java @@ -89,9 +89,6 @@ public class VehicleInstance { */ private MqttProperties mqttProperties; - public VehicleInstance(MqttProperties mqttProperties) { - this.mqttProperties = mqttProperties; - } /*** * 获取当前车辆VIN