diff --git a/src/main/java/com/muyu/common/pool/FixedThreadPool.java b/src/main/java/com/muyu/common/pool/FixedThreadPool.java new file mode 100644 index 0000000..12a3def --- /dev/null +++ b/src/main/java/com/muyu/common/pool/FixedThreadPool.java @@ -0,0 +1,33 @@ +package com.muyu.common.pool; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +/** + * @author DongZl + * @description: 可控最大并发数线程池 + * @Date 2023-12-5 下午 01:51 + */ +public class FixedThreadPool { + + /** + * 可重用固定个数的线程池 + */ + private final static ExecutorService fixedThreadPool = Executors.newFixedThreadPool(1); + + + /** + * 线程池提交任务 + * @param runnable 线程 + */ + public static void submit(Runnable runnable){ + fixedThreadPool.submit(runnable); + } + + /** + * 关闭线程池 + */ + public static void shutDown(){ + fixedThreadPool.shutdown(); + } +} diff --git a/src/main/java/com/muyu/common/ThreadPool.java b/src/main/java/com/muyu/common/pool/ScheduledThreadPool.java similarity index 94% rename from src/main/java/com/muyu/common/ThreadPool.java rename to src/main/java/com/muyu/common/pool/ScheduledThreadPool.java index 4c2796b..887adf8 100644 --- a/src/main/java/com/muyu/common/ThreadPool.java +++ b/src/main/java/com/muyu/common/pool/ScheduledThreadPool.java @@ -1,4 +1,4 @@ -package com.muyu.common; +package com.muyu.common.pool; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -10,7 +10,7 @@ import java.util.concurrent.TimeUnit; * @description: 线程池 * @Date 2023-11-17 上午 09:16 */ -public class ThreadPool { +public class ScheduledThreadPool { /** * 周期性线程池 CPU 数量 * 2 + 1 diff --git a/src/main/java/com/muyu/domain/model/TaskModel.java b/src/main/java/com/muyu/domain/model/TaskModel.java index 09e6c03..e385d12 100644 --- a/src/main/java/com/muyu/domain/model/TaskModel.java +++ b/src/main/java/com/muyu/domain/model/TaskModel.java @@ -31,7 +31,7 @@ public class TaskModel { * 任务是否可以执行 * @return 是否有任务执行 */ - public synchronized boolean isExecution(){ + public boolean isExecution(){ // 为true表示任务在执行 if (unifiedStatus.get()){ // 就算状态为true若执行线程为null或者线程为不活跃也可以执行也可以执行任务 @@ -51,7 +51,7 @@ public class TaskModel { /** * 任务执行总数 */ - private Integer taskExecutionSum; + private Integer taskExecutionSum = 0; /** * 任务执行时间 @@ -64,7 +64,7 @@ public class TaskModel { * @param taskName 任务名称 * @param taskExecutionSum 任务总数 */ - public synchronized void submit(String taskName,Integer taskExecutionSum, Thread task){ + public void submit(String taskName,Integer taskExecutionSum, Thread task){ if (!this.isExecution()){ throw new RuntimeException("["+this.taskName+"]的任务正在进行中,请等待任务执行完成再次发布一键任务"); } @@ -95,7 +95,7 @@ public class TaskModel { /** * 任务成功执行总数 */ - private AtomicInteger taskSuccessSum; + private AtomicInteger taskSuccessSum = new AtomicInteger(); /** * 累计成功 @@ -113,7 +113,7 @@ public class TaskModel { /** * 任务执行失败总数 */ - private AtomicInteger taskErrorSum; + private AtomicInteger taskErrorSum = new AtomicInteger(); /** * 累计失败 diff --git a/src/main/java/com/muyu/service/impl/VechileServiceImpl.java b/src/main/java/com/muyu/service/impl/VechileServiceImpl.java index af09b78..d34fd71 100644 --- a/src/main/java/com/muyu/service/impl/VechileServiceImpl.java +++ b/src/main/java/com/muyu/service/impl/VechileServiceImpl.java @@ -85,6 +85,7 @@ public class VechileServiceImpl extends ServiceImpl impl @Override public void syncDb () { try { +// vehicleInstanceService.isTaskStatus(); log.info("同步数据库开始"); long startTime = System.currentTimeMillis(); Collection vehicleInstanceList = LocalContainer.getOnlineVehicleInstance(); diff --git a/src/main/java/com/muyu/service/impl/VehicleInstanceServiceImpl.java b/src/main/java/com/muyu/service/impl/VehicleInstanceServiceImpl.java index 0bae387..1ec0097 100644 --- a/src/main/java/com/muyu/service/impl/VehicleInstanceServiceImpl.java +++ b/src/main/java/com/muyu/service/impl/VehicleInstanceServiceImpl.java @@ -3,6 +3,7 @@ package com.muyu.service.impl; import com.alibaba.fastjson2.JSONArray; import com.muyu.common.PageList; import com.muyu.common.Result; +import com.muyu.common.pool.FixedThreadPool; import com.muyu.domain.PositionRouteInfo; import com.muyu.domain.Vehicle; import com.muyu.domain.model.PositionModel; @@ -33,7 +34,8 @@ import java.util.Comparator; import java.util.List; import java.util.Random; import java.util.UUID; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.stream.Stream; /** @@ -108,6 +110,7 @@ public class VehicleInstanceServiceImpl implements VehicleInstanceService { */ @Override public void vehicleClientInit (String vin) { + log.info("vin[{}],开始上线", vin); VehicleInstance vehicleInstance = LocalContainer.getVehicleInstance(vin); if (vehicleInstance == null){ throw new RuntimeException("没有【"+vin+"】车辆"); @@ -129,10 +132,12 @@ public class VehicleInstanceServiceImpl implements VehicleInstanceService { .topic(result.getData()) .clientId(vin) .username(connectionReq.getUserName()) - .password(vin+connectionReq.getTimestamp()+connectionReq.getNonce()) + .password(vin + connectionReq.getTimestamp() + connectionReq.getNonce()) .build(); vehicleInstance.setMqttProperties(mqttProperties); vehicleInstance.initClient(); + + log.info("vin[{}],上线成功", vin); } /** @@ -219,30 +224,39 @@ public class VehicleInstanceServiceImpl implements VehicleInstanceService { @Override public void unifiedOnline () { // 获取离线车辆 - List offlineVehicleInstanceList - = LocalContainer.getOfflineVehicleInstance(); - Thread taskThread = new Thread(() -> { - try { - // 筛选出离线车辆并使用并行流进行上线操作 - offlineVehicleInstanceList - .stream() - .parallel() - .map(VehicleInstance::getVin) - .forEach(vin -> { + List vinList + = LocalContainer.getOfflineVehicleInstance().stream().map(VehicleInstance::getVin).toList(); + CountDownLatch countDownLatch = new CountDownLatch(vinList.size()); + // 筛选出离线车辆并使用并行流进行上线操作 + for (String vin : vinList) { + FixedThreadPool.submit( + new Thread(() -> { try { this.vehicleClientInit(vin); taskModel.incrementSuccess(); - }catch (Exception exception){ + } catch (Exception exception) { + exception.printStackTrace(); log.error("车辆上线异常:{}", exception.getMessage()); taskModel.incrementError(); } - }); - } catch (Exception exception) { - log.error("车辆一件上线报错:{}", exception.getMessage(), exception); - } + countDownLatch.countDown(); + }) + ); + } + new Thread(() -> { + boolean await = false; + do { + try { + await = countDownLatch.await(5, TimeUnit.SECONDS); + + log.info("等待一轮训,还剩余:[{}]", countDownLatch.getCount()); + } catch (InterruptedException e) { + log.error("等待异常:{}", e.getMessage(), e); + await = true; + } + } while (!await); taskModel.down(); - }); - taskModel.submit("一键上线", offlineVehicleInstanceList.size(), taskThread); + }, "一键上线").start(); } /** diff --git a/src/main/java/com/muyu/vehicle/VehicleInstance.java b/src/main/java/com/muyu/vehicle/VehicleInstance.java index fb56634..40bb723 100644 --- a/src/main/java/com/muyu/vehicle/VehicleInstance.java +++ b/src/main/java/com/muyu/vehicle/VehicleInstance.java @@ -2,7 +2,7 @@ package com.muyu.vehicle; import com.alibaba.fastjson2.JSONObject; import com.muyu.common.SystemConstant; -import com.muyu.common.ThreadPool; +import com.muyu.common.pool.ScheduledThreadPool; import com.muyu.domain.Vehicle; import com.muyu.domain.model.PositionModel; import com.muyu.utils.CalculateCheckDigit; @@ -195,7 +195,7 @@ public class VehicleInstance { VehicleThread vehicleThread = new VehicleThread(); vehicleThread.setVehicleInstance(this); this.setVehicleThread(vehicleThread); - ScheduledFuture scheduledFuture = ThreadPool.submit(vehicleThread); + ScheduledFuture scheduledFuture = ScheduledThreadPool.submit(vehicleThread); this.setScheduledFuture(scheduledFuture); log.info("初始化车辆上报模拟线程开始:[{}]", this.getVin()); } diff --git a/src/main/java/com/muyu/vehicle/core/VehicleConfiguration.java b/src/main/java/com/muyu/vehicle/core/VehicleConfiguration.java index 59e042d..f565ac7 100644 --- a/src/main/java/com/muyu/vehicle/core/VehicleConfiguration.java +++ b/src/main/java/com/muyu/vehicle/core/VehicleConfiguration.java @@ -1,14 +1,12 @@ package com.muyu.vehicle.core; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; -import com.muyu.common.ThreadPool; +import com.muyu.common.pool.FixedThreadPool; +import com.muyu.common.pool.ScheduledThreadPool; import com.muyu.domain.Vehicle; import com.muyu.service.VehicleInstanceService; import com.muyu.service.VehicleService; import com.muyu.vehicle.VehicleInstance; -import com.muyu.vehicle.model.VehicleData; -import com.muyu.vehicle.model.properties.MqttProperties; -import com.muyu.vehicle.thread.VehicleThread; import lombok.AllArgsConstructor; import lombok.extern.log4j.Log4j2; import org.springframework.boot.ApplicationArguments; @@ -16,8 +14,6 @@ import org.springframework.boot.ApplicationRunner; import org.springframework.context.annotation.Configuration; import javax.annotation.PreDestroy; -import java.math.BigDecimal; -import java.util.Date; import java.util.List; /** @@ -58,7 +54,7 @@ public class VehicleConfiguration implements ApplicationRunner { public void run (ApplicationArguments args) { this.vehiclePageInit(); // 提交给线程池 一分钟 执行一次 - ThreadPool.submit(new Thread(vehicleService::syncDb), 30); +// ThreadPool.submit(new Thread(vehicleService::syncDb), 30); } /** @@ -76,6 +72,8 @@ public class VehicleConfiguration implements ApplicationRunner { onlineVehicleInstanceList.forEach(VehicleInstance::closeClient); log.info("关闭线程池"); - ThreadPool.shutdown(); + ScheduledThreadPool.shutdown(); + FixedThreadPool.shutDown(); + } } diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 3d2c0be..8cb8934 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -59,9 +59,9 @@ mybatis-plus: logging: level: com.muyu: DEBUG - com.muyu.service: INFO - com.muyu.mapper: INFO - com.muyu.vehicle: INFO + com.muyu.service: DEBUG + com.muyu.mapper: DEBUG + com.muyu.vehicle: DEBUG root: INFO org: springframework: