测试代码
parent
a633e1c0a9
commit
ed61bb6973
|
@ -0,0 +1,33 @@
|
||||||
|
package com.muyu.common.pool;
|
||||||
|
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author DongZl
|
||||||
|
* @description: 可控最大并发数线程池
|
||||||
|
* @Date 2023-12-5 下午 01:51
|
||||||
|
*/
|
||||||
|
public class FixedThreadPool {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 可重用固定个数的线程池
|
||||||
|
*/
|
||||||
|
private final static ExecutorService fixedThreadPool = Executors.newFixedThreadPool(1);
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 线程池提交任务
|
||||||
|
* @param runnable 线程
|
||||||
|
*/
|
||||||
|
public static void submit(Runnable runnable){
|
||||||
|
fixedThreadPool.submit(runnable);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 关闭线程池
|
||||||
|
*/
|
||||||
|
public static void shutDown(){
|
||||||
|
fixedThreadPool.shutdown();
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,4 +1,4 @@
|
||||||
package com.muyu.common;
|
package com.muyu.common.pool;
|
||||||
|
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
import java.util.concurrent.ScheduledExecutorService;
|
import java.util.concurrent.ScheduledExecutorService;
|
||||||
|
@ -10,7 +10,7 @@ import java.util.concurrent.TimeUnit;
|
||||||
* @description: 线程池
|
* @description: 线程池
|
||||||
* @Date 2023-11-17 上午 09:16
|
* @Date 2023-11-17 上午 09:16
|
||||||
*/
|
*/
|
||||||
public class ThreadPool {
|
public class ScheduledThreadPool {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 周期性线程池 CPU 数量 * 2 + 1
|
* 周期性线程池 CPU 数量 * 2 + 1
|
|
@ -31,7 +31,7 @@ public class TaskModel {
|
||||||
* 任务是否可以执行
|
* 任务是否可以执行
|
||||||
* @return 是否有任务执行
|
* @return 是否有任务执行
|
||||||
*/
|
*/
|
||||||
public synchronized boolean isExecution(){
|
public boolean isExecution(){
|
||||||
// 为true表示任务在执行
|
// 为true表示任务在执行
|
||||||
if (unifiedStatus.get()){
|
if (unifiedStatus.get()){
|
||||||
// 就算状态为true若执行线程为null或者线程为不活跃也可以执行也可以执行任务
|
// 就算状态为true若执行线程为null或者线程为不活跃也可以执行也可以执行任务
|
||||||
|
@ -51,7 +51,7 @@ public class TaskModel {
|
||||||
/**
|
/**
|
||||||
* 任务执行总数
|
* 任务执行总数
|
||||||
*/
|
*/
|
||||||
private Integer taskExecutionSum;
|
private Integer taskExecutionSum = 0;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 任务执行时间
|
* 任务执行时间
|
||||||
|
@ -64,7 +64,7 @@ public class TaskModel {
|
||||||
* @param taskName 任务名称
|
* @param taskName 任务名称
|
||||||
* @param taskExecutionSum 任务总数
|
* @param taskExecutionSum 任务总数
|
||||||
*/
|
*/
|
||||||
public synchronized void submit(String taskName,Integer taskExecutionSum, Thread task){
|
public void submit(String taskName,Integer taskExecutionSum, Thread task){
|
||||||
if (!this.isExecution()){
|
if (!this.isExecution()){
|
||||||
throw new RuntimeException("["+this.taskName+"]的任务正在进行中,请等待任务执行完成再次发布一键任务");
|
throw new RuntimeException("["+this.taskName+"]的任务正在进行中,请等待任务执行完成再次发布一键任务");
|
||||||
}
|
}
|
||||||
|
@ -95,7 +95,7 @@ public class TaskModel {
|
||||||
/**
|
/**
|
||||||
* 任务成功执行总数
|
* 任务成功执行总数
|
||||||
*/
|
*/
|
||||||
private AtomicInteger taskSuccessSum;
|
private AtomicInteger taskSuccessSum = new AtomicInteger();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 累计成功
|
* 累计成功
|
||||||
|
@ -113,7 +113,7 @@ public class TaskModel {
|
||||||
/**
|
/**
|
||||||
* 任务执行失败总数
|
* 任务执行失败总数
|
||||||
*/
|
*/
|
||||||
private AtomicInteger taskErrorSum;
|
private AtomicInteger taskErrorSum = new AtomicInteger();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 累计失败
|
* 累计失败
|
||||||
|
|
|
@ -85,6 +85,7 @@ public class VechileServiceImpl extends ServiceImpl<VehicleMapper, Vehicle> impl
|
||||||
@Override
|
@Override
|
||||||
public void syncDb () {
|
public void syncDb () {
|
||||||
try {
|
try {
|
||||||
|
// vehicleInstanceService.isTaskStatus();
|
||||||
log.info("同步数据库开始");
|
log.info("同步数据库开始");
|
||||||
long startTime = System.currentTimeMillis();
|
long startTime = System.currentTimeMillis();
|
||||||
Collection<VehicleInstance> vehicleInstanceList = LocalContainer.getOnlineVehicleInstance();
|
Collection<VehicleInstance> vehicleInstanceList = LocalContainer.getOnlineVehicleInstance();
|
||||||
|
|
|
@ -3,6 +3,7 @@ package com.muyu.service.impl;
|
||||||
import com.alibaba.fastjson2.JSONArray;
|
import com.alibaba.fastjson2.JSONArray;
|
||||||
import com.muyu.common.PageList;
|
import com.muyu.common.PageList;
|
||||||
import com.muyu.common.Result;
|
import com.muyu.common.Result;
|
||||||
|
import com.muyu.common.pool.FixedThreadPool;
|
||||||
import com.muyu.domain.PositionRouteInfo;
|
import com.muyu.domain.PositionRouteInfo;
|
||||||
import com.muyu.domain.Vehicle;
|
import com.muyu.domain.Vehicle;
|
||||||
import com.muyu.domain.model.PositionModel;
|
import com.muyu.domain.model.PositionModel;
|
||||||
|
@ -33,7 +34,8 @@ import java.util.Comparator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Random;
|
import java.util.Random;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.stream.Stream;
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -108,6 +110,7 @@ public class VehicleInstanceServiceImpl implements VehicleInstanceService {
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public void vehicleClientInit (String vin) {
|
public void vehicleClientInit (String vin) {
|
||||||
|
log.info("vin[{}],开始上线", vin);
|
||||||
VehicleInstance vehicleInstance = LocalContainer.getVehicleInstance(vin);
|
VehicleInstance vehicleInstance = LocalContainer.getVehicleInstance(vin);
|
||||||
if (vehicleInstance == null){
|
if (vehicleInstance == null){
|
||||||
throw new RuntimeException("没有【"+vin+"】车辆");
|
throw new RuntimeException("没有【"+vin+"】车辆");
|
||||||
|
@ -129,10 +132,12 @@ public class VehicleInstanceServiceImpl implements VehicleInstanceService {
|
||||||
.topic(result.getData())
|
.topic(result.getData())
|
||||||
.clientId(vin)
|
.clientId(vin)
|
||||||
.username(connectionReq.getUserName())
|
.username(connectionReq.getUserName())
|
||||||
.password(vin+connectionReq.getTimestamp()+connectionReq.getNonce())
|
.password(vin + connectionReq.getTimestamp() + connectionReq.getNonce())
|
||||||
.build();
|
.build();
|
||||||
vehicleInstance.setMqttProperties(mqttProperties);
|
vehicleInstance.setMqttProperties(mqttProperties);
|
||||||
vehicleInstance.initClient();
|
vehicleInstance.initClient();
|
||||||
|
|
||||||
|
log.info("vin[{}],上线成功", vin);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -219,30 +224,39 @@ public class VehicleInstanceServiceImpl implements VehicleInstanceService {
|
||||||
@Override
|
@Override
|
||||||
public void unifiedOnline () {
|
public void unifiedOnline () {
|
||||||
// 获取离线车辆
|
// 获取离线车辆
|
||||||
List<VehicleInstance> offlineVehicleInstanceList
|
List<String> vinList
|
||||||
= LocalContainer.getOfflineVehicleInstance();
|
= LocalContainer.getOfflineVehicleInstance().stream().map(VehicleInstance::getVin).toList();
|
||||||
Thread taskThread = new Thread(() -> {
|
CountDownLatch countDownLatch = new CountDownLatch(vinList.size());
|
||||||
try {
|
|
||||||
// 筛选出离线车辆并使用并行流进行上线操作
|
// 筛选出离线车辆并使用并行流进行上线操作
|
||||||
offlineVehicleInstanceList
|
for (String vin : vinList) {
|
||||||
.stream()
|
FixedThreadPool.submit(
|
||||||
.parallel()
|
new Thread(() -> {
|
||||||
.map(VehicleInstance::getVin)
|
|
||||||
.forEach(vin -> {
|
|
||||||
try {
|
try {
|
||||||
this.vehicleClientInit(vin);
|
this.vehicleClientInit(vin);
|
||||||
taskModel.incrementSuccess();
|
taskModel.incrementSuccess();
|
||||||
}catch (Exception exception){
|
} catch (Exception exception) {
|
||||||
|
exception.printStackTrace();
|
||||||
log.error("车辆上线异常:{}", exception.getMessage());
|
log.error("车辆上线异常:{}", exception.getMessage());
|
||||||
taskModel.incrementError();
|
taskModel.incrementError();
|
||||||
}
|
}
|
||||||
});
|
countDownLatch.countDown();
|
||||||
} catch (Exception exception) {
|
})
|
||||||
log.error("车辆一件上线报错:{}", exception.getMessage(), exception);
|
);
|
||||||
}
|
}
|
||||||
|
new Thread(() -> {
|
||||||
|
boolean await = false;
|
||||||
|
do {
|
||||||
|
try {
|
||||||
|
await = countDownLatch.await(5, TimeUnit.SECONDS);
|
||||||
|
|
||||||
|
log.info("等待一轮训,还剩余:[{}]", countDownLatch.getCount());
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
log.error("等待异常:{}", e.getMessage(), e);
|
||||||
|
await = true;
|
||||||
|
}
|
||||||
|
} while (!await);
|
||||||
taskModel.down();
|
taskModel.down();
|
||||||
});
|
}, "一键上线").start();
|
||||||
taskModel.submit("一键上线", offlineVehicleInstanceList.size(), taskThread);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -2,7 +2,7 @@ package com.muyu.vehicle;
|
||||||
|
|
||||||
import com.alibaba.fastjson2.JSONObject;
|
import com.alibaba.fastjson2.JSONObject;
|
||||||
import com.muyu.common.SystemConstant;
|
import com.muyu.common.SystemConstant;
|
||||||
import com.muyu.common.ThreadPool;
|
import com.muyu.common.pool.ScheduledThreadPool;
|
||||||
import com.muyu.domain.Vehicle;
|
import com.muyu.domain.Vehicle;
|
||||||
import com.muyu.domain.model.PositionModel;
|
import com.muyu.domain.model.PositionModel;
|
||||||
import com.muyu.utils.CalculateCheckDigit;
|
import com.muyu.utils.CalculateCheckDigit;
|
||||||
|
@ -195,7 +195,7 @@ public class VehicleInstance {
|
||||||
VehicleThread vehicleThread = new VehicleThread();
|
VehicleThread vehicleThread = new VehicleThread();
|
||||||
vehicleThread.setVehicleInstance(this);
|
vehicleThread.setVehicleInstance(this);
|
||||||
this.setVehicleThread(vehicleThread);
|
this.setVehicleThread(vehicleThread);
|
||||||
ScheduledFuture<?> scheduledFuture = ThreadPool.submit(vehicleThread);
|
ScheduledFuture<?> scheduledFuture = ScheduledThreadPool.submit(vehicleThread);
|
||||||
this.setScheduledFuture(scheduledFuture);
|
this.setScheduledFuture(scheduledFuture);
|
||||||
log.info("初始化车辆上报模拟线程开始:[{}]", this.getVin());
|
log.info("初始化车辆上报模拟线程开始:[{}]", this.getVin());
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,14 +1,12 @@
|
||||||
package com.muyu.vehicle.core;
|
package com.muyu.vehicle.core;
|
||||||
|
|
||||||
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
|
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
|
||||||
import com.muyu.common.ThreadPool;
|
import com.muyu.common.pool.FixedThreadPool;
|
||||||
|
import com.muyu.common.pool.ScheduledThreadPool;
|
||||||
import com.muyu.domain.Vehicle;
|
import com.muyu.domain.Vehicle;
|
||||||
import com.muyu.service.VehicleInstanceService;
|
import com.muyu.service.VehicleInstanceService;
|
||||||
import com.muyu.service.VehicleService;
|
import com.muyu.service.VehicleService;
|
||||||
import com.muyu.vehicle.VehicleInstance;
|
import com.muyu.vehicle.VehicleInstance;
|
||||||
import com.muyu.vehicle.model.VehicleData;
|
|
||||||
import com.muyu.vehicle.model.properties.MqttProperties;
|
|
||||||
import com.muyu.vehicle.thread.VehicleThread;
|
|
||||||
import lombok.AllArgsConstructor;
|
import lombok.AllArgsConstructor;
|
||||||
import lombok.extern.log4j.Log4j2;
|
import lombok.extern.log4j.Log4j2;
|
||||||
import org.springframework.boot.ApplicationArguments;
|
import org.springframework.boot.ApplicationArguments;
|
||||||
|
@ -16,8 +14,6 @@ import org.springframework.boot.ApplicationRunner;
|
||||||
import org.springframework.context.annotation.Configuration;
|
import org.springframework.context.annotation.Configuration;
|
||||||
|
|
||||||
import javax.annotation.PreDestroy;
|
import javax.annotation.PreDestroy;
|
||||||
import java.math.BigDecimal;
|
|
||||||
import java.util.Date;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -58,7 +54,7 @@ public class VehicleConfiguration implements ApplicationRunner {
|
||||||
public void run (ApplicationArguments args) {
|
public void run (ApplicationArguments args) {
|
||||||
this.vehiclePageInit();
|
this.vehiclePageInit();
|
||||||
// 提交给线程池 一分钟 执行一次
|
// 提交给线程池 一分钟 执行一次
|
||||||
ThreadPool.submit(new Thread(vehicleService::syncDb), 30);
|
// ThreadPool.submit(new Thread(vehicleService::syncDb), 30);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -76,6 +72,8 @@ public class VehicleConfiguration implements ApplicationRunner {
|
||||||
onlineVehicleInstanceList.forEach(VehicleInstance::closeClient);
|
onlineVehicleInstanceList.forEach(VehicleInstance::closeClient);
|
||||||
|
|
||||||
log.info("关闭线程池");
|
log.info("关闭线程池");
|
||||||
ThreadPool.shutdown();
|
ScheduledThreadPool.shutdown();
|
||||||
|
FixedThreadPool.shutDown();
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -59,9 +59,9 @@ mybatis-plus:
|
||||||
logging:
|
logging:
|
||||||
level:
|
level:
|
||||||
com.muyu: DEBUG
|
com.muyu: DEBUG
|
||||||
com.muyu.service: INFO
|
com.muyu.service: DEBUG
|
||||||
com.muyu.mapper: INFO
|
com.muyu.mapper: DEBUG
|
||||||
com.muyu.vehicle: INFO
|
com.muyu.vehicle: DEBUG
|
||||||
root: INFO
|
root: INFO
|
||||||
org:
|
org:
|
||||||
springframework:
|
springframework:
|
||||||
|
|
Loading…
Reference in New Issue