业务结构拆解

new-master
dongzeliang 2023-12-06 15:35:59 +08:00
parent 0cf9ccb431
commit 01eb776215
13 changed files with 450 additions and 329 deletions

View File

@ -39,7 +39,7 @@ public class VehicleController {
}
/**
*
* @param sum
* @param vehicleCreateAddReq
* @return
*/
@PostMapping("/create")
@ -49,4 +49,14 @@ public class VehicleController {
}
/**
*
* @param vin
* @return
*/
@DeleteMapping("/{vin}")
public Result<String> delete(@PathVariable("vin") String vin){
this.vehicleService.delete(vin);
return Result.success(null,"删除成功");
}
}

View File

@ -116,58 +116,4 @@ public class VehicleInstanceController {
this.vehicleInstanceService.editStatus(vin, statusKey, statusValue);
return Result.success();
}
/**
* 线
*/
@PostMapping("/unified/online")
public Result<String> unifiedOnline(){
this.vehicleInstanceService.unifiedOnline();
return Result.success(null,"已成功发布一键上线任务");
}
/**
* 线
*/
@PostMapping("/unified/offline")
public Result<String> unifiedOffline(){
this.vehicleInstanceService.unifiedOffline();
return Result.success(null,"已成功发布一键离线任务");
}
/**
*
*/
@PostMapping("/unified/send")
public Result<String> unifiedSend(){
this.vehicleInstanceService.unifiedSend();
return Result.success(null,"已成功发布一键上报任务");
}
/**
*
*/
@PostMapping("/unified/position")
public Result<String> unifiedPosition(){
this.vehicleInstanceService.unifiedPosition();
return Result.success(null,"已成功发布一键上报任务");
}
/**
*
*/
@PostMapping("/unified/stop")
public Result<String> unifiedStop(){
this.vehicleInstanceService.unifiedStop();
return Result.success(null,"已成功发布取消上报任务");
}
/**
*
* @return
*/
@GetMapping("/unified/status")
public Result<UnifiedTaskResp> unifiedStatus(){
return Result.success(this.vehicleInstanceService.unifiedStatus());
}
}

View File

@ -0,0 +1,80 @@
package com.muyu.controller;
import com.muyu.common.Result;
import com.muyu.domain.resp.UnifiedTaskResp;
import com.muyu.service.VehicleUnifiedService;
import lombok.extern.log4j.Log4j2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @author DongZeLiang
* @version 1.0
* @description
* @date 2023/12/6
*/
@Log4j2
@RestController
@RequestMapping("/vehicle/unified")
public class VehicleUnifiedController {
@Autowired
private VehicleUnifiedService vehicleUnifiedService;
/**
* 线
*/
@PostMapping("/online")
public Result<String> unifiedOnline(){
this.vehicleUnifiedService.unifiedOnline();
return Result.success(null,"已成功发布一键上线任务");
}
/**
* 线
*/
@PostMapping("/offline")
public Result<String> unifiedOffline(){
this.vehicleUnifiedService.unifiedOffline();
return Result.success(null,"已成功发布一键离线任务");
}
/**
*
*/
@PostMapping("/send")
public Result<String> unifiedSend(){
this.vehicleUnifiedService.unifiedSend();
return Result.success(null,"已成功发布一键上报任务");
}
/**
*
*/
@PostMapping("/position")
public Result<String> unifiedPosition(){
this.vehicleUnifiedService.unifiedPosition();
return Result.success(null,"已成功发布一键上报任务");
}
/**
*
*/
@PostMapping("/stop")
public Result<String> unifiedStop(){
this.vehicleUnifiedService.unifiedStop();
return Result.success(null,"已成功发布取消上报任务");
}
/**
*
* @return
*/
@GetMapping("/status")
public Result<UnifiedTaskResp> unifiedStatus(){
return Result.success(this.vehicleUnifiedService.unifiedStatus());
}
}

View File

@ -6,6 +6,7 @@ import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.extern.log4j.Log4j2;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@ -27,23 +28,16 @@ public class TaskModel {
*/
private final AtomicBoolean unifiedStatus = new AtomicBoolean(Boolean.FALSE);
private CountDownLatch countDownLatch ;
/**
*
* @return
*/
public boolean isExecution(){
// 为true表示任务在执行
if (unifiedStatus.get()){
// 就算状态为true若执行线程为null或者线程为不活跃也可以执行也可以执行任务
return this.currentThread == null || !this.currentThread.isAlive();
}
return true;
return !unifiedStatus.get();
}
/**
* 线
*/
private Thread currentThread ;
/**
*
*/
@ -60,18 +54,16 @@ public class TaskModel {
/**
* 线
* @param task 线
* @param taskName
* @param taskExecutionSum
*/
public void submit(String taskName,Integer taskExecutionSum, Thread task){
public void submit(String taskName,Integer taskExecutionSum){
if (!this.isExecution()){
throw new RuntimeException("["+this.taskName+"]的任务正在进行中,请等待任务执行完成再次发布一键任务");
}
unifiedStatus.set(Boolean.TRUE);
this.currentThread = task;
this.currentThread.start();
this.taskName = taskName;
this.countDownLatch = new CountDownLatch(taskExecutionSum);
this.taskExecutionSum = taskExecutionSum;
this.taskSuccessSum = new AtomicInteger();
this.taskErrorSum = new AtomicInteger();
@ -84,7 +76,6 @@ public class TaskModel {
*/
public void down(){
log.info("[{}]任务执行结束,耗时:[{}]MS", this.taskName, System.currentTimeMillis() - taskStartTime);
this.currentThread = null;
this.taskName = null;
this.taskExecutionSum = 0;
this.taskSuccessSum.set(0);

View File

@ -5,9 +5,6 @@ import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @author DongZeLiang
* @version 1.0

View File

@ -71,35 +71,4 @@ public interface VehicleInstanceService {
*/
void editStatus (String vin, String statusKey, Integer statusValue);
/**
* 线
*/
public void unifiedOnline();
/**
* 线
*/
public void unifiedOffline();
/**
*
*/
public void unifiedSend();
/**
*
*/
void unifiedPosition ();
/**
*
*/
public void unifiedStop();
/**
*
* @return
*/
UnifiedTaskResp unifiedStatus();
}

View File

@ -29,4 +29,10 @@ public interface VehicleService extends IService<Vehicle> {
*
*/
void syncDb();
/**
* VIN
* @param vin vin
*/
void delete(String vin);
}

View File

@ -0,0 +1,45 @@
package com.muyu.service;
import com.muyu.domain.resp.UnifiedTaskResp;
/**
* @author DongZeLiang
* @version 1.0
* @description
* @date 2023/12/6
*/
public interface VehicleUnifiedService {
/**
* 线
*/
public void unifiedOnline();
/**
* 线
*/
public void unifiedOffline();
/**
*
*/
public void unifiedSend();
/**
*
*/
void unifiedPosition ();
/**
*
*/
public void unifiedStop();
/**
*
* @return
*/
UnifiedTaskResp unifiedStatus();
}

View File

@ -115,5 +115,28 @@ public class VechileServiceImpl extends ServiceImpl<VehicleMapper, Vehicle> impl
}
}
/**
* VIN
*
* @param vin vin
*/
@Override
public void delete(String vin) {
VehicleInstance vehicleInstance = LocalContainer.getVehicleInstance(vin);
// 先判断车辆是否在上报数据,上报则停止
if (vehicleInstance.isSend()){
vehicleInstance.stopSend();
}
// 判断车辆是否在线,若在线则让车辆下线
if (vehicleInstance.isOnline()){
vehicleInstance.closeClient();
}
// 进行缓存删除
LocalContainer.removeByVin(vin);
// 删除数据库
this.removeById(vin);
}
}

View File

@ -213,229 +213,4 @@ public class VehicleInstanceServiceImpl implements VehicleInstanceService {
// private final AtomicBoolean unifiedStatus = new AtomicBoolean(Boolean.TRUE);
/**
*
*/
private final TaskModel taskModel = new TaskModel();
/**
* 线
*/
@Override
public void unifiedOnline () {
// 获取离线车辆
List<String> vinList
= LocalContainer.getOfflineVehicleInstance().stream().map(VehicleInstance::getVin).toList();
CountDownLatch countDownLatch = new CountDownLatch(vinList.size());
// 筛选出离线车辆并使用并行流进行上线操作
for (String vin : vinList) {
FixedThreadPool.submit(
new Thread(() -> {
try {
this.vehicleClientInit(vin);
taskModel.incrementSuccess();
} catch (Exception exception) {
exception.printStackTrace();
log.error("车辆上线异常:{}", exception.getMessage());
taskModel.incrementError();
}
countDownLatch.countDown();
})
);
}
new Thread(() -> {
boolean await = false;
do {
try {
await = countDownLatch.await(5, TimeUnit.SECONDS);
log.info("等待一轮训,还剩余:[{}]", countDownLatch.getCount());
} catch (InterruptedException e) {
log.error("等待异常:{}", e.getMessage(), e);
await = true;
}
} while (!await);
taskModel.down();
}, "一键上线").start();
}
/**
* 线
*/
@Override
public void unifiedOffline () {
List<VehicleInstance> onlineVehicleInstanceList = LocalContainer.getOnlineVehicleInstance();
Thread taskThread = new Thread(() -> {
try {
// 筛选出在线车辆使用并行流操作先停止车辆上报动作再进行车辆离线操作
onlineVehicleInstanceList
.stream()
.parallel()
.forEach(vehicleInstance -> {
try {
vehicleInstance.stopSend();
vehicleInstance.closeClient();
taskModel.incrementSuccess();
}catch (Exception exception){
log.error("车辆离线异常:{}", exception.getMessage());
taskModel.incrementError();
}
});
} catch (Exception exception) {
log.error("车辆一键离线报错:{}", exception.getMessage(), exception);
}
taskModel.down();
});
taskModel.submit("一键离线", onlineVehicleInstanceList.size(), taskThread);
}
/**
*
*/
@Override
public void unifiedSend () {
List<VehicleInstance> vehicleInstanceList = LocalContainer.getOnlineVehicleInstance();
if (vehicleInstanceList.isEmpty()){
throw new RuntimeException("还没有车辆连接到服务器,请先让车辆上线");
}
// 获取到所有路径
List<PositionRouteInfo> positionRouteInfoList = positionRouteService.list();
// 路径长度
int positionSize = positionRouteInfoList.size();
// 随机数
Random random = new Random();
Thread taskThread = new Thread(() -> {
try {
vehicleInstanceList
.stream()
.parallel()
.forEach(vehicleInstance -> {
try {
// 随机一个路径结果
int positionIndex = random.nextInt(0, positionSize);
PositionRouteInfo positionRouteInfo = positionRouteInfoList.get(positionIndex);
String positionCode = positionRouteInfo.getName();
List<PositionModel> positionModelList = JSONArray.parseArray(positionRouteInfo.getRouteData(), String.class)
.stream()
.map(PositionModel::strBuild)
.toList();
// 设置车辆路径
vehicleInstance.settingPosition(positionModelList);
vehicleInstance.setPositionCode(positionCode);
// 设置车辆档位
vehicleInstance.setGear("D");
// 开启线程进行上报
if (vehicleInstance.getVehicleThread() == null) {
vehicleInstance.initVehicleThread();
}
vehicleInstance.startSend();
taskModel.incrementSuccess();
}catch (Exception exception){
log.info("车辆设置一键上报失败:{}", exception.getMessage());
taskModel.incrementError();
}
});
} catch (Exception exception) {
log.error("车辆一键上报报错:{}", exception.getMessage(), exception);
}
taskModel.down();
});
taskModel.submit("一键上报", vehicleInstanceList.size(),taskThread);
}
/**
*
*/
@Override
public void unifiedPosition () {
List<VehicleInstance> vehicleInstanceList = LocalContainer.getOnlineVehicleInstance();
Thread taskThread = new Thread(() -> {
try {
// 获取到所有路径
List<PositionRouteInfo> positionRouteInfoList = positionRouteService.list();
// 路径长度
int positionSize = positionRouteInfoList.size();
// 随机数
Random random = new Random();
vehicleInstanceList
.stream()
.parallel()
.forEach(vehicleInstance -> {
try {
// 随机一个路径结果
int positionIndex = random.nextInt(0, positionSize);
PositionRouteInfo positionRouteInfo = positionRouteInfoList.get(positionIndex);
String positionCode = positionRouteInfo.getName();
List<PositionModel> positionModelList = JSONArray.parseArray(positionRouteInfo.getRouteData(), String.class)
.stream()
.map(PositionModel::strBuild)
.toList();
// 设置车辆路径
vehicleInstance.settingPosition(positionModelList);
vehicleInstance.setPositionCode(positionCode);
taskModel.incrementSuccess();
} catch (Exception exception) {
log.error("车辆设置路线异常:[{}]", exception.getMessage());
taskModel.incrementError();
}
});
} catch (Exception exception) {
log.error("车辆一键重置路径报错:{}", exception.getMessage(), exception);
}
taskModel.down();
});
taskModel.submit("一键重置路径",vehicleInstanceList.size(), taskThread);
}
/**
*
*/
@Override
public void unifiedStop () {
List<VehicleInstance> onlineVehicleInstanceList = LocalContainer.getOnlineVehicleInstance();
Thread taskThread = new Thread(() -> {
try {
LocalContainer.getOnlineVehicleInstance()
.stream()
.parallel()
.forEach(vehicleInstance -> {
try {
vehicleInstance.stopSend();
taskModel.incrementSuccess();
}catch (Exception exception){
log.info("车辆一键取消上报发生错误:{}", exception.getMessage());
taskModel.incrementError();
}
});
taskModel.down();
} catch (Exception exception) {
log.error("车辆一键取消上报报错:{}", exception.getMessage(), exception);
}
});
taskModel.submit("一键取消上报", onlineVehicleInstanceList.size(), taskThread);
}
/**
*
*
* @return
*/
@Override
public UnifiedTaskResp unifiedStatus() {
boolean unifiedStatus = this.taskModel.getUnifiedStatus().get();
return UnifiedTaskResp.builder()
.unifiedStatus(unifiedStatus)
.taskErrorSum(this.taskModel.getErrorSum())
.taskExecutionSum(this.taskModel.getTaskExecutionSum())
.taskSuccessSum(this.taskModel.getSuccessSum())
.taskName(this.taskModel.getTaskName())
.taskStartTime(System.currentTimeMillis() - this.taskModel.getTaskStartTime())
.build();
}
}

View File

@ -0,0 +1,263 @@
package com.muyu.service.impl;
import com.alibaba.fastjson2.JSONArray;
import com.muyu.common.pool.FixedThreadPool;
import com.muyu.domain.PositionRouteInfo;
import com.muyu.domain.model.PositionModel;
import com.muyu.domain.model.TaskModel;
import com.muyu.domain.resp.UnifiedTaskResp;
import com.muyu.service.PositionRouteService;
import com.muyu.service.VehicleInstanceService;
import com.muyu.service.VehicleUnifiedService;
import com.muyu.vehicle.VehicleInstance;
import com.muyu.vehicle.core.LocalContainer;
import lombok.extern.log4j.Log4j2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
* @author DongZeLiang
* @version 1.0
* @description
* @date 2023/12/6
*/
@Log4j2
@Service
public class VehicleUnifiedServiceImpl implements VehicleUnifiedService {
@Autowired
private VehicleInstanceService vehicleInstanceService;
@Autowired
private PositionRouteService positionRouteService;
/**
*
*/
private final TaskModel taskModel = new TaskModel();
/**
* 线
*/
@Override
public void unifiedOnline () {
// 获取离线车辆
List<String> vinList
= LocalContainer.getOfflineVehicleInstance().stream().map(VehicleInstance::getVin).toList();
CountDownLatch countDownLatch = new CountDownLatch(vinList.size());
// 筛选出离线车辆并使用并行流进行上线操作
for (String vin : vinList) {
FixedThreadPool.submit(
new Thread(() -> {
try {
vehicleInstanceService.vehicleClientInit(vin);
taskModel.incrementSuccess();
} catch (Exception exception) {
log.error("车辆上线异常:{}", exception.getMessage(), exception);
taskModel.incrementError();
}
countDownLatch.countDown();
})
);
}
new Thread(() -> {
boolean await = false;
do {
try {
await = countDownLatch.await(5, TimeUnit.SECONDS);
log.info("等待一轮训,还剩余:[{}]", countDownLatch.getCount());
} catch (InterruptedException e) {
log.error("等待异常:{}", e.getMessage(), e);
await = true;
}
} while (!await);
taskModel.down();
}, "一键上线").start();
}
/**
* 线
*/
@Override
public void unifiedOffline () {
List<VehicleInstance> onlineVehicleInstanceList = LocalContainer.getOnlineVehicleInstance();
Thread taskThread = new Thread(() -> {
try {
// 筛选出在线车辆使用并行流操作先停止车辆上报动作再进行车辆离线操作
onlineVehicleInstanceList
.stream()
.parallel()
.forEach(vehicleInstance -> {
try {
vehicleInstance.stopSend();
vehicleInstance.closeClient();
taskModel.incrementSuccess();
}catch (Exception exception){
log.error("车辆离线异常:{}", exception.getMessage());
taskModel.incrementError();
}
});
} catch (Exception exception) {
log.error("车辆一键离线报错:{}", exception.getMessage(), exception);
}
taskModel.down();
});
// taskModel.submit("一键离线", onlineVehicleInstanceList.size(), taskThread);
}
/**
*
*/
@Override
public void unifiedSend () {
List<VehicleInstance> vehicleInstanceList = LocalContainer.getOnlineVehicleInstance();
if (vehicleInstanceList.isEmpty()){
throw new RuntimeException("还没有车辆连接到服务器,请先让车辆上线");
}
// 获取到所有路径
List<PositionRouteInfo> positionRouteInfoList = positionRouteService.list();
// 路径长度
int positionSize = positionRouteInfoList.size();
// 随机数
Random random = new Random();
Thread taskThread = new Thread(() -> {
try {
vehicleInstanceList
.stream()
.parallel()
.forEach(vehicleInstance -> {
try {
// 随机一个路径结果
int positionIndex = random.nextInt(0, positionSize);
PositionRouteInfo positionRouteInfo = positionRouteInfoList.get(positionIndex);
String positionCode = positionRouteInfo.getName();
List<PositionModel> positionModelList = JSONArray.parseArray(positionRouteInfo.getRouteData(), String.class)
.stream()
.map(PositionModel::strBuild)
.toList();
// 设置车辆路径
vehicleInstance.settingPosition(positionModelList);
vehicleInstance.setPositionCode(positionCode);
// 设置车辆档位
vehicleInstance.setGear("D");
// 开启线程进行上报
if (vehicleInstance.getVehicleThread() == null) {
vehicleInstance.initVehicleThread();
}
vehicleInstance.startSend();
taskModel.incrementSuccess();
}catch (Exception exception){
log.info("车辆设置一键上报失败:{}", exception.getMessage());
taskModel.incrementError();
}
});
} catch (Exception exception) {
log.error("车辆一键上报报错:{}", exception.getMessage(), exception);
}
taskModel.down();
});
// taskModel.submit("一键上报", vehicleInstanceList.size(),taskThread);
}
/**
*
*/
@Override
public void unifiedPosition () {
List<VehicleInstance> vehicleInstanceList = LocalContainer.getOnlineVehicleInstance();
Thread taskThread = new Thread(() -> {
try {
// 获取到所有路径
List<PositionRouteInfo> positionRouteInfoList = positionRouteService.list();
// 路径长度
int positionSize = positionRouteInfoList.size();
// 随机数
Random random = new Random();
vehicleInstanceList
.stream()
.parallel()
.forEach(vehicleInstance -> {
try {
// 随机一个路径结果
int positionIndex = random.nextInt(0, positionSize);
PositionRouteInfo positionRouteInfo = positionRouteInfoList.get(positionIndex);
String positionCode = positionRouteInfo.getName();
List<PositionModel> positionModelList = JSONArray.parseArray(positionRouteInfo.getRouteData(), String.class)
.stream()
.map(PositionModel::strBuild)
.toList();
// 设置车辆路径
vehicleInstance.settingPosition(positionModelList);
vehicleInstance.setPositionCode(positionCode);
taskModel.incrementSuccess();
} catch (Exception exception) {
log.error("车辆设置路线异常:[{}]", exception.getMessage());
taskModel.incrementError();
}
});
} catch (Exception exception) {
log.error("车辆一键重置路径报错:{}", exception.getMessage(), exception);
}
taskModel.down();
});
// taskModel.submit("一键重置路径",vehicleInstanceList.size(), taskThread);
}
/**
*
*/
@Override
public void unifiedStop () {
List<VehicleInstance> onlineVehicleInstanceList = LocalContainer.getOnlineVehicleInstance();
Thread taskThread = new Thread(() -> {
try {
LocalContainer.getOnlineVehicleInstance()
.stream()
.parallel()
.forEach(vehicleInstance -> {
try {
vehicleInstance.stopSend();
taskModel.incrementSuccess();
}catch (Exception exception){
log.info("车辆一键取消上报发生错误:{}", exception.getMessage());
taskModel.incrementError();
}
});
taskModel.down();
} catch (Exception exception) {
log.error("车辆一键取消上报报错:{}", exception.getMessage(), exception);
}
});
// taskModel.submit("一键取消上报", onlineVehicleInstanceList.size(), taskThread);
}
/**
*
*
* @return
*/
@Override
public UnifiedTaskResp unifiedStatus() {
boolean unifiedStatus = this.taskModel.getUnifiedStatus().get();
return UnifiedTaskResp.builder()
.unifiedStatus(unifiedStatus)
.taskErrorSum(this.taskModel.getErrorSum())
.taskExecutionSum(this.taskModel.getTaskExecutionSum())
.taskSuccessSum(this.taskModel.getSuccessSum())
.taskName(this.taskModel.getTaskName())
.taskStartTime(System.currentTimeMillis() - this.taskModel.getTaskStartTime())
.build();
}
}

View File

@ -158,6 +158,14 @@ public class VehicleInstance {
return this.client.isConnected();
}
/**
* 线
* @return truefalse
*/
public boolean isSend(){
return this.vehicleThread != null;
}
/**
*
*/

View File

@ -70,4 +70,12 @@ public class LocalContainer {
public static List<VehicleInstance> getOfflineVehicleInstance(){
return vehicleDataMap.values().stream().filter(vehicleInstance -> !vehicleInstance.isOnline()).toList();
}
/**
* VIN
* @param vin VIN
*/
public static void removeByVin(String vin) {
vehicleDataMap.remove(vin);
}
}