fix():重写一键处理功能

v1.0
DongZeLiang 2024-06-11 19:24:40 +08:00
parent e1e2555424
commit 3a0310d943
3 changed files with 37 additions and 205 deletions

View File

@ -6,7 +6,9 @@ import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.extern.log4j.Log4j2;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@ -25,13 +27,25 @@ public class TaskModel {
/**
* false
* false
* true
*/
private final AtomicBoolean unifiedStatus = new AtomicBoolean(Boolean.FALSE);
/**
*
*/
private CountDownLatch countDownLatch ;
/**
*
*/
private LinkedBlockingQueue<String> vehicleTaskQueue = new LinkedBlockingQueue<>();
/**
*
* true -
* false-
* @return
*/
public boolean isExecution(){
@ -52,19 +66,36 @@ public class TaskModel {
*/
private long taskStartTime;
/**
*
*/
private AtomicInteger taskSuccessSum = new AtomicInteger();
/**
*
*/
private AtomicInteger taskErrorSum = new AtomicInteger();
/**
* 线
* @param taskName
* @param taskExecutionSum
* @param vehicleVinList VIN
*/
public void submit(String taskName,Integer taskExecutionSum){
public void submit(String taskName, List<String> vehicleVinList){
if (!this.isExecution()){
throw new RuntimeException("["+this.taskName+"]的任务正在进行中,请等待任务执行完成再次发布一键任务");
}
if (vehicleVinList.isEmpty()){
throw new RuntimeException("无需执行");
}
unifiedStatus.set(Boolean.TRUE);
this.taskName = taskName;
this.countDownLatch = new CountDownLatch(taskExecutionSum);
this.taskExecutionSum = taskExecutionSum;
this.countDownLatch = new CountDownLatch(vehicleVinList.size());
this.vehicleTaskQueue.addAll(vehicleVinList);
this.taskExecutionSum = vehicleVinList.size();
this.taskSuccessSum = new AtomicInteger();
this.taskErrorSum = new AtomicInteger();
this.taskStartTime = System.currentTimeMillis();
@ -83,11 +114,6 @@ public class TaskModel {
unifiedStatus.set(Boolean.FALSE);
}
/**
*
*/
private AtomicInteger taskSuccessSum = new AtomicInteger();
/**
*
*/
@ -98,14 +124,9 @@ public class TaskModel {
*
*/
public int getSuccessSum(){
return this.taskSuccessSum.incrementAndGet();
return this.taskSuccessSum.get();
}
/**
*
*/
private AtomicInteger taskErrorSum = new AtomicInteger();
/**
*
*/
@ -116,6 +137,6 @@ public class TaskModel {
*
*/
public int getErrorSum(){
return this.taskErrorSum.incrementAndGet();
return this.taskErrorSum.get();
}
}

View File

@ -1,28 +1,14 @@
package com.muyu.web.service.impl;
import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import com.muyu.web.common.pool.FixedThreadPool;
import com.muyu.web.domain.PositionRouteInfo;
import com.muyu.web.domain.model.PositionModel;
import com.muyu.web.domain.model.TaskModel;
import com.muyu.web.domain.resp.UnifiedTaskResp;
import com.muyu.web.service.PositionRouteService;
import com.muyu.web.service.VehicleInstanceService;
import com.muyu.web.service.VehicleUnifiedService;
import com.muyu.vehicle.VehicleInstance;
import com.muyu.vehicle.core.LocalContainer;
import lombok.extern.log4j.Log4j2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
* @author DongZeLiang
* @version 1.0
@ -50,53 +36,7 @@ public class VehicleUnifiedServiceImpl implements VehicleUnifiedService {
*/
@Override
public void unifiedOnline () {
// 获取离线车辆VIN
List<String> vinList = LocalContainer.getOfflineVehicleInstance()
.stream()
.map(VehicleInstance::getVin)
.toList();
taskModel.submit("一键上线", vinList.size());
new Thread(() -> {
int vinSize = 0, executionSize = 15;
do {
int startIndex = vinSize++ * executionSize;
// 进行分页开启车辆
List<String> executionVinList = vinList.stream()
.skip(startIndex)
.limit(executionSize)
.toList();
CountDownLatch countDownLatch = new CountDownLatch(executionVinList.size());
Map<String, Thread> startVehicleThread = new ConcurrentHashMap<>();
executionVinList.forEach(vin -> {
Thread thread = new Thread(() -> {
try {
vehicleInstanceService.vehicleClientInit(vin);
startVehicleThread.remove(vin);
}catch (Exception interruptedException){
log.error(interruptedException);
}
});
startVehicleThread.put(vin, thread);
FixedThreadPool.submit(thread);
});
try {
boolean await = countDownLatch.await(5, TimeUnit.SECONDS);
log.info(
"开始:[{}] 结束:[{}],未上线成功:[{}], vin[{}]",
startIndex, startIndex+executionVinList.size(),
startVehicleThread.size(),
JSONObject.toJSONString(executionVinList));
if (!await){
startVehicleThread.values().forEach(Thread::interrupt);
}
} catch (InterruptedException ignored) {}
if (executionVinList.size() < executionSize){
break;
}
}while (true);
taskModel.down();
}).start();
}
/**
@ -104,29 +44,6 @@ public class VehicleUnifiedServiceImpl implements VehicleUnifiedService {
*/
@Override
public void unifiedOffline () {
List<VehicleInstance> onlineVehicleInstanceList = LocalContainer.getOnlineVehicleInstance();
new Thread(() -> {
try {
// 筛选出在线车辆使用并行流操作先停止车辆上报动作再进行车辆离线操作
onlineVehicleInstanceList
.stream()
.parallel()
.forEach(vehicleInstance -> {
try {
vehicleInstance.stopSend();
vehicleInstance.closeClient();
taskModel.incrementSuccess();
}catch (Exception exception){
log.error("车辆离线异常:{}", exception.getMessage());
taskModel.incrementError();
}
});
} catch (Exception exception) {
log.error("车辆一键离线报错:{}", exception.getMessage(), exception);
}
taskModel.down();
});
// taskModel.submit("一键离线", onlineVehicleInstanceList.size(), taskThread);
}
@ -135,54 +52,6 @@ public class VehicleUnifiedServiceImpl implements VehicleUnifiedService {
*/
@Override
public void unifiedSend () {
List<VehicleInstance> vehicleInstanceList = LocalContainer.getOnlineVehicleInstance();
if (vehicleInstanceList.isEmpty()){
throw new RuntimeException("还没有车辆连接到服务器,请先让车辆上线");
}
// 获取到所有路径
List<PositionRouteInfo> positionRouteInfoList = positionRouteService.list();
// 路径长度
int positionSize = positionRouteInfoList.size();
// 随机数
Random random = new Random();
Thread taskThread = new Thread(() -> {
try {
vehicleInstanceList
.stream()
.parallel()
.forEach(vehicleInstance -> {
try {
// 随机一个路径结果
int positionIndex = random.nextInt(0, positionSize);
PositionRouteInfo positionRouteInfo = positionRouteInfoList.get(positionIndex);
String positionCode = positionRouteInfo.getName();
List<PositionModel> positionModelList = JSONArray.parseArray(positionRouteInfo.getRouteData(), String.class)
.stream()
.map(PositionModel::strBuild)
.toList();
// 设置车辆路径
vehicleInstance.settingPosition(positionModelList);
vehicleInstance.setPositionCode(positionCode);
// 设置车辆档位
vehicleInstance.setGear("D");
// 开启线程进行上报
if (vehicleInstance.getVehicleThread() == null) {
vehicleInstance.initVehicleThread();
}
vehicleInstance.startSend();
taskModel.incrementSuccess();
}catch (Exception exception){
log.info("车辆设置一键上报失败:{}", exception.getMessage());
taskModel.incrementError();
}
});
} catch (Exception exception) {
log.error("车辆一键上报报错:{}", exception.getMessage(), exception);
}
taskModel.down();
});
// taskModel.submit("一键上报", vehicleInstanceList.size(),taskThread);
}
@ -191,44 +60,7 @@ public class VehicleUnifiedServiceImpl implements VehicleUnifiedService {
*/
@Override
public void unifiedPosition () {
List<VehicleInstance> vehicleInstanceList = LocalContainer.getOnlineVehicleInstance();
Thread taskThread = new Thread(() -> {
try {
// 获取到所有路径
List<PositionRouteInfo> positionRouteInfoList = positionRouteService.list();
// 路径长度
int positionSize = positionRouteInfoList.size();
// 随机数
Random random = new Random();
vehicleInstanceList
.stream()
.parallel()
.forEach(vehicleInstance -> {
try {
// 随机一个路径结果
int positionIndex = random.nextInt(0, positionSize);
PositionRouteInfo positionRouteInfo = positionRouteInfoList.get(positionIndex);
String positionCode = positionRouteInfo.getName();
List<PositionModel> positionModelList = JSONArray.parseArray(positionRouteInfo.getRouteData(), String.class)
.stream()
.map(PositionModel::strBuild)
.toList();
// 设置车辆路径
vehicleInstance.settingPosition(positionModelList);
vehicleInstance.setPositionCode(positionCode);
taskModel.incrementSuccess();
} catch (Exception exception) {
log.error("车辆设置路线异常:[{}]", exception.getMessage());
taskModel.incrementError();
}
});
} catch (Exception exception) {
log.error("车辆一键重置路径报错:{}", exception.getMessage(), exception);
}
taskModel.down();
});
// taskModel.submit("一键重置路径",vehicleInstanceList.size(), taskThread);
}
/**
@ -236,28 +68,7 @@ public class VehicleUnifiedServiceImpl implements VehicleUnifiedService {
*/
@Override
public void unifiedStop () {
List<VehicleInstance> onlineVehicleInstanceList = LocalContainer.getOnlineVehicleInstance();
Thread taskThread = new Thread(() -> {
try {
LocalContainer.getOnlineVehicleInstance()
.stream()
.parallel()
.forEach(vehicleInstance -> {
try {
vehicleInstance.stopSend();
taskModel.incrementSuccess();
}catch (Exception exception){
log.info("车辆一键取消上报发生错误:{}", exception.getMessage());
taskModel.incrementError();
}
});
taskModel.down();
} catch (Exception exception) {
log.error("车辆一键取消上报报错:{}", exception.getMessage(), exception);
}
});
// taskModel.submit("一键取消上报", onlineVehicleInstanceList.size(), taskThread);
}
/**