一键上线
parent
eb1937a27b
commit
5c2fbb91dd
|
@ -1,3 +1,4 @@
|
||||||
|
<?xml version="1.0" encoding="UTF-8"?>
|
||||||
<project version="4">
|
<project version="4">
|
||||||
<component name="ExternalStorageConfigurationManager" enabled="true" />
|
<component name="ExternalStorageConfigurationManager" enabled="true" />
|
||||||
<component name="MavenProjectsManager">
|
<component name="MavenProjectsManager">
|
||||||
|
@ -7,7 +8,7 @@
|
||||||
</list>
|
</list>
|
||||||
</option>
|
</option>
|
||||||
</component>
|
</component>
|
||||||
<component name="ProjectRootManager" version="2" languageLevel="JDK_17" default="true" project-jdk-name="17" project-jdk-type="JavaSDK">
|
<component name="ProjectRootManager" version="2" languageLevel="JDK_17" project-jdk-name="17" project-jdk-type="JavaSDK">
|
||||||
<output url="file://$PROJECT_DIR$/out" />
|
<output url="file://$PROJECT_DIR$/out" />
|
||||||
</component>
|
</component>
|
||||||
</project>
|
</project>
|
|
@ -14,15 +14,15 @@ public class FixedThreadPool {
|
||||||
/**
|
/**
|
||||||
* 可重用固定个数的线程池
|
* 可重用固定个数的线程池
|
||||||
*/
|
*/
|
||||||
private final static ExecutorService fixedThreadPool = Executors.newFixedThreadPool(1);
|
private final static ExecutorService fixedThreadPool = Executors.newFixedThreadPool(15);
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 线程池提交任务
|
* 线程池提交任务
|
||||||
* @param runnable 线程
|
* @param thread 线程
|
||||||
*/
|
*/
|
||||||
public static Future<?> submit(Runnable runnable){
|
public static Future<?> submit(Thread thread){
|
||||||
return fixedThreadPool.submit(runnable);
|
return fixedThreadPool.submit(thread);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -15,7 +15,8 @@ public class ScheduledThreadPool {
|
||||||
/**
|
/**
|
||||||
* 周期性线程池 CPU 数量 * 2 + 1
|
* 周期性线程池 CPU 数量 * 2 + 1
|
||||||
*/
|
*/
|
||||||
private static final ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors() * 2 + 1);
|
private static final ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(
|
||||||
|
Runtime.getRuntime().availableProcessors() * 2 + 1);
|
||||||
|
|
||||||
public static ScheduledFuture<?> submit (Runnable thread){
|
public static ScheduledFuture<?> submit (Runnable thread){
|
||||||
// 参数分别是: 任务, 多久后开始执行, 每隔多久执行一次(周期),时间单位
|
// 参数分别是: 任务, 多久后开始执行, 每隔多久执行一次(周期),时间单位
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package com.muyu.service.impl;
|
package com.muyu.service.impl;
|
||||||
|
|
||||||
import com.alibaba.fastjson2.JSONArray;
|
import com.alibaba.fastjson2.JSONArray;
|
||||||
|
import com.alibaba.fastjson2.JSONObject;
|
||||||
import com.muyu.common.pool.FixedThreadPool;
|
import com.muyu.common.pool.FixedThreadPool;
|
||||||
import com.muyu.domain.PositionRouteInfo;
|
import com.muyu.domain.PositionRouteInfo;
|
||||||
import com.muyu.domain.model.PositionModel;
|
import com.muyu.domain.model.PositionModel;
|
||||||
|
@ -16,9 +17,10 @@ import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
import java.util.Random;
|
import java.util.Random;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.Future;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -48,62 +50,51 @@ public class VehicleUnifiedServiceImpl implements VehicleUnifiedService {
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public void unifiedOnline () {
|
public void unifiedOnline () {
|
||||||
// 获取离线车辆
|
// 获取离线车辆VIN
|
||||||
List<String> vinList
|
List<String> vinList = LocalContainer.getOfflineVehicleInstance()
|
||||||
= LocalContainer.getOfflineVehicleInstance().stream().map(VehicleInstance::getVin).toList();
|
.stream()
|
||||||
|
.map(VehicleInstance::getVin)
|
||||||
// 获取执行多少次 执行大小 执行页码
|
.toList();
|
||||||
int vinSize = vinList.size(), executionSize = 10;
|
|
||||||
// 总执行次数
|
|
||||||
int executionSum = vinSize/executionSize + vinSize % executionSize == 0 ? 0 : 1;
|
|
||||||
|
|
||||||
new Thread(() -> {
|
new Thread(() -> {
|
||||||
// 分页进行业务操作
|
int vinSize = 0, executionSize = 15;
|
||||||
for (int page = 0; page < executionSum; page++) {
|
do {
|
||||||
// 是否继续等待
|
int startIndex = vinSize++ * executionSize;
|
||||||
boolean await;
|
|
||||||
// 等待次数/最大等待次数
|
|
||||||
int waitSize = 0, waitMaxSize = 3;
|
|
||||||
CountDownLatch countDownLatch = new CountDownLatch(vinList.size());
|
|
||||||
// 进行分页开启车辆
|
// 进行分页开启车辆
|
||||||
List<String> executionVinList = vinList.stream()
|
List<String> executionVinList = vinList.stream()
|
||||||
.limit(page * executionSize)
|
.skip(startIndex)
|
||||||
.skip(executionSize)
|
.limit(executionSize)
|
||||||
.toList();
|
.toList();
|
||||||
do {
|
CountDownLatch countDownLatch = new CountDownLatch(executionVinList.size());
|
||||||
try {
|
Map<String, Thread> startVehicleThread = new ConcurrentHashMap<>();
|
||||||
await = countDownLatch.await(5, TimeUnit.SECONDS);
|
executionVinList.forEach(vin -> {
|
||||||
|
Thread thread = new Thread(() -> {
|
||||||
log.info("等待一轮训,还剩余:[{}]", countDownLatch.getCount());
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
log.error("等待异常:{}", e.getMessage(), e);
|
|
||||||
await = true;
|
|
||||||
}
|
|
||||||
// 等待countdown或者等待轮训超过三次结束
|
|
||||||
} while (!await || waitSize++ < waitMaxSize);
|
|
||||||
}
|
|
||||||
|
|
||||||
taskModel.down();
|
|
||||||
}, "一键上线").start();
|
|
||||||
|
|
||||||
|
|
||||||
// 筛选出离线车辆并使用并行流进行上线操作
|
|
||||||
/*for (String vin : vinList) {
|
|
||||||
Future<?> submitFuture = FixedThreadPool.submit(
|
|
||||||
new Thread(() -> {
|
|
||||||
try {
|
try {
|
||||||
vehicleInstanceService.vehicleClientInit(vin);
|
vehicleInstanceService.vehicleClientInit(vin);
|
||||||
taskModel.incrementSuccess();
|
startVehicleThread.remove(vin);
|
||||||
} catch (Exception exception) {
|
}catch (Exception interruptedException){
|
||||||
log.error("车辆上线异常:{}", exception.getMessage(), exception);
|
log.error(interruptedException);
|
||||||
taskModel.incrementError();
|
|
||||||
}
|
}
|
||||||
countDownLatch.countDown();
|
});
|
||||||
})
|
startVehicleThread.put(vin, thread);
|
||||||
);
|
FixedThreadPool.submit(thread);
|
||||||
}*/
|
});
|
||||||
|
|
||||||
|
|
||||||
|
try {
|
||||||
|
boolean await = countDownLatch.await(5, TimeUnit.SECONDS);
|
||||||
|
log.info(
|
||||||
|
"开始:[{}], 结束:[{}],未上线成功:[{}], vin:[{}]",
|
||||||
|
startIndex, startIndex+executionVinList.size(),
|
||||||
|
startVehicleThread.size(),
|
||||||
|
JSONObject.toJSONString(executionVinList));
|
||||||
|
if (!await){
|
||||||
|
startVehicleThread.values().forEach(Thread::interrupt);
|
||||||
|
}
|
||||||
|
} catch (InterruptedException ignored) {}
|
||||||
|
if (executionVinList.size() < executionSize){
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}while (true);
|
||||||
|
}).start();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -112,7 +103,7 @@ public class VehicleUnifiedServiceImpl implements VehicleUnifiedService {
|
||||||
@Override
|
@Override
|
||||||
public void unifiedOffline () {
|
public void unifiedOffline () {
|
||||||
List<VehicleInstance> onlineVehicleInstanceList = LocalContainer.getOnlineVehicleInstance();
|
List<VehicleInstance> onlineVehicleInstanceList = LocalContainer.getOnlineVehicleInstance();
|
||||||
Thread taskThread = new Thread(() -> {
|
new Thread(() -> {
|
||||||
try {
|
try {
|
||||||
// 筛选出在线车辆使用并行流操作先停止车辆上报动作再进行车辆离线操作
|
// 筛选出在线车辆使用并行流操作先停止车辆上报动作再进行车辆离线操作
|
||||||
onlineVehicleInstanceList
|
onlineVehicleInstanceList
|
||||||
|
|
Loading…
Reference in New Issue