增加任务执行逻辑

master
dongzeliang 2023-12-05 09:43:31 +08:00
parent 36834b9bd5
commit a633e1c0a9
5 changed files with 351 additions and 95 deletions

View File

@ -6,6 +6,7 @@ import com.muyu.domain.req.CheckPositionReq;
import com.muyu.domain.req.GearReq;
import com.muyu.domain.req.MsgReq;
import com.muyu.domain.req.VehicleInstanceListReq;
import com.muyu.domain.resp.UnifiedTaskResp;
import com.muyu.domain.resp.VehicleInstanceResp;
import com.muyu.service.VehicleInstanceService;
import com.muyu.vehicle.core.LocalContainer;
@ -14,8 +15,6 @@ import lombok.extern.log4j.Log4j2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import java.util.List;
/**
* @author DongZl
* @description:
@ -162,4 +161,13 @@ public class VehicleInstanceController {
this.vehicleInstanceService.unifiedStop();
return Result.success(null,"已成功发布取消上报任务");
}
/**
*
* @return
*/
@GetMapping("/unified/status")
public Result<UnifiedTaskResp> unifiedStatus(){
return Result.success(this.vehicleInstanceService.unifiedStatus());
}
}

View File

@ -0,0 +1,130 @@
package com.muyu.domain.model;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.extern.log4j.Log4j2;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @author DongZeLiang
* @version 1.0
* @description
* @date 2023/12/4
*/
@Data
@Log4j2
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class TaskModel {
/**
* false
*/
private final AtomicBoolean unifiedStatus = new AtomicBoolean(Boolean.FALSE);
/**
*
* @return
*/
public synchronized boolean isExecution(){
// 为true表示任务在执行
if (unifiedStatus.get()){
// 就算状态为true若执行线程为null或者线程为不活跃也可以执行也可以执行任务
return this.currentThread == null || !this.currentThread.isAlive();
}
return true;
}
/**
* 线
*/
private Thread currentThread ;
/**
*
*/
private String taskName;
/**
*
*/
private Integer taskExecutionSum;
/**
*
*/
private long taskStartTime;
/**
* 线
* @param task 线
* @param taskName
* @param taskExecutionSum
*/
public synchronized void submit(String taskName,Integer taskExecutionSum, Thread task){
if (!this.isExecution()){
throw new RuntimeException("["+this.taskName+"]的任务正在进行中,请等待任务执行完成再次发布一键任务");
}
unifiedStatus.set(Boolean.TRUE);
this.currentThread = task;
this.currentThread.start();
this.taskName = taskName;
this.taskExecutionSum = taskExecutionSum;
this.taskSuccessSum = new AtomicInteger();
this.taskErrorSum = new AtomicInteger();
this.taskStartTime = System.currentTimeMillis();
log.info("[{}]任务执行开始", this.taskName);
}
/**
*
*/
public void down(){
log.info("[{}]任务执行结束,耗时:[{}]MS", this.taskName, System.currentTimeMillis() - taskStartTime);
this.currentThread = null;
this.taskName = null;
this.taskExecutionSum = 0;
this.taskSuccessSum.set(0);
this.taskErrorSum.set(0);
unifiedStatus.set(Boolean.FALSE);
}
/**
*
*/
private AtomicInteger taskSuccessSum;
/**
*
*/
public void incrementSuccess(){
this.taskSuccessSum.incrementAndGet();
}
/**
*
*/
public int getSuccessSum(){
return this.taskSuccessSum.incrementAndGet();
}
/**
*
*/
private AtomicInteger taskErrorSum;
/**
*
*/
public void incrementError(){
this.taskErrorSum.incrementAndGet();
}
/**
*
*/
public int getErrorSum(){
return this.taskErrorSum.incrementAndGet();
}
}

View File

@ -0,0 +1,53 @@
package com.muyu.domain.resp;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @author DongZeLiang
* @version 1.0
* @description
* @date 2023/12/5
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class UnifiedTaskResp {
/**
* false
*/
private boolean unifiedStatus;
/**
*
*/
private String taskName;
/**
*
*/
private Integer taskExecutionSum;
/**
*
*/
private long taskStartTime;
/**
*
*/
private Integer taskSuccessSum;
/**
*
*/
private Integer taskErrorSum;
}

View File

@ -6,6 +6,7 @@ import com.muyu.domain.req.CheckPositionReq;
import com.muyu.domain.req.GearReq;
import com.muyu.domain.req.MsgReq;
import com.muyu.domain.req.VehicleInstanceListReq;
import com.muyu.domain.resp.UnifiedTaskResp;
import com.muyu.domain.resp.VehicleInstanceResp;
/**
@ -96,4 +97,9 @@ public interface VehicleInstanceService {
*/
public void unifiedStop();
/**
*
* @return
*/
UnifiedTaskResp unifiedStatus();
}

View File

@ -6,10 +6,12 @@ import com.muyu.common.Result;
import com.muyu.domain.PositionRouteInfo;
import com.muyu.domain.Vehicle;
import com.muyu.domain.model.PositionModel;
import com.muyu.domain.model.TaskModel;
import com.muyu.domain.req.CheckPositionReq;
import com.muyu.domain.req.GearReq;
import com.muyu.domain.req.MsgReq;
import com.muyu.domain.req.VehicleInstanceListReq;
import com.muyu.domain.resp.UnifiedTaskResp;
import com.muyu.domain.resp.VehicleInstanceResp;
import com.muyu.service.PositionRouteService;
import com.muyu.service.VehicleInstanceService;
@ -120,6 +122,7 @@ public class VehicleInstanceServiceImpl implements VehicleInstanceService {
Result<String> result = clientAdmin.vehicleConnection(connectionReq);
if (result.getCode() != 200){
log.error("车辆:[{}],申请上线异常:[{}]", vin, result.getMsg());
throw new RuntimeException("远程服务器没有【"+vin+"】车辆");
}
MqttProperties mqttProperties = MqttProperties.builder()
.broker(broker)
@ -203,34 +206,43 @@ public class VehicleInstanceServiceImpl implements VehicleInstanceService {
ReflectUtils.invokeSetter(vehicleData, statusKey, statusValue);
}
private final AtomicBoolean unifiedStatus = new AtomicBoolean(Boolean.TRUE);
// private final AtomicBoolean unifiedStatus = new AtomicBoolean(Boolean.TRUE);
/**
*
*/
private final TaskModel taskModel = new TaskModel();
/**
* 线
*/
@Override
public void unifiedOnline () {
if (!unifiedStatus.get()){
throw new RuntimeException("一键执行的任务正在进行中,请勿再次发布一键执行任务");
}
new Thread(() -> {
// 获取离线车辆
List<VehicleInstance> offlineVehicleInstanceList
= LocalContainer.getOfflineVehicleInstance();
Thread taskThread = new Thread(() -> {
try {
unifiedStatus.set(Boolean.FALSE);
try {
// 筛选出离线车辆并使用并行流进行上线操作
LocalContainer.getOfflineVehicleInstance()
.stream()
.parallel()
.map(VehicleInstance::getVin)
.forEach(this::vehicleClientInit);
}catch (Exception exception){
log.error("车辆一件上线报错:{}", exception.getMessage(), exception);
}
unifiedStatus.set(Boolean.TRUE);
}catch (Exception exception){
// 筛选出离线车辆并使用并行流进行上线操作
offlineVehicleInstanceList
.stream()
.parallel()
.map(VehicleInstance::getVin)
.forEach(vin -> {
try {
this.vehicleClientInit(vin);
taskModel.incrementSuccess();
}catch (Exception exception){
log.error("车辆上线异常:{}", exception.getMessage());
taskModel.incrementError();
}
});
} catch (Exception exception) {
log.error("车辆一件上线报错:{}", exception.getMessage(), exception);
}
}).start();
taskModel.down();
});
taskModel.submit("一键上线", offlineVehicleInstanceList.size(), taskThread);
}
/**
@ -238,25 +250,30 @@ public class VehicleInstanceServiceImpl implements VehicleInstanceService {
*/
@Override
public void unifiedOffline () {
if (!unifiedStatus.get()){
throw new RuntimeException("一键执行的任务正在进行中,请勿再次发布一键执行任务");
}
new Thread(() -> {
unifiedStatus.set(Boolean.FALSE);
List<VehicleInstance> onlineVehicleInstanceList = LocalContainer.getOnlineVehicleInstance();
Thread taskThread = new Thread(() -> {
try {
// 筛选出在线车辆使用并行流操作先停止车辆上报动作再进行车辆离线操作
LocalContainer.getOnlineVehicleInstance()
onlineVehicleInstanceList
.stream()
.parallel()
.forEach(vehicleInstance -> {
vehicleInstance.stopSend();
vehicleInstance.closeClient();
try {
vehicleInstance.stopSend();
vehicleInstance.closeClient();
taskModel.incrementSuccess();
}catch (Exception exception){
log.error("车辆离线异常:{}", exception.getMessage());
taskModel.incrementError();
}
});
}catch (Exception exception){
} catch (Exception exception) {
log.error("车辆一键离线报错:{}", exception.getMessage(), exception);
}
unifiedStatus.set(Boolean.TRUE);
}).start();
taskModel.down();
});
taskModel.submit("一键离线", onlineVehicleInstanceList.size(), taskThread);
}
/**
@ -264,36 +281,54 @@ public class VehicleInstanceServiceImpl implements VehicleInstanceService {
*/
@Override
public void unifiedSend () {
if (!unifiedStatus.get()){
throw new RuntimeException("一键执行的任务正在进行中,请勿再次发布一键执行任务");
}
List<VehicleInstance> vehicleInstanceList = LocalContainer.getOnlineVehicleInstance();
if (vehicleInstanceList.isEmpty()){
throw new RuntimeException("还没有车辆上线,请先让车辆上线");
throw new RuntimeException("还没有车辆连接到服务器,请先让车辆上线");
}
new Thread(() -> {
// 获取到所有路径
List<PositionRouteInfo> positionRouteInfoList = positionRouteService.list();
// 路径长度
int positionSize = positionRouteInfoList.size();
// 随机数
Random random = new Random();
Thread taskThread = new Thread(() -> {
try {
unifiedStatus.set(Boolean.FALSE);
// 先一键重置路径
this.unifiedPositionPri ();
vehicleInstanceList
.stream()
.parallel()
.forEach(vehicleInstance -> {
// 设置车辆档位
vehicleInstance.setGear("D");
// 开启线程进行上报
if(vehicleInstance.getVehicleThread() == null){
vehicleInstance.initVehicleThread();
try {
// 随机一个路径结果
int positionIndex = random.nextInt(0, positionSize);
PositionRouteInfo positionRouteInfo = positionRouteInfoList.get(positionIndex);
String positionCode = positionRouteInfo.getName();
List<PositionModel> positionModelList = JSONArray.parseArray(positionRouteInfo.getRouteData(), String.class)
.stream()
.map(PositionModel::strBuild)
.toList();
// 设置车辆路径
vehicleInstance.settingPosition(positionModelList);
vehicleInstance.setPositionCode(positionCode);
// 设置车辆档位
vehicleInstance.setGear("D");
// 开启线程进行上报
if (vehicleInstance.getVehicleThread() == null) {
vehicleInstance.initVehicleThread();
}
vehicleInstance.startSend();
taskModel.incrementSuccess();
}catch (Exception exception){
log.info("车辆设置一键上报失败:{}", exception.getMessage());
taskModel.incrementError();
}
vehicleInstance.startSend();
});
}catch (Exception exception){
} catch (Exception exception) {
log.error("车辆一键上报报错:{}", exception.getMessage(), exception);
}
unifiedStatus.set(Boolean.TRUE);
}).start();
taskModel.down();
});
taskModel.submit("一键上报", vehicleInstanceList.size(),taskThread);
}
@ -302,44 +337,44 @@ public class VehicleInstanceServiceImpl implements VehicleInstanceService {
*/
@Override
public void unifiedPosition () {
if (!unifiedStatus.get()){
throw new RuntimeException("一键执行的任务正在进行中,请勿再次发布一键执行任务");
}
new Thread(() -> {
unifiedStatus.set(Boolean.FALSE);
List<VehicleInstance> vehicleInstanceList = LocalContainer.getOnlineVehicleInstance();
Thread taskThread = new Thread(() -> {
try {
this.unifiedPositionPri();
}catch (Exception exception){
// 获取到所有路径
List<PositionRouteInfo> positionRouteInfoList = positionRouteService.list();
// 路径长度
int positionSize = positionRouteInfoList.size();
// 随机数
Random random = new Random();
vehicleInstanceList
.stream()
.parallel()
.forEach(vehicleInstance -> {
try {
// 随机一个路径结果
int positionIndex = random.nextInt(0, positionSize);
PositionRouteInfo positionRouteInfo = positionRouteInfoList.get(positionIndex);
String positionCode = positionRouteInfo.getName();
List<PositionModel> positionModelList = JSONArray.parseArray(positionRouteInfo.getRouteData(), String.class)
.stream()
.map(PositionModel::strBuild)
.toList();
// 设置车辆路径
vehicleInstance.settingPosition(positionModelList);
vehicleInstance.setPositionCode(positionCode);
taskModel.incrementSuccess();
} catch (Exception exception) {
log.error("车辆设置路线异常:[{}]", exception.getMessage());
taskModel.incrementError();
}
});
} catch (Exception exception) {
log.error("车辆一键重置路径报错:{}", exception.getMessage(), exception);
}
unifiedStatus.set(Boolean.TRUE);
}).start();
}
private void unifiedPositionPri(){
// 获取到所有路径
List<PositionRouteInfo> positionRouteInfoList = positionRouteService.list();
// 路径长度
int positionSize = positionRouteInfoList.size();
// 随机数
Random random = new Random();
LocalContainer.getOnlineVehicleInstance()
.stream()
.parallel()
.forEach(vehicleInstance -> {
// 随机一个路径结果
int positionIndex = random.nextInt(0, positionSize);
PositionRouteInfo positionRouteInfo = positionRouteInfoList.get(positionIndex);
String positionCode = positionRouteInfo.getName();
List<PositionModel> positionModelList = JSONArray.parseArray(positionRouteInfo.getRouteData(), String.class)
.stream()
.map(PositionModel::strBuild)
.toList();
// 设置车辆路径
vehicleInstance.settingPosition(positionModelList);
vehicleInstance.setPositionCode(positionCode);
});
taskModel.down();
});
taskModel.submit("一键重置路径",vehicleInstanceList.size(), taskThread);
}
/**
@ -347,22 +382,46 @@ public class VehicleInstanceServiceImpl implements VehicleInstanceService {
*/
@Override
public void unifiedStop () {
if (!unifiedStatus.get()){
throw new RuntimeException("一键执行的任务正在进行中,请勿再次发布一键执行任务");
}
new Thread(() -> {
List<VehicleInstance> onlineVehicleInstanceList = LocalContainer.getOnlineVehicleInstance();
Thread taskThread = new Thread(() -> {
try {
unifiedStatus.set(Boolean.FALSE);
LocalContainer.getOnlineVehicleInstance()
.stream()
.parallel()
.forEach(VehicleInstance::stopSend);
unifiedStatus.set(Boolean.TRUE);
}catch (Exception exception){
.forEach(vehicleInstance -> {
try {
vehicleInstance.stopSend();
taskModel.incrementSuccess();
}catch (Exception exception){
log.info("车辆一键取消上报发生错误:{}", exception.getMessage());
taskModel.incrementError();
}
});
taskModel.down();
} catch (Exception exception) {
log.error("车辆一键取消上报报错:{}", exception.getMessage(), exception);
}
}).start();
});
taskModel.submit("一键取消上报", onlineVehicleInstanceList.size(), taskThread);
}
/**
*
*
* @return
*/
@Override
public UnifiedTaskResp unifiedStatus() {
boolean unifiedStatus = this.taskModel.getUnifiedStatus().get();
return UnifiedTaskResp.builder()
.unifiedStatus(unifiedStatus)
.taskErrorSum(this.taskModel.getErrorSum())
.taskExecutionSum(this.taskModel.getTaskExecutionSum())
.taskSuccessSum(this.taskModel.getSuccessSum())
.taskName(this.taskModel.getTaskName())
.taskStartTime(System.currentTimeMillis() - this.taskModel.getTaskStartTime())
.build();
}
}