Compare commits

...

10 Commits

Author SHA1 Message Date
Saisai Liu 525a4eeb5a feat():模拟车辆基础模板 2024-05-30 19:02:37 +08:00
Saisai Liu bd3d964663 feat():模拟车辆基础模板 2024-05-29 10:21:43 +08:00
Saisai Liu 743e81025f 基础模板 2024-05-24 21:09:25 +08:00
Saisai Liu ffa909eece 基础模板 2024-05-24 21:08:19 +08:00
DongZeLiang 3ecc09bb50 更新车辆上线代码逻辑 2024-03-26 10:00:35 +08:00
DongZeLiang 5c2fbb91dd 一键上线 2023-12-26 16:37:17 +08:00
dongzeliang eb1937a27b 代码未完善 2023-12-06 18:37:19 +08:00
dongzeliang 01eb776215 业务结构拆解 2023-12-06 15:35:59 +08:00
dongzeliang 0cf9ccb431 增加任务执行逻辑 2023-12-05 15:25:55 +08:00
DongZeLiang ed61bb6973 测试代码 2023-12-05 15:10:42 +08:00
35 changed files with 729 additions and 398 deletions

View File

@ -7,7 +7,7 @@
</list>
</option>
</component>
<component name="ProjectRootManager" version="2" languageLevel="JDK_17" default="true" project-jdk-name="17" project-jdk-type="JavaSDK">
<component name="ProjectRootManager" version="2" languageLevel="JDK_17" project-jdk-name="17" project-jdk-type="JavaSDK">
<output url="file://$PROJECT_DIR$/out" />
</component>
</project>

View File

@ -1,6 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="$PROJECT_DIR$" vcs="Git" />
<mapping directory="" vcs="Git" />
</component>
</project>

10
pom.xml
View File

@ -13,7 +13,6 @@
<maven.compiler.target>17</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<mybatisplus.version>3.5.1</mybatisplus.version>
<freemaker.version>2.3.31</freemaker.version>
</properties>
<parent>
@ -93,6 +92,15 @@
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>ecs20140526</artifactId>
<version>5.1.8</version>
</dependency>
</dependencies>
</project>

View File

@ -5,7 +5,7 @@ import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* @author DongZeLiang
* @author Saisai.Liu
* @version 1.0
* @description
* @date 2023/11/9

View File

@ -8,7 +8,7 @@ import lombok.NoArgsConstructor;
import java.util.List;
/**
* @Author: DongZeLiang
* @Author: Saisai.Liu
* @date: 2023/12/2
* @Description:
* @Version: 1.0

View File

@ -4,7 +4,7 @@ import java.math.BigDecimal;
import java.math.BigInteger;
/**
* @author DongZeLiang
* @author Saisai.Liu
* @version 1.0
* @description
* @date 2023/11/15

View File

@ -0,0 +1,34 @@
package com.muyu.common.pool;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
/**
* @author DongZl
* @description: 线
* @Date 2023-12-5 01:51
*/
public class FixedThreadPool {
/**
* 线
*/
private final static ExecutorService fixedThreadPool = Executors.newFixedThreadPool(15);
/**
* 线
* @param thread 线
*/
public static Future<?> submit(Thread thread){
return fixedThreadPool.submit(thread);
}
/**
* 线
*/
public static void shutDown(){
fixedThreadPool.shutdown();
}
}

View File

@ -1,4 +1,4 @@
package com.muyu.common;
package com.muyu.common.pool;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@ -10,12 +10,13 @@ import java.util.concurrent.TimeUnit;
* @description: 线
* @Date 2023-11-17 09:16
*/
public class ThreadPool {
public class ScheduledThreadPool {
/**
* 线 CPU * 2 + 1
*/
private static final ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors() * 2 + 1);
private static final ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(
Runtime.getRuntime().availableProcessors() * 2 + 1);
public static ScheduledFuture<?> submit (Runnable thread){
// 参数分别是: 任务, 多久后开始执行, 每隔多久执行一次(周期),时间单位

View File

@ -5,7 +5,7 @@ import org.springframework.web.bind.annotation.ExceptionHandler;
import org.springframework.web.bind.annotation.RestControllerAdvice;
/**
* @author DongZeLiang
* @author Saisai.Liu
* @version 1.0
* @description
* @date 2023/11/15

View File

@ -5,7 +5,7 @@ import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
/**
* @author DongZeLiang
* @author Saisai.Liu
* @version 1.0
* @description index
* @date 2023/12/1
@ -13,6 +13,7 @@ import org.springframework.web.bind.annotation.RequestMapping;
@Log4j2
@Controller
public class IndexController {
@RequestMapping("/")
public String hello(){
log.debug("重定向到 - /static/index 页面");

View File

@ -10,7 +10,7 @@ import org.springframework.web.bind.annotation.*;
import java.util.List;
/**
* @author DongZeLiang
* @author Saisai.Liu
* @version 1.0
* @description
* @date 2023/11/9
@ -39,7 +39,7 @@ public class VehicleController {
}
/**
*
* @param sum
* @param vehicleCreateAddReq
* @return
*/
@PostMapping("/create")
@ -49,4 +49,14 @@ public class VehicleController {
}
/**
*
* @param vin
* @return
*/
@DeleteMapping("/{vin}")
public Result<String> delete(@PathVariable("vin") String vin){
this.vehicleService.delete(vin);
return Result.success(null,"删除成功");
}
}

View File

@ -116,58 +116,4 @@ public class VehicleInstanceController {
this.vehicleInstanceService.editStatus(vin, statusKey, statusValue);
return Result.success();
}
/**
* 线
*/
@PostMapping("/unified/online")
public Result<String> unifiedOnline(){
this.vehicleInstanceService.unifiedOnline();
return Result.success(null,"已成功发布一键上线任务");
}
/**
* 线
*/
@PostMapping("/unified/offline")
public Result<String> unifiedOffline(){
this.vehicleInstanceService.unifiedOffline();
return Result.success(null,"已成功发布一键离线任务");
}
/**
*
*/
@PostMapping("/unified/send")
public Result<String> unifiedSend(){
this.vehicleInstanceService.unifiedSend();
return Result.success(null,"已成功发布一键上报任务");
}
/**
*
*/
@PostMapping("/unified/position")
public Result<String> unifiedPosition(){
this.vehicleInstanceService.unifiedPosition();
return Result.success(null,"已成功发布一键上报任务");
}
/**
*
*/
@PostMapping("/unified/stop")
public Result<String> unifiedStop(){
this.vehicleInstanceService.unifiedStop();
return Result.success(null,"已成功发布取消上报任务");
}
/**
*
* @return
*/
@GetMapping("/unified/status")
public Result<UnifiedTaskResp> unifiedStatus(){
return Result.success(this.vehicleInstanceService.unifiedStatus());
}
}

View File

@ -0,0 +1,80 @@
package com.muyu.controller;
import com.muyu.common.Result;
import com.muyu.domain.resp.UnifiedTaskResp;
import com.muyu.service.VehicleUnifiedService;
import lombok.extern.log4j.Log4j2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @author Saisai.Liu
* @version 1.0
* @description
* @date 2023/12/6
*/
@Log4j2
@RestController
@RequestMapping("/vehicle/unified")
public class VehicleUnifiedController {
@Autowired
private VehicleUnifiedService vehicleUnifiedService;
/**
* 线
*/
@PostMapping("/online")
public Result<String> unifiedOnline(){
this.vehicleUnifiedService.unifiedOnline();
return Result.success(null,"已成功发布一键上线任务");
}
/**
* 线
*/
@PostMapping("/offline")
public Result<String> unifiedOffline(){
this.vehicleUnifiedService.unifiedOffline();
return Result.success(null,"已成功发布一键离线任务");
}
/**
*
*/
@PostMapping("/send")
public Result<String> unifiedSend(){
this.vehicleUnifiedService.unifiedSend();
return Result.success(null,"已成功发布一键上报任务");
}
/**
*
*/
@PostMapping("/position")
public Result<String> unifiedPosition(){
this.vehicleUnifiedService.unifiedPosition();
return Result.success(null,"已成功发布一键上报任务");
}
/**
*
*/
@PostMapping("/stop")
public Result<String> unifiedStop(){
this.vehicleUnifiedService.unifiedStop();
return Result.success(null,"已成功发布取消上报任务");
}
/**
*
* @return
*/
@GetMapping("/status")
public Result<UnifiedTaskResp> unifiedStatus(){
return Result.success(this.vehicleUnifiedService.unifiedStatus());
}
}

View File

@ -1,10 +1,16 @@
package com.muyu.controller;
import com.muyu.common.Result;
import com.muyu.domain.model.MqttServerModel;
import com.muyu.vehicle.api.ClientAdmin;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.UUID;
/**
* @author DongZl
* @description: 线
@ -14,12 +20,30 @@ import org.springframework.web.bind.annotation.RestController;
@RequestMapping("/verify")
public class VerifyController {
@Autowired
private ClientAdmin admin;
@Value("${mqtt.server.host}")
private String broker;
@Value("${mqtt.server.topic}")
private String topic;
/**
* 线
* @return test
*/
@PostMapping("/vehicleConnection")
public Result<String> vehicleConnection(){
return Result.success("test");
public Result<MqttServerModel> vehicleConnection(){
// this.broker = "tcp://"+admin.getIp();
return Result.success(
MqttServerModel.builder()
.broker(broker)
.topic(UUID.randomUUID().toString())
.build()
);
}
}

View File

@ -15,7 +15,7 @@ import java.math.BigDecimal;
import java.util.Date;
/**
* @author DongZeLiang
* @author Saisai.Liu
* @version 1.0
* @description
* @date 2023/11/9

View File

@ -0,0 +1,28 @@
package com.muyu.domain.model;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* @author DongZl
* @description: Mqtt
* @Date 2024-3-26 09:53
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class MqttServerModel {
/**
* MQTT
*/
private String broker;
/**
* MQTT
*/
private String topic;
}

View File

@ -6,11 +6,12 @@ import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.extern.log4j.Log4j2;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @author DongZeLiang
* @author Saisai.Liu
* @version 1.0
* @description
* @date 2023/12/4
@ -27,23 +28,16 @@ public class TaskModel {
*/
private final AtomicBoolean unifiedStatus = new AtomicBoolean(Boolean.FALSE);
private CountDownLatch countDownLatch ;
/**
*
* @return
*/
public synchronized boolean isExecution(){
// 为true表示任务在执行
if (unifiedStatus.get()){
// 就算状态为true若执行线程为null或者线程为不活跃也可以执行也可以执行任务
return this.currentThread == null || !this.currentThread.isAlive();
}
return true;
public boolean isExecution(){
return !unifiedStatus.get();
}
/**
* 线
*/
private Thread currentThread ;
/**
*
*/
@ -51,7 +45,7 @@ public class TaskModel {
/**
*
*/
private Integer taskExecutionSum;
private Integer taskExecutionSum = 0;
/**
*
@ -60,18 +54,16 @@ public class TaskModel {
/**
* 线
* @param task 线
* @param taskName
* @param taskExecutionSum
*/
public synchronized void submit(String taskName,Integer taskExecutionSum, Thread task){
public void submit(String taskName,Integer taskExecutionSum){
if (!this.isExecution()){
throw new RuntimeException("["+this.taskName+"]的任务正在进行中,请等待任务执行完成再次发布一键任务");
}
unifiedStatus.set(Boolean.TRUE);
this.currentThread = task;
this.currentThread.start();
this.taskName = taskName;
this.countDownLatch = new CountDownLatch(taskExecutionSum);
this.taskExecutionSum = taskExecutionSum;
this.taskSuccessSum = new AtomicInteger();
this.taskErrorSum = new AtomicInteger();
@ -84,7 +76,6 @@ public class TaskModel {
*/
public void down(){
log.info("[{}]任务执行结束,耗时:[{}]MS", this.taskName, System.currentTimeMillis() - taskStartTime);
this.currentThread = null;
this.taskName = null;
this.taskExecutionSum = 0;
this.taskSuccessSum.set(0);
@ -95,7 +86,7 @@ public class TaskModel {
/**
*
*/
private AtomicInteger taskSuccessSum;
private AtomicInteger taskSuccessSum = new AtomicInteger();
/**
*
@ -113,7 +104,7 @@ public class TaskModel {
/**
*
*/
private AtomicInteger taskErrorSum;
private AtomicInteger taskErrorSum = new AtomicInteger();
/**
*

View File

@ -5,11 +5,8 @@ import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @author DongZeLiang
* @author Saisai.Liu
* @version 1.0
* @description
* @date 2023/12/5

View File

@ -8,7 +8,7 @@ import com.muyu.domain.Vehicle;
* Mapper
* </p>
*
* @author DongZeLiang
* @author Saisai.Liu
* @since 2022-07-05
*/
public interface VehicleMapper extends BaseMapper<Vehicle> {

View File

@ -0,0 +1,15 @@
package com.muyu.mq.rabbitmq;
import org.springframework.beans.factory.annotation.Autowired;
/**
* @ClassName Custom
* @Description
* @Author SaiSai.Liu
* @Date 2024/5/26 15:25
*/
public class Custom {
// @Autowired
// private RabbitTemplate rabbitTemplate;
}

View File

@ -10,7 +10,7 @@ import com.muyu.domain.resp.UnifiedTaskResp;
import com.muyu.domain.resp.VehicleInstanceResp;
/**
* @author DongZeLiang
* @author Saisai.Liu
* @version 1.0
* @description
* @date 2023/11/22
@ -71,35 +71,4 @@ public interface VehicleInstanceService {
*/
void editStatus (String vin, String statusKey, Integer statusValue);
/**
* 线
*/
public void unifiedOnline();
/**
* 线
*/
public void unifiedOffline();
/**
*
*/
public void unifiedSend();
/**
*
*/
void unifiedPosition ();
/**
*
*/
public void unifiedStop();
/**
*
* @return
*/
UnifiedTaskResp unifiedStatus();
}

View File

@ -8,7 +8,7 @@ import com.muyu.domain.Vehicle;
*
* </p>
*
* @author DongZeLiang
* @author Saisai.Liu
* @since 2022-07-05
*/
public interface VehicleService extends IService<Vehicle> {
@ -29,4 +29,10 @@ public interface VehicleService extends IService<Vehicle> {
*
*/
void syncDb();
/**
* VIN
* @param vin vin
*/
void delete(String vin);
}

View File

@ -0,0 +1,45 @@
package com.muyu.service;
import com.muyu.domain.resp.UnifiedTaskResp;
/**
* @author Saisai.Liu
* @version 1.0
* @description
* @date 2023/12/6
*/
public interface VehicleUnifiedService {
/**
* 线
*/
public void unifiedOnline();
/**
* 线
*/
public void unifiedOffline();
/**
*
*/
public void unifiedSend();
/**
*
*/
void unifiedPosition ();
/**
*
*/
public void unifiedStop();
/**
*
* @return
*/
UnifiedTaskResp unifiedStatus();
}

View File

@ -25,7 +25,7 @@ import java.util.stream.Stream;
*
* </p>
*
* @author DongZeLiang
* @author Saisai.Liu
* @since 2022-07-05
*/
@Log4j2
@ -85,6 +85,7 @@ public class VechileServiceImpl extends ServiceImpl<VehicleMapper, Vehicle> impl
@Override
public void syncDb () {
try {
// vehicleInstanceService.isTaskStatus();
log.info("同步数据库开始");
long startTime = System.currentTimeMillis();
Collection<VehicleInstance> vehicleInstanceList = LocalContainer.getOnlineVehicleInstance();
@ -114,5 +115,28 @@ public class VechileServiceImpl extends ServiceImpl<VehicleMapper, Vehicle> impl
}
}
/**
* VIN
*
* @param vin vin
*/
@Override
public void delete(String vin) {
VehicleInstance vehicleInstance = LocalContainer.getVehicleInstance(vin);
// 先判断车辆是否在上报数据,上报则停止
if (vehicleInstance.isSend()){
vehicleInstance.stopSend();
}
// 判断车辆是否在线,若在线则让车辆下线
if (vehicleInstance.isOnline()){
vehicleInstance.closeClient();
}
// 进行缓存删除
LocalContainer.removeByVin(vin);
// 删除数据库
this.removeById(vin);
}
}

View File

@ -1,17 +1,14 @@
package com.muyu.service.impl;
import com.alibaba.fastjson2.JSONArray;
import com.muyu.common.PageList;
import com.muyu.common.Result;
import com.muyu.domain.PositionRouteInfo;
import com.muyu.domain.Vehicle;
import com.muyu.domain.model.MqttServerModel;
import com.muyu.domain.model.PositionModel;
import com.muyu.domain.model.TaskModel;
import com.muyu.domain.req.CheckPositionReq;
import com.muyu.domain.req.GearReq;
import com.muyu.domain.req.MsgReq;
import com.muyu.domain.req.VehicleInstanceListReq;
import com.muyu.domain.resp.UnifiedTaskResp;
import com.muyu.domain.resp.VehicleInstanceResp;
import com.muyu.service.PositionRouteService;
import com.muyu.service.VehicleInstanceService;
@ -26,18 +23,15 @@ import com.muyu.vehicle.model.properties.MqttProperties;
import lombok.extern.log4j.Log4j2;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import java.util.Comparator;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;
/**
* @author DongZeLiang
* @author Saisai.Liu
* @version 1.0
* @description
* @date 2023/11/22
@ -52,8 +46,6 @@ public class VehicleInstanceServiceImpl implements VehicleInstanceService {
@Autowired
private ClientAdmin clientAdmin;
@Value("${mqtt.server.host}")
private String broker;
/**
*
@ -108,6 +100,7 @@ public class VehicleInstanceServiceImpl implements VehicleInstanceService {
*/
@Override
public void vehicleClientInit (String vin) {
log.info("vin[{}],开始上线", vin);
VehicleInstance vehicleInstance = LocalContainer.getVehicleInstance(vin);
if (vehicleInstance == null){
throw new RuntimeException("没有【"+vin+"】车辆");
@ -119,25 +112,27 @@ public class VehicleInstanceServiceImpl implements VehicleInstanceService {
.userName(MD5Util.encrypted(vin+timestamp))
.nonce(MD5Util.encrypted(UUID.randomUUID().toString().replace("-", "")))
.build();
Result<String> result = clientAdmin.vehicleConnection(connectionReq);
Result<MqttServerModel> result = clientAdmin.vehicleConnection(connectionReq);
if (result.getCode() != 200){
log.error("车辆:[{}],申请上线异常:[{}]", vin, result.getMsg());
throw new RuntimeException("远程服务器没有【"+vin+"】车辆");
}
MqttServerModel mqttServerModel = result.getData();
MqttProperties mqttProperties = MqttProperties.builder()
.broker(broker)
.topic(result.getData())
.broker(mqttServerModel.getBroker())
.topic(mqttServerModel.getTopic())
.clientId(vin)
.username(connectionReq.getUserName())
.password(vin+connectionReq.getTimestamp()+connectionReq.getNonce())
.password(vin + connectionReq.getTimestamp() + connectionReq.getNonce())
.build();
vehicleInstance.setMqttProperties(mqttProperties);
vehicleInstance.initClient();
log.info("vin[{}],上线成功", vin);
}
/**
*
*
* @param vin vin
*/
@Override
@ -208,220 +203,4 @@ public class VehicleInstanceServiceImpl implements VehicleInstanceService {
// private final AtomicBoolean unifiedStatus = new AtomicBoolean(Boolean.TRUE);
/**
*
*/
private final TaskModel taskModel = new TaskModel();
/**
* 线
*/
@Override
public void unifiedOnline () {
// 获取离线车辆
List<VehicleInstance> offlineVehicleInstanceList
= LocalContainer.getOfflineVehicleInstance();
Thread taskThread = new Thread(() -> {
try {
// 筛选出离线车辆并使用并行流进行上线操作
offlineVehicleInstanceList
.stream()
.parallel()
.map(VehicleInstance::getVin)
.forEach(vin -> {
try {
this.vehicleClientInit(vin);
taskModel.incrementSuccess();
}catch (Exception exception){
log.error("车辆上线异常:{}", exception.getMessage());
taskModel.incrementError();
}
});
} catch (Exception exception) {
log.error("车辆一件上线报错:{}", exception.getMessage(), exception);
}
taskModel.down();
});
taskModel.submit("一键上线", offlineVehicleInstanceList.size(), taskThread);
}
/**
* 线
*/
@Override
public void unifiedOffline () {
List<VehicleInstance> onlineVehicleInstanceList = LocalContainer.getOnlineVehicleInstance();
Thread taskThread = new Thread(() -> {
try {
// 筛选出在线车辆使用并行流操作先停止车辆上报动作再进行车辆离线操作
onlineVehicleInstanceList
.stream()
.parallel()
.forEach(vehicleInstance -> {
try {
vehicleInstance.stopSend();
vehicleInstance.closeClient();
taskModel.incrementSuccess();
}catch (Exception exception){
log.error("车辆离线异常:{}", exception.getMessage());
taskModel.incrementError();
}
});
} catch (Exception exception) {
log.error("车辆一键离线报错:{}", exception.getMessage(), exception);
}
taskModel.down();
});
taskModel.submit("一键离线", onlineVehicleInstanceList.size(), taskThread);
}
/**
*
*/
@Override
public void unifiedSend () {
List<VehicleInstance> vehicleInstanceList = LocalContainer.getOnlineVehicleInstance();
if (vehicleInstanceList.isEmpty()){
throw new RuntimeException("还没有车辆连接到服务器,请先让车辆上线");
}
// 获取到所有路径
List<PositionRouteInfo> positionRouteInfoList = positionRouteService.list();
// 路径长度
int positionSize = positionRouteInfoList.size();
// 随机数
Random random = new Random();
Thread taskThread = new Thread(() -> {
try {
vehicleInstanceList
.stream()
.parallel()
.forEach(vehicleInstance -> {
try {
// 随机一个路径结果
int positionIndex = random.nextInt(0, positionSize);
PositionRouteInfo positionRouteInfo = positionRouteInfoList.get(positionIndex);
String positionCode = positionRouteInfo.getName();
List<PositionModel> positionModelList = JSONArray.parseArray(positionRouteInfo.getRouteData(), String.class)
.stream()
.map(PositionModel::strBuild)
.toList();
// 设置车辆路径
vehicleInstance.settingPosition(positionModelList);
vehicleInstance.setPositionCode(positionCode);
// 设置车辆档位
vehicleInstance.setGear("D");
// 开启线程进行上报
if (vehicleInstance.getVehicleThread() == null) {
vehicleInstance.initVehicleThread();
}
vehicleInstance.startSend();
taskModel.incrementSuccess();
}catch (Exception exception){
log.info("车辆设置一键上报失败:{}", exception.getMessage());
taskModel.incrementError();
}
});
} catch (Exception exception) {
log.error("车辆一键上报报错:{}", exception.getMessage(), exception);
}
taskModel.down();
});
taskModel.submit("一键上报", vehicleInstanceList.size(),taskThread);
}
/**
*
*/
@Override
public void unifiedPosition () {
List<VehicleInstance> vehicleInstanceList = LocalContainer.getOnlineVehicleInstance();
Thread taskThread = new Thread(() -> {
try {
// 获取到所有路径
List<PositionRouteInfo> positionRouteInfoList = positionRouteService.list();
// 路径长度
int positionSize = positionRouteInfoList.size();
// 随机数
Random random = new Random();
vehicleInstanceList
.stream()
.parallel()
.forEach(vehicleInstance -> {
try {
// 随机一个路径结果
int positionIndex = random.nextInt(0, positionSize);
PositionRouteInfo positionRouteInfo = positionRouteInfoList.get(positionIndex);
String positionCode = positionRouteInfo.getName();
List<PositionModel> positionModelList = JSONArray.parseArray(positionRouteInfo.getRouteData(), String.class)
.stream()
.map(PositionModel::strBuild)
.toList();
// 设置车辆路径
vehicleInstance.settingPosition(positionModelList);
vehicleInstance.setPositionCode(positionCode);
taskModel.incrementSuccess();
} catch (Exception exception) {
log.error("车辆设置路线异常:[{}]", exception.getMessage());
taskModel.incrementError();
}
});
} catch (Exception exception) {
log.error("车辆一键重置路径报错:{}", exception.getMessage(), exception);
}
taskModel.down();
});
taskModel.submit("一键重置路径",vehicleInstanceList.size(), taskThread);
}
/**
*
*/
@Override
public void unifiedStop () {
List<VehicleInstance> onlineVehicleInstanceList = LocalContainer.getOnlineVehicleInstance();
Thread taskThread = new Thread(() -> {
try {
LocalContainer.getOnlineVehicleInstance()
.stream()
.parallel()
.forEach(vehicleInstance -> {
try {
vehicleInstance.stopSend();
taskModel.incrementSuccess();
}catch (Exception exception){
log.info("车辆一键取消上报发生错误:{}", exception.getMessage());
taskModel.incrementError();
}
});
taskModel.down();
} catch (Exception exception) {
log.error("车辆一键取消上报报错:{}", exception.getMessage(), exception);
}
});
taskModel.submit("一键取消上报", onlineVehicleInstanceList.size(), taskThread);
}
/**
*
*
* @return
*/
@Override
public UnifiedTaskResp unifiedStatus() {
boolean unifiedStatus = this.taskModel.getUnifiedStatus().get();
return UnifiedTaskResp.builder()
.unifiedStatus(unifiedStatus)
.taskErrorSum(this.taskModel.getErrorSum())
.taskExecutionSum(this.taskModel.getTaskExecutionSum())
.taskSuccessSum(this.taskModel.getSuccessSum())
.taskName(this.taskModel.getTaskName())
.taskStartTime(System.currentTimeMillis() - this.taskModel.getTaskStartTime())
.build();
}
}

View File

@ -0,0 +1,280 @@
package com.muyu.service.impl;
import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import com.muyu.common.pool.FixedThreadPool;
import com.muyu.domain.PositionRouteInfo;
import com.muyu.domain.model.PositionModel;
import com.muyu.domain.model.TaskModel;
import com.muyu.domain.resp.UnifiedTaskResp;
import com.muyu.service.PositionRouteService;
import com.muyu.service.VehicleInstanceService;
import com.muyu.service.VehicleUnifiedService;
import com.muyu.vehicle.VehicleInstance;
import com.muyu.vehicle.core.LocalContainer;
import lombok.extern.log4j.Log4j2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
* @author Saisai.Liu
* @version 1.0
* @description
* @date 2023/12/6
*/
@Log4j2
@Service
public class VehicleUnifiedServiceImpl implements VehicleUnifiedService {
@Autowired
private VehicleInstanceService vehicleInstanceService;
@Autowired
private PositionRouteService positionRouteService;
/**
*
*/
private final TaskModel taskModel = new TaskModel();
/**
* 线
*/
@Override
public void unifiedOnline () {
// 获取离线车辆VIN
List<String> vinList = LocalContainer.getOfflineVehicleInstance()
.stream()
.map(VehicleInstance::getVin)
.toList();
taskModel.submit("一键上线", vinList.size());
new Thread(() -> {
int vinSize = 0, executionSize = 15;
do {
int startIndex = vinSize++ * executionSize;
// 进行分页开启车辆
List<String> executionVinList = vinList.stream()
.skip(startIndex)
.limit(executionSize)
.toList();
CountDownLatch countDownLatch = new CountDownLatch(executionVinList.size());
Map<String, Thread> startVehicleThread = new ConcurrentHashMap<>();
executionVinList.forEach(vin -> {
Thread thread = new Thread(() -> {
try {
vehicleInstanceService.vehicleClientInit(vin);
startVehicleThread.remove(vin);
}catch (Exception interruptedException){
log.error(interruptedException);
}
});
startVehicleThread.put(vin, thread);
FixedThreadPool.submit(thread);
});
try {
boolean await = countDownLatch.await(5, TimeUnit.SECONDS);
log.info(
"开始:[{}] 结束:[{}],未上线成功:[{}], vin[{}]",
startIndex, startIndex+executionVinList.size(),
startVehicleThread.size(),
JSONObject.toJSONString(executionVinList));
if (!await){
startVehicleThread.values().forEach(Thread::interrupt);
}
} catch (InterruptedException ignored) {}
if (executionVinList.size() < executionSize){
break;
}
}while (true);
taskModel.down();
}).start();
}
/**
* 线
*/
@Override
public void unifiedOffline () {
List<VehicleInstance> onlineVehicleInstanceList = LocalContainer.getOnlineVehicleInstance();
new Thread(() -> {
try {
// 筛选出在线车辆使用并行流操作先停止车辆上报动作再进行车辆离线操作
onlineVehicleInstanceList
.stream()
.parallel()
.forEach(vehicleInstance -> {
try {
vehicleInstance.stopSend();
vehicleInstance.closeClient();
taskModel.incrementSuccess();
}catch (Exception exception){
log.error("车辆离线异常:{}", exception.getMessage());
taskModel.incrementError();
}
});
} catch (Exception exception) {
log.error("车辆一键离线报错:{}", exception.getMessage(), exception);
}
taskModel.down();
});
// taskModel.submit("一键离线", onlineVehicleInstanceList.size(), taskThread);
}
/**
*
*/
@Override
public void unifiedSend () {
List<VehicleInstance> vehicleInstanceList = LocalContainer.getOnlineVehicleInstance();
if (vehicleInstanceList.isEmpty()){
throw new RuntimeException("还没有车辆连接到服务器,请先让车辆上线");
}
// 获取到所有路径
List<PositionRouteInfo> positionRouteInfoList = positionRouteService.list();
// 路径长度
int positionSize = positionRouteInfoList.size();
// 随机数
Random random = new Random();
Thread taskThread = new Thread(() -> {
try {
vehicleInstanceList
.stream()
.parallel()
.forEach(vehicleInstance -> {
try {
// 随机一个路径结果
int positionIndex = random.nextInt(0, positionSize);
PositionRouteInfo positionRouteInfo = positionRouteInfoList.get(positionIndex);
String positionCode = positionRouteInfo.getName();
List<PositionModel> positionModelList = JSONArray.parseArray(positionRouteInfo.getRouteData(), String.class)
.stream()
.map(PositionModel::strBuild)
.toList();
// 设置车辆路径
vehicleInstance.settingPosition(positionModelList);
vehicleInstance.setPositionCode(positionCode);
// 设置车辆档位
vehicleInstance.setGear("D");
// 开启线程进行上报
if (vehicleInstance.getVehicleThread() == null) {
vehicleInstance.initVehicleThread();
}
vehicleInstance.startSend();
taskModel.incrementSuccess();
}catch (Exception exception){
log.info("车辆设置一键上报失败:{}", exception.getMessage());
taskModel.incrementError();
}
});
} catch (Exception exception) {
log.error("车辆一键上报报错:{}", exception.getMessage(), exception);
}
taskModel.down();
});
// taskModel.submit("一键上报", vehicleInstanceList.size(),taskThread);
}
/**
*
*/
@Override
public void unifiedPosition () {
List<VehicleInstance> vehicleInstanceList = LocalContainer.getOnlineVehicleInstance();
Thread taskThread = new Thread(() -> {
try {
// 获取到所有路径
List<PositionRouteInfo> positionRouteInfoList = positionRouteService.list();
// 路径长度
int positionSize = positionRouteInfoList.size();
// 随机数
Random random = new Random();
vehicleInstanceList
.stream()
.parallel()
.forEach(vehicleInstance -> {
try {
// 随机一个路径结果
int positionIndex = random.nextInt(0, positionSize);
PositionRouteInfo positionRouteInfo = positionRouteInfoList.get(positionIndex);
String positionCode = positionRouteInfo.getName();
List<PositionModel> positionModelList = JSONArray.parseArray(positionRouteInfo.getRouteData(), String.class)
.stream()
.map(PositionModel::strBuild)
.toList();
// 设置车辆路径
vehicleInstance.settingPosition(positionModelList);
vehicleInstance.setPositionCode(positionCode);
taskModel.incrementSuccess();
} catch (Exception exception) {
log.error("车辆设置路线异常:[{}]", exception.getMessage());
taskModel.incrementError();
}
});
} catch (Exception exception) {
log.error("车辆一键重置路径报错:{}", exception.getMessage(), exception);
}
taskModel.down();
});
// taskModel.submit("一键重置路径",vehicleInstanceList.size(), taskThread);
}
/**
*
*/
@Override
public void unifiedStop () {
List<VehicleInstance> onlineVehicleInstanceList = LocalContainer.getOnlineVehicleInstance();
Thread taskThread = new Thread(() -> {
try {
LocalContainer.getOnlineVehicleInstance()
.stream()
.parallel()
.forEach(vehicleInstance -> {
try {
vehicleInstance.stopSend();
taskModel.incrementSuccess();
}catch (Exception exception){
log.info("车辆一键取消上报发生错误:{}", exception.getMessage());
taskModel.incrementError();
}
});
taskModel.down();
} catch (Exception exception) {
log.error("车辆一键取消上报报错:{}", exception.getMessage(), exception);
}
});
// taskModel.submit("一键取消上报", onlineVehicleInstanceList.size(), taskThread);
}
/**
*
*
* @return
*/
@Override
public UnifiedTaskResp unifiedStatus() {
boolean unifiedStatus = this.taskModel.getUnifiedStatus().get();
return UnifiedTaskResp.builder()
.unifiedStatus(unifiedStatus)
.taskErrorSum(this.taskModel.getErrorSum())
.taskExecutionSum(this.taskModel.getTaskExecutionSum())
.taskSuccessSum(this.taskModel.getSuccessSum())
.taskName(this.taskModel.getTaskName())
.taskStartTime(System.currentTimeMillis() - this.taskModel.getTaskStartTime())
.build();
}
}

View File

@ -1,5 +1,7 @@
package com.muyu.utils;
import java.nio.charset.StandardCharsets;
public class ConversionUtil {
/**
@ -17,4 +19,47 @@ public class ConversionUtil {
}
return sb.toString();
}
public static void main (String[] args) {
// String str = "<?xml version=\"1.0\"?>\n" +
// "<monitorRoot type=\"param\"><synchronizeSyptom event=\"0\" initial=\"true\"><Action_ECG><Rhythm>Sinus</Rhythm><HR>80</HR><EMD>No Change</EMD><Conduct>0</Conduct></Action_ECG><Action_Osat value=\"94\" isRelativePercent=\"false\"/><Action_BP isRelativePercent=\"false\"><Shrink value=\"120\"/><Stretch value=\"80\"/></Action_BP><Action_Resp breathType=\"Normal\" value=\"14\" isRelativePercent=\"false\"/><Action_etCO2 value=\"34\" isRelativePercent=\"false\"/><Action_Temperature value=\"35.2\"/><Action_CVP value=\"6.0\"/><Action_PAPDia value=\"10\"/><Action_PAPSys value=\"25\"/><Action_WP value=\"9\"/></synchronizeSyptom></monitorRoot>";
// String strToSixteen = strToSixteen(str);
// System.out.println(str);
// System.out.println(str.length());
// System.out.println(strToSixteen);
// System.out.println(strToSixteen.replace(" ", "").length());
String hexStr = "3C3F786D6C2076657273696F6E3D22312E30223F3E0D0A3C6D6F6E69746F72526F6F7420747970653D22706172616D223E3C73796E6368726F6E697A65537970746F6D206576656E743D22302220696E697469616C3D2274727565223E3C416374696F6E5F4543473E3C52687974686D3E53696E75733C2F52687974686D3E3C48523E38303C2F48523E3C454D443E4E6F204368616E67653C2F454D443E3C436F6E647563743E303C2F436F6E647563743E3C2F416374696F6E5F4543473E3C416374696F6E5F4F7361742076616C75653D2239342220697352656C617469766550657263656E743D2266616C7365222F3E3C416374696F6E5F425020697352656C617469766550657263656E743D2266616C7365223E3C536872696E6B2076616C75653D22313230222F3E3C537472657463682076616C75653D223830222F3E3C2F416374696F6E5F42503E3C416374696F6E5F5265737020627265617468547970653D224E6F726D616C222076616C75653D2231342220697352656C617469766550657263656E743D2266616C7365222F3E3C416374696F6E5F6574434F322076616C75653D2233342220697352656C617469766550657263656E743D2266616C7365222F3E3C416374696F6E5F54656D70657261747572652076616C75653D2233352E32222F3E3C416374696F6E5F4356502076616C75653D22362E30222F3E3C416374696F6E5F5041504469612076616C75653D223130222F3E3C416374696F6E5F5041505379732076616C75653D223235222F3E3C416374696F6E5F57502076616C75653D2239222F3E3C2F73796E6368726F6E697A65537970746F6D3E3C2F6D6F6E69746F72526F6F743E0D0A";
String hexStringToString = hexStringToString(hexStr);
System.out.println(hexStr);
System.out.println(hexStr.length());
System.out.println(hexStringToString);
System.out.println(hexStringToString.length());
}
/**
* 16string
* @param s
* @return
*/
public static String hexStringToString(String s) {
if (s == null || s.equals("")) {
return null;
}
s = s.replace(" ", "");
byte[] baKeyword = new byte[s.length() / 2];
for (int i = 0; i < baKeyword.length; i++) {
try {
baKeyword[i] = (byte) (0xff & Integer.parseInt(s.substring(i * 2, i * 2 + 2), 16));
} catch (Exception e) {
e.printStackTrace();
}
}
try {
s = new String(baKeyword, StandardCharsets.UTF_8);
} catch (Exception e1) {
e1.printStackTrace();
}
return s;
}
}

View File

@ -0,0 +1,30 @@
1、收集系统 - MQTT接受数据
1.1上报kafka
2、主流程
2.1、消费者按照分区主动拉取车辆报文
2.2、解析车辆报文,把报文信息转换为实体对象
2.3、解析系统 - 业务拆分(拆解主、从流程)
2.4、IOTDB搭建
2.5、封装IOTDB插件
2.6、解析系统存储到IOTDB当中
2.7、封装三级缓存插件、并保证数据一致性
3、解析系统执行从流程规范与规则
3.1、执行车辆实时数据 - 业务预留
3.2、执行车辆故障报警 - 业务预留
3.3、执行车辆电子围栏 - 业务预留
4、车辆管理:
4.1、车辆上线、下线
4.2、车辆标签
4.3、电子围栏
4.4、电子围栏 - 新增
4.5、电子围栏 - 回显/修改
4.6、电子围栏 - 列表/查看
4.7、电子围栏 - 删除
4.8、车辆电子围栏预警业务
4.9、车辆故障维护
4.10、车辆故障解析
4.11、搭建MQ消息系统
4.12、车辆故障记录
https://gitea.qinmian.online/Mobai/NetworkingCar.git

View File

@ -2,7 +2,7 @@ package com.muyu.vehicle;
import com.alibaba.fastjson2.JSONObject;
import com.muyu.common.SystemConstant;
import com.muyu.common.ThreadPool;
import com.muyu.common.pool.ScheduledThreadPool;
import com.muyu.domain.Vehicle;
import com.muyu.domain.model.PositionModel;
import com.muyu.utils.CalculateCheckDigit;
@ -32,10 +32,11 @@ import java.util.concurrent.ScheduledFuture;
import static com.muyu.common.SystemConstant.*;
/**
* @author DongZeLiang
* @author Saisai.Liu
* @version 1.0
* @description
* @date 2023/11/16
*
*/
@Data
@Log4j2
@ -89,9 +90,6 @@ public class VehicleInstance {
*/
private MqttProperties mqttProperties;
public VehicleInstance(MqttProperties mqttProperties) {
this.mqttProperties = mqttProperties;
}
/***
* VIN
@ -136,8 +134,8 @@ public class VehicleInstance {
options.setPassword(mqttProperties.getPassword().toCharArray());
}
options.setConnectionTimeout(60);
options.setKeepAliveInterval(60);
options.setConnectionTimeout(1);
options.setKeepAliveInterval(20);
// 连接
client.connect(options);
log.debug("车辆:[{}] 客户端初始化成功连接配置:{}", getVin(),
@ -158,6 +156,14 @@ public class VehicleInstance {
return this.client.isConnected();
}
/**
* 线
* @return truefalse
*/
public boolean isSend(){
return this.vehicleThread != null;
}
/**
*
*/
@ -195,7 +201,7 @@ public class VehicleInstance {
VehicleThread vehicleThread = new VehicleThread();
vehicleThread.setVehicleInstance(this);
this.setVehicleThread(vehicleThread);
ScheduledFuture<?> scheduledFuture = ThreadPool.submit(vehicleThread);
ScheduledFuture<?> scheduledFuture = ScheduledThreadPool.submit(vehicleThread);
this.setScheduledFuture(scheduledFuture);
log.info("初始化车辆上报模拟线程开始:[{}]", this.getVin());
}

View File

@ -4,18 +4,21 @@ import com.dtflys.forest.annotation.BaseRequest;
import com.dtflys.forest.annotation.JSONBody;
import com.dtflys.forest.annotation.Post;
import com.muyu.common.Result;
import com.muyu.domain.model.MqttServerModel;
import com.muyu.vehicle.api.req.VehicleConnectionReq;
/**
* @author DongZl
* @author Saisai Liu
* @description:
* @Date 2023-11-28 10:20
*/
@BaseRequest(
baseURL = "${adminHost}"
)
@BaseRequest(baseURL = "${adminHost}")
public interface ClientAdmin {
@Post("${adminTopicUri}")
public Result<String> vehicleConnection(@JSONBody VehicleConnectionReq vehicleConnectionReq);
public Result<MqttServerModel> vehicleConnection(@JSONBody VehicleConnectionReq vehicleConnectionReq);
//
// @Post("/fluxmq/getIp")
// public String getIp();
}

View File

@ -70,4 +70,12 @@ public class LocalContainer {
public static List<VehicleInstance> getOfflineVehicleInstance(){
return vehicleDataMap.values().stream().filter(vehicleInstance -> !vehicleInstance.isOnline()).toList();
}
/**
* VIN
* @param vin VIN
*/
public static void removeByVin(String vin) {
vehicleDataMap.remove(vin);
}
}

View File

@ -1,14 +1,12 @@
package com.muyu.vehicle.core;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.muyu.common.ThreadPool;
import com.muyu.common.pool.FixedThreadPool;
import com.muyu.common.pool.ScheduledThreadPool;
import com.muyu.domain.Vehicle;
import com.muyu.service.VehicleInstanceService;
import com.muyu.service.VehicleService;
import com.muyu.vehicle.VehicleInstance;
import com.muyu.vehicle.model.VehicleData;
import com.muyu.vehicle.model.properties.MqttProperties;
import com.muyu.vehicle.thread.VehicleThread;
import lombok.AllArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.springframework.boot.ApplicationArguments;
@ -16,12 +14,10 @@ import org.springframework.boot.ApplicationRunner;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PreDestroy;
import java.math.BigDecimal;
import java.util.Date;
import java.util.List;
/**
* @author DongZeLiang
* @author Saisai.Liu
* @version 1.0
* @description
* @date 2023/11/9
@ -58,7 +54,7 @@ public class VehicleConfiguration implements ApplicationRunner {
public void run (ApplicationArguments args) {
this.vehiclePageInit();
// 提交给线程池 一分钟 执行一次
ThreadPool.submit(new Thread(vehicleService::syncDb), 30);
// ThreadPool.submit(new Thread(vehicleService::syncDb), 30);
}
/**
@ -76,6 +72,8 @@ public class VehicleConfiguration implements ApplicationRunner {
onlineVehicleInstanceList.forEach(VehicleInstance::closeClient);
log.info("关闭线程池");
ThreadPool.shutdown();
ScheduledThreadPool.shutdown();
FixedThreadPool.shutDown();
}
}

View File

@ -8,7 +8,7 @@ import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
/**
* @author DongZeLiang
* @author Saisai.Liu
* @version 1.0
* @description Mqtt
* @date 2023/11/8

View File

@ -59,9 +59,9 @@ mybatis-plus:
logging:
level:
com.muyu: DEBUG
com.muyu.service: INFO
com.muyu.mapper: INFO
com.muyu.vehicle: INFO
com.muyu.service: DEBUG
com.muyu.mapper: DEBUG
com.muyu.vehicle: DEBUG
root: INFO
org:
springframework:
@ -86,7 +86,9 @@ forest:
# 服务器配置
mqtt:
server:
host: tcp://fluxmq.muyu.icu:1883
host: tcp://39.98.45.160:1883
topic: test1
admin:
host: http://127.0.0.1:${server.port}
topic-uri: /verify/vehicleConnection
host: http://127.0.0.1:8081
topic-uri: /fluxmq/getIp

View File

@ -1,4 +1,4 @@
package com;
package com.mobai;
import com.muyu.VehicleSimulationApplication;
import com.muyu.common.Result;
@ -21,14 +21,15 @@ public class AdminTest {
@Test
public void vehicleConnTest(){
Result<String> result = clientAdmin.vehicleConnection(
VehicleConnectionReq.builder()
.vin("VIN1234567894")
.timestamp(String.valueOf(System.currentTimeMillis()))
.userName("156841600")
.nonce("134812")
.build()
);
// Result<String> result = clientAdmin.vehicleConnection(
// VehicleConnectionReq.builder()
// .vin("VIN1234567894")
// .timestamp(String.valueOf(System.currentTimeMillis()))
// .userName("156841600")
// .nonce("134812")
// .build()
// );
Result<Object> result = new Result<>();
System.out.println(result);
}
}