Compare commits

...

10 Commits

Author SHA1 Message Date
Saisai Liu 743e81025f 基础模板 2024-05-24 21:09:25 +08:00
Saisai Liu ffa909eece 基础模板 2024-05-24 21:08:19 +08:00
DongZeLiang 3ecc09bb50 更新车辆上线代码逻辑 2024-03-26 10:00:35 +08:00
DongZeLiang 5c2fbb91dd 一键上线 2023-12-26 16:37:17 +08:00
dongzeliang eb1937a27b 代码未完善 2023-12-06 18:37:19 +08:00
dongzeliang 01eb776215 业务结构拆解 2023-12-06 15:35:59 +08:00
dongzeliang 0cf9ccb431 增加任务执行逻辑 2023-12-05 15:25:55 +08:00
DongZeLiang ed61bb6973 测试代码 2023-12-05 15:10:42 +08:00
dongzeliang a633e1c0a9 增加任务执行逻辑 2023-12-05 09:43:31 +08:00
dongzeliang 36834b9bd5 车辆上报判断是否现成,否则会创建多余线程 2023-12-04 15:43:41 +08:00
27 changed files with 835 additions and 285 deletions

View File

@ -7,7 +7,7 @@
</list> </list>
</option> </option>
</component> </component>
<component name="ProjectRootManager" version="2" languageLevel="JDK_17" default="true" project-jdk-name="17" project-jdk-type="JavaSDK"> <component name="ProjectRootManager" version="2" languageLevel="JDK_17" project-jdk-name="17" project-jdk-type="JavaSDK">
<output url="file://$PROJECT_DIR$/out" /> <output url="file://$PROJECT_DIR$/out" />
</component> </component>
</project> </project>

View File

@ -1,6 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?> <?xml version="1.0" encoding="UTF-8"?>
<project version="4"> <project version="4">
<component name="VcsDirectoryMappings"> <component name="VcsDirectoryMappings">
<mapping directory="$PROJECT_DIR$" vcs="Git" /> <mapping directory="" vcs="Git" />
</component> </component>
</project> </project>

View File

@ -13,7 +13,6 @@
<maven.compiler.target>17</maven.compiler.target> <maven.compiler.target>17</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<mybatisplus.version>3.5.1</mybatisplus.version> <mybatisplus.version>3.5.1</mybatisplus.version>
<freemaker.version>2.3.31</freemaker.version>
</properties> </properties>
<parent> <parent>

View File

@ -0,0 +1,34 @@
package com.muyu.common.pool;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
/**
* @author DongZl
* @description: 线
* @Date 2023-12-5 01:51
*/
public class FixedThreadPool {
/**
* 线
*/
private final static ExecutorService fixedThreadPool = Executors.newFixedThreadPool(15);
/**
* 线
* @param thread 线
*/
public static Future<?> submit(Thread thread){
return fixedThreadPool.submit(thread);
}
/**
* 线
*/
public static void shutDown(){
fixedThreadPool.shutdown();
}
}

View File

@ -1,4 +1,4 @@
package com.muyu.common; package com.muyu.common.pool;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
@ -10,12 +10,13 @@ import java.util.concurrent.TimeUnit;
* @description: 线 * @description: 线
* @Date 2023-11-17 09:16 * @Date 2023-11-17 09:16
*/ */
public class ThreadPool { public class ScheduledThreadPool {
/** /**
* 线 CPU * 2 + 1 * 线 CPU * 2 + 1
*/ */
private static final ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors() * 2 + 1); private static final ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(
Runtime.getRuntime().availableProcessors() * 2 + 1);
public static ScheduledFuture<?> submit (Runnable thread){ public static ScheduledFuture<?> submit (Runnable thread){
// 参数分别是: 任务, 多久后开始执行, 每隔多久执行一次(周期),时间单位 // 参数分别是: 任务, 多久后开始执行, 每隔多久执行一次(周期),时间单位

View File

@ -13,6 +13,7 @@ import org.springframework.web.bind.annotation.RequestMapping;
@Log4j2 @Log4j2
@Controller @Controller
public class IndexController { public class IndexController {
@RequestMapping("/") @RequestMapping("/")
public String hello(){ public String hello(){
log.debug("重定向到 - /static/index 页面"); log.debug("重定向到 - /static/index 页面");

View File

@ -39,7 +39,7 @@ public class VehicleController {
} }
/** /**
* *
* @param sum * @param vehicleCreateAddReq
* @return * @return
*/ */
@PostMapping("/create") @PostMapping("/create")
@ -49,4 +49,14 @@ public class VehicleController {
} }
/**
*
* @param vin
* @return
*/
@DeleteMapping("/{vin}")
public Result<String> delete(@PathVariable("vin") String vin){
this.vehicleService.delete(vin);
return Result.success(null,"删除成功");
}
} }

View File

@ -6,6 +6,7 @@ import com.muyu.domain.req.CheckPositionReq;
import com.muyu.domain.req.GearReq; import com.muyu.domain.req.GearReq;
import com.muyu.domain.req.MsgReq; import com.muyu.domain.req.MsgReq;
import com.muyu.domain.req.VehicleInstanceListReq; import com.muyu.domain.req.VehicleInstanceListReq;
import com.muyu.domain.resp.UnifiedTaskResp;
import com.muyu.domain.resp.VehicleInstanceResp; import com.muyu.domain.resp.VehicleInstanceResp;
import com.muyu.service.VehicleInstanceService; import com.muyu.service.VehicleInstanceService;
import com.muyu.vehicle.core.LocalContainer; import com.muyu.vehicle.core.LocalContainer;
@ -14,8 +15,6 @@ import lombok.extern.log4j.Log4j2;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*; import org.springframework.web.bind.annotation.*;
import java.util.List;
/** /**
* @author DongZl * @author DongZl
* @description: * @description:
@ -117,49 +116,4 @@ public class VehicleInstanceController {
this.vehicleInstanceService.editStatus(vin, statusKey, statusValue); this.vehicleInstanceService.editStatus(vin, statusKey, statusValue);
return Result.success(); return Result.success();
} }
/**
* 线
*/
@PostMapping("/unified/online")
public Result<String> unifiedOnline(){
this.vehicleInstanceService.unifiedOnline();
return Result.success(null,"已成功发布一键上线任务");
}
/**
* 线
*/
@PostMapping("/unified/offline")
public Result<String> unifiedOffline(){
this.vehicleInstanceService.unifiedOffline();
return Result.success(null,"已成功发布一键离线任务");
}
/**
*
*/
@PostMapping("/unified/send")
public Result<String> unifiedSend(){
this.vehicleInstanceService.unifiedSend();
return Result.success(null,"已成功发布一键上报任务");
}
/**
*
*/
@PostMapping("/unified/position")
public Result<String> unifiedPosition(){
this.vehicleInstanceService.unifiedPosition();
return Result.success(null,"已成功发布一键上报任务");
}
/**
*
*/
@PostMapping("/unified/stop")
public Result<String> unifiedStop(){
this.vehicleInstanceService.unifiedStop();
return Result.success(null,"已成功发布取消上报任务");
}
} }

View File

@ -0,0 +1,80 @@
package com.muyu.controller;
import com.muyu.common.Result;
import com.muyu.domain.resp.UnifiedTaskResp;
import com.muyu.service.VehicleUnifiedService;
import lombok.extern.log4j.Log4j2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @author DongZeLiang
* @version 1.0
* @description
* @date 2023/12/6
*/
@Log4j2
@RestController
@RequestMapping("/vehicle/unified")
public class VehicleUnifiedController {
@Autowired
private VehicleUnifiedService vehicleUnifiedService;
/**
* 线
*/
@PostMapping("/online")
public Result<String> unifiedOnline(){
this.vehicleUnifiedService.unifiedOnline();
return Result.success(null,"已成功发布一键上线任务");
}
/**
* 线
*/
@PostMapping("/offline")
public Result<String> unifiedOffline(){
this.vehicleUnifiedService.unifiedOffline();
return Result.success(null,"已成功发布一键离线任务");
}
/**
*
*/
@PostMapping("/send")
public Result<String> unifiedSend(){
this.vehicleUnifiedService.unifiedSend();
return Result.success(null,"已成功发布一键上报任务");
}
/**
*
*/
@PostMapping("/position")
public Result<String> unifiedPosition(){
this.vehicleUnifiedService.unifiedPosition();
return Result.success(null,"已成功发布一键上报任务");
}
/**
*
*/
@PostMapping("/stop")
public Result<String> unifiedStop(){
this.vehicleUnifiedService.unifiedStop();
return Result.success(null,"已成功发布取消上报任务");
}
/**
*
* @return
*/
@GetMapping("/status")
public Result<UnifiedTaskResp> unifiedStatus(){
return Result.success(this.vehicleUnifiedService.unifiedStatus());
}
}

View File

@ -1,6 +1,8 @@
package com.muyu.controller; package com.muyu.controller;
import com.muyu.common.Result; import com.muyu.common.Result;
import com.muyu.domain.model.MqttServerModel;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController; import org.springframework.web.bind.annotation.RestController;
@ -14,12 +16,25 @@ import org.springframework.web.bind.annotation.RestController;
@RequestMapping("/verify") @RequestMapping("/verify")
public class VerifyController { public class VerifyController {
@Value("${mqtt.server.host}")
private String broker;
@Value("${mqtt.server.topic}")
private String topic;
/** /**
* 线 * 线
* @return test * @return test
*/ */
@PostMapping("/vehicleConnection") @PostMapping("/vehicleConnection")
public Result<String> vehicleConnection(){ public Result<MqttServerModel> vehicleConnection(){
return Result.success("test"); return Result.success(
MqttServerModel.builder()
.broker(broker)
.topic(topic)
.build()
);
} }
} }

View File

@ -0,0 +1,28 @@
package com.muyu.domain.model;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* @author DongZl
* @description: Mqtt
* @Date 2024-3-26 09:53
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class MqttServerModel {
/**
* MQTT
*/
private String broker;
/**
* MQTT
*/
private String topic;
}

View File

@ -0,0 +1,121 @@
package com.muyu.domain.model;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.extern.log4j.Log4j2;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @author DongZeLiang
* @version 1.0
* @description
* @date 2023/12/4
*/
@Data
@Log4j2
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class TaskModel {
/**
* false
*/
private final AtomicBoolean unifiedStatus = new AtomicBoolean(Boolean.FALSE);
private CountDownLatch countDownLatch ;
/**
*
* @return
*/
public boolean isExecution(){
return !unifiedStatus.get();
}
/**
*
*/
private String taskName;
/**
*
*/
private Integer taskExecutionSum = 0;
/**
*
*/
private long taskStartTime;
/**
* 线
* @param taskName
* @param taskExecutionSum
*/
public void submit(String taskName,Integer taskExecutionSum){
if (!this.isExecution()){
throw new RuntimeException("["+this.taskName+"]的任务正在进行中,请等待任务执行完成再次发布一键任务");
}
unifiedStatus.set(Boolean.TRUE);
this.taskName = taskName;
this.countDownLatch = new CountDownLatch(taskExecutionSum);
this.taskExecutionSum = taskExecutionSum;
this.taskSuccessSum = new AtomicInteger();
this.taskErrorSum = new AtomicInteger();
this.taskStartTime = System.currentTimeMillis();
log.info("[{}]任务执行开始", this.taskName);
}
/**
*
*/
public void down(){
log.info("[{}]任务执行结束,耗时:[{}]MS", this.taskName, System.currentTimeMillis() - taskStartTime);
this.taskName = null;
this.taskExecutionSum = 0;
this.taskSuccessSum.set(0);
this.taskErrorSum.set(0);
unifiedStatus.set(Boolean.FALSE);
}
/**
*
*/
private AtomicInteger taskSuccessSum = new AtomicInteger();
/**
*
*/
public void incrementSuccess(){
this.taskSuccessSum.incrementAndGet();
}
/**
*
*/
public int getSuccessSum(){
return this.taskSuccessSum.incrementAndGet();
}
/**
*
*/
private AtomicInteger taskErrorSum = new AtomicInteger();
/**
*
*/
public void incrementError(){
this.taskErrorSum.incrementAndGet();
}
/**
*
*/
public int getErrorSum(){
return this.taskErrorSum.incrementAndGet();
}
}

View File

@ -0,0 +1,50 @@
package com.muyu.domain.resp;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* @author DongZeLiang
* @version 1.0
* @description
* @date 2023/12/5
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class UnifiedTaskResp {
/**
* false
*/
private boolean unifiedStatus;
/**
*
*/
private String taskName;
/**
*
*/
private Integer taskExecutionSum;
/**
*
*/
private long taskStartTime;
/**
*
*/
private Integer taskSuccessSum;
/**
*
*/
private Integer taskErrorSum;
}

View File

@ -6,6 +6,7 @@ import com.muyu.domain.req.CheckPositionReq;
import com.muyu.domain.req.GearReq; import com.muyu.domain.req.GearReq;
import com.muyu.domain.req.MsgReq; import com.muyu.domain.req.MsgReq;
import com.muyu.domain.req.VehicleInstanceListReq; import com.muyu.domain.req.VehicleInstanceListReq;
import com.muyu.domain.resp.UnifiedTaskResp;
import com.muyu.domain.resp.VehicleInstanceResp; import com.muyu.domain.resp.VehicleInstanceResp;
/** /**
@ -70,30 +71,4 @@ public interface VehicleInstanceService {
*/ */
void editStatus (String vin, String statusKey, Integer statusValue); void editStatus (String vin, String statusKey, Integer statusValue);
/**
* 线
*/
public void unifiedOnline();
/**
* 线
*/
public void unifiedOffline();
/**
*
*/
public void unifiedSend();
/**
*
*/
void unifiedPosition ();
/**
*
*/
public void unifiedStop();
} }

View File

@ -29,4 +29,10 @@ public interface VehicleService extends IService<Vehicle> {
* *
*/ */
void syncDb(); void syncDb();
/**
* VIN
* @param vin vin
*/
void delete(String vin);
} }

View File

@ -0,0 +1,45 @@
package com.muyu.service;
import com.muyu.domain.resp.UnifiedTaskResp;
/**
* @author DongZeLiang
* @version 1.0
* @description
* @date 2023/12/6
*/
public interface VehicleUnifiedService {
/**
* 线
*/
public void unifiedOnline();
/**
* 线
*/
public void unifiedOffline();
/**
*
*/
public void unifiedSend();
/**
*
*/
void unifiedPosition ();
/**
*
*/
public void unifiedStop();
/**
*
* @return
*/
UnifiedTaskResp unifiedStatus();
}

View File

@ -85,6 +85,7 @@ public class VechileServiceImpl extends ServiceImpl<VehicleMapper, Vehicle> impl
@Override @Override
public void syncDb () { public void syncDb () {
try { try {
// vehicleInstanceService.isTaskStatus();
log.info("同步数据库开始"); log.info("同步数据库开始");
long startTime = System.currentTimeMillis(); long startTime = System.currentTimeMillis();
Collection<VehicleInstance> vehicleInstanceList = LocalContainer.getOnlineVehicleInstance(); Collection<VehicleInstance> vehicleInstanceList = LocalContainer.getOnlineVehicleInstance();
@ -114,5 +115,28 @@ public class VechileServiceImpl extends ServiceImpl<VehicleMapper, Vehicle> impl
} }
} }
/**
* VIN
*
* @param vin vin
*/
@Override
public void delete(String vin) {
VehicleInstance vehicleInstance = LocalContainer.getVehicleInstance(vin);
// 先判断车辆是否在上报数据,上报则停止
if (vehicleInstance.isSend()){
vehicleInstance.stopSend();
}
// 判断车辆是否在线,若在线则让车辆下线
if (vehicleInstance.isOnline()){
vehicleInstance.closeClient();
}
// 进行缓存删除
LocalContainer.removeByVin(vin);
// 删除数据库
this.removeById(vin);
}
} }

View File

@ -1,10 +1,9 @@
package com.muyu.service.impl; package com.muyu.service.impl;
import com.alibaba.fastjson2.JSONArray;
import com.muyu.common.PageList; import com.muyu.common.PageList;
import com.muyu.common.Result; import com.muyu.common.Result;
import com.muyu.domain.PositionRouteInfo;
import com.muyu.domain.Vehicle; import com.muyu.domain.Vehicle;
import com.muyu.domain.model.MqttServerModel;
import com.muyu.domain.model.PositionModel; import com.muyu.domain.model.PositionModel;
import com.muyu.domain.req.CheckPositionReq; import com.muyu.domain.req.CheckPositionReq;
import com.muyu.domain.req.GearReq; import com.muyu.domain.req.GearReq;
@ -24,14 +23,11 @@ import com.muyu.vehicle.model.properties.MqttProperties;
import lombok.extern.log4j.Log4j2; import lombok.extern.log4j.Log4j2;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import java.util.Comparator; import java.util.Comparator;
import java.util.List; import java.util.List;
import java.util.Random;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream; import java.util.stream.Stream;
/** /**
@ -50,9 +46,6 @@ public class VehicleInstanceServiceImpl implements VehicleInstanceService {
@Autowired @Autowired
private ClientAdmin clientAdmin; private ClientAdmin clientAdmin;
@Value("${mqtt.server.host}")
private String broker;
/** /**
* *
* *
@ -106,6 +99,7 @@ public class VehicleInstanceServiceImpl implements VehicleInstanceService {
*/ */
@Override @Override
public void vehicleClientInit (String vin) { public void vehicleClientInit (String vin) {
log.info("vin[{}],开始上线", vin);
VehicleInstance vehicleInstance = LocalContainer.getVehicleInstance(vin); VehicleInstance vehicleInstance = LocalContainer.getVehicleInstance(vin);
if (vehicleInstance == null){ if (vehicleInstance == null){
throw new RuntimeException("没有【"+vin+"】车辆"); throw new RuntimeException("没有【"+vin+"】车辆");
@ -117,19 +111,23 @@ public class VehicleInstanceServiceImpl implements VehicleInstanceService {
.userName(MD5Util.encrypted(vin+timestamp)) .userName(MD5Util.encrypted(vin+timestamp))
.nonce(MD5Util.encrypted(UUID.randomUUID().toString().replace("-", ""))) .nonce(MD5Util.encrypted(UUID.randomUUID().toString().replace("-", "")))
.build(); .build();
Result<String> result = clientAdmin.vehicleConnection(connectionReq); Result<MqttServerModel> result = clientAdmin.vehicleConnection(connectionReq);
if (result.getCode() != 200){ if (result.getCode() != 200){
throw new RuntimeException("车辆:["+vin+"],连接异常:["+result.getMsg()+"]"); log.error("车辆:[{}],申请上线异常:[{}]", vin, result.getMsg());
throw new RuntimeException("远程服务器没有【"+vin+"】车辆");
} }
MqttServerModel mqttServerModel = result.getData();
MqttProperties mqttProperties = MqttProperties.builder() MqttProperties mqttProperties = MqttProperties.builder()
.broker(broker) .broker(mqttServerModel.getBroker())
.topic(result.getData()) .topic(mqttServerModel.getTopic())
.clientId(vin) .clientId(vin)
.username(connectionReq.getUserName()) .username(connectionReq.getUserName())
.password(vin+connectionReq.getTimestamp()+connectionReq.getNonce()) .password(vin + connectionReq.getTimestamp() + connectionReq.getNonce())
.build(); .build();
vehicleInstance.setMqttProperties(mqttProperties); vehicleInstance.setMqttProperties(mqttProperties);
vehicleInstance.initClient(); vehicleInstance.initClient();
log.info("vin[{}],上线成功", vin);
} }
/** /**
@ -203,166 +201,6 @@ public class VehicleInstanceServiceImpl implements VehicleInstanceService {
ReflectUtils.invokeSetter(vehicleData, statusKey, statusValue); ReflectUtils.invokeSetter(vehicleData, statusKey, statusValue);
} }
private final AtomicBoolean unifiedStatus = new AtomicBoolean(Boolean.TRUE); // private final AtomicBoolean unifiedStatus = new AtomicBoolean(Boolean.TRUE);
/**
* 线
*/
@Override
public void unifiedOnline () {
if (!unifiedStatus.get()){
throw new RuntimeException("一键执行的任务正在进行中,请勿再次发布一键执行任务");
}
new Thread(() -> {
try {
unifiedStatus.set(Boolean.FALSE);
try {
// 筛选出离线车辆并使用并行流进行上线操作
LocalContainer.getOfflineVehicleInstance()
.stream()
.parallel()
.map(VehicleInstance::getVin)
.forEach(this::vehicleClientInit);
}catch (Exception exception){
log.error("车辆一件上线报错:{}", exception.getMessage(), exception);
}
unifiedStatus.set(Boolean.TRUE);
}catch (Exception exception){
}
}).start();
}
/**
* 线
*/
@Override
public void unifiedOffline () {
if (!unifiedStatus.get()){
throw new RuntimeException("一键执行的任务正在进行中,请勿再次发布一键执行任务");
}
new Thread(() -> {
unifiedStatus.set(Boolean.FALSE);
try {
// 筛选出在线车辆使用并行流操作先停止车辆上报动作再进行车辆离线操作
LocalContainer.getOnlineVehicleInstance()
.stream()
.parallel()
.forEach(vehicleInstance -> {
vehicleInstance.stopSend();
vehicleInstance.closeClient();
});
}catch (Exception exception){
log.error("车辆一键离线报错:{}", exception.getMessage(), exception);
}
unifiedStatus.set(Boolean.TRUE);
}).start();
}
/**
*
*/
@Override
public void unifiedSend () {
if (!unifiedStatus.get()){
throw new RuntimeException("一键执行的任务正在进行中,请勿再次发布一键执行任务");
}
List<VehicleInstance> vehicleInstanceList = LocalContainer.getOnlineVehicleInstance();
if (vehicleInstanceList.isEmpty()){
throw new RuntimeException("还没有车辆上线,请先让车辆上线");
}
new Thread(() -> {
try {
unifiedStatus.set(Boolean.FALSE);
// 先一键重置路径
this.unifiedPositionPri ();
vehicleInstanceList
.stream()
.parallel()
.forEach(vehicleInstance -> {
// 设置车辆档位
vehicleInstance.setGear("D");
// 开启线程进行上报
if(vehicleInstance.getVehicleThread() == null){
vehicleInstance.initVehicleThread();
}
vehicleInstance.startSend();
});
}catch (Exception exception){
log.error("车辆一键上报报错:{}", exception.getMessage(), exception);
}
unifiedStatus.set(Boolean.TRUE);
}).start();
}
/**
*
*/
@Override
public void unifiedPosition () {
if (!unifiedStatus.get()){
throw new RuntimeException("一键执行的任务正在进行中,请勿再次发布一键执行任务");
}
new Thread(() -> {
unifiedStatus.set(Boolean.FALSE);
try {
this.unifiedPositionPri();
}catch (Exception exception){
log.error("车辆一键重置路径报错:{}", exception.getMessage(), exception);
}
unifiedStatus.set(Boolean.TRUE);
}).start();
}
private void unifiedPositionPri(){
// 获取到所有路径
List<PositionRouteInfo> positionRouteInfoList = positionRouteService.list();
// 路径长度
int positionSize = positionRouteInfoList.size();
// 随机数
Random random = new Random();
LocalContainer.getOnlineVehicleInstance()
.stream()
.parallel()
.forEach(vehicleInstance -> {
// 随机一个路径结果
int positionIndex = random.nextInt(0, positionSize);
PositionRouteInfo positionRouteInfo = positionRouteInfoList.get(positionIndex);
String positionCode = positionRouteInfo.getName();
List<PositionModel> positionModelList = JSONArray.parseArray(positionRouteInfo.getRouteData(), String.class)
.stream()
.map(PositionModel::strBuild)
.toList();
// 设置车辆路径
vehicleInstance.settingPosition(positionModelList);
vehicleInstance.setPositionCode(positionCode);
});
}
/**
*
*/
@Override
public void unifiedStop () {
if (!unifiedStatus.get()){
throw new RuntimeException("一键执行的任务正在进行中,请勿再次发布一键执行任务");
}
new Thread(() -> {
try {
unifiedStatus.set(Boolean.FALSE);
LocalContainer.getOnlineVehicleInstance()
.stream()
.parallel()
.forEach(VehicleInstance::stopSend);
unifiedStatus.set(Boolean.TRUE);
}catch (Exception exception){
log.error("车辆一键取消上报报错:{}", exception.getMessage(), exception);
}
}).start();
}
} }

View File

@ -0,0 +1,280 @@
package com.muyu.service.impl;
import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import com.muyu.common.pool.FixedThreadPool;
import com.muyu.domain.PositionRouteInfo;
import com.muyu.domain.model.PositionModel;
import com.muyu.domain.model.TaskModel;
import com.muyu.domain.resp.UnifiedTaskResp;
import com.muyu.service.PositionRouteService;
import com.muyu.service.VehicleInstanceService;
import com.muyu.service.VehicleUnifiedService;
import com.muyu.vehicle.VehicleInstance;
import com.muyu.vehicle.core.LocalContainer;
import lombok.extern.log4j.Log4j2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
* @author DongZeLiang
* @version 1.0
* @description
* @date 2023/12/6
*/
@Log4j2
@Service
public class VehicleUnifiedServiceImpl implements VehicleUnifiedService {
@Autowired
private VehicleInstanceService vehicleInstanceService;
@Autowired
private PositionRouteService positionRouteService;
/**
*
*/
private final TaskModel taskModel = new TaskModel();
/**
* 线
*/
@Override
public void unifiedOnline () {
// 获取离线车辆VIN
List<String> vinList = LocalContainer.getOfflineVehicleInstance()
.stream()
.map(VehicleInstance::getVin)
.toList();
taskModel.submit("一键上线", vinList.size());
new Thread(() -> {
int vinSize = 0, executionSize = 15;
do {
int startIndex = vinSize++ * executionSize;
// 进行分页开启车辆
List<String> executionVinList = vinList.stream()
.skip(startIndex)
.limit(executionSize)
.toList();
CountDownLatch countDownLatch = new CountDownLatch(executionVinList.size());
Map<String, Thread> startVehicleThread = new ConcurrentHashMap<>();
executionVinList.forEach(vin -> {
Thread thread = new Thread(() -> {
try {
vehicleInstanceService.vehicleClientInit(vin);
startVehicleThread.remove(vin);
}catch (Exception interruptedException){
log.error(interruptedException);
}
});
startVehicleThread.put(vin, thread);
FixedThreadPool.submit(thread);
});
try {
boolean await = countDownLatch.await(5, TimeUnit.SECONDS);
log.info(
"开始:[{}] 结束:[{}],未上线成功:[{}], vin[{}]",
startIndex, startIndex+executionVinList.size(),
startVehicleThread.size(),
JSONObject.toJSONString(executionVinList));
if (!await){
startVehicleThread.values().forEach(Thread::interrupt);
}
} catch (InterruptedException ignored) {}
if (executionVinList.size() < executionSize){
break;
}
}while (true);
taskModel.down();
}).start();
}
/**
* 线
*/
@Override
public void unifiedOffline () {
List<VehicleInstance> onlineVehicleInstanceList = LocalContainer.getOnlineVehicleInstance();
new Thread(() -> {
try {
// 筛选出在线车辆使用并行流操作先停止车辆上报动作再进行车辆离线操作
onlineVehicleInstanceList
.stream()
.parallel()
.forEach(vehicleInstance -> {
try {
vehicleInstance.stopSend();
vehicleInstance.closeClient();
taskModel.incrementSuccess();
}catch (Exception exception){
log.error("车辆离线异常:{}", exception.getMessage());
taskModel.incrementError();
}
});
} catch (Exception exception) {
log.error("车辆一键离线报错:{}", exception.getMessage(), exception);
}
taskModel.down();
});
// taskModel.submit("一键离线", onlineVehicleInstanceList.size(), taskThread);
}
/**
*
*/
@Override
public void unifiedSend () {
List<VehicleInstance> vehicleInstanceList = LocalContainer.getOnlineVehicleInstance();
if (vehicleInstanceList.isEmpty()){
throw new RuntimeException("还没有车辆连接到服务器,请先让车辆上线");
}
// 获取到所有路径
List<PositionRouteInfo> positionRouteInfoList = positionRouteService.list();
// 路径长度
int positionSize = positionRouteInfoList.size();
// 随机数
Random random = new Random();
Thread taskThread = new Thread(() -> {
try {
vehicleInstanceList
.stream()
.parallel()
.forEach(vehicleInstance -> {
try {
// 随机一个路径结果
int positionIndex = random.nextInt(0, positionSize);
PositionRouteInfo positionRouteInfo = positionRouteInfoList.get(positionIndex);
String positionCode = positionRouteInfo.getName();
List<PositionModel> positionModelList = JSONArray.parseArray(positionRouteInfo.getRouteData(), String.class)
.stream()
.map(PositionModel::strBuild)
.toList();
// 设置车辆路径
vehicleInstance.settingPosition(positionModelList);
vehicleInstance.setPositionCode(positionCode);
// 设置车辆档位
vehicleInstance.setGear("D");
// 开启线程进行上报
if (vehicleInstance.getVehicleThread() == null) {
vehicleInstance.initVehicleThread();
}
vehicleInstance.startSend();
taskModel.incrementSuccess();
}catch (Exception exception){
log.info("车辆设置一键上报失败:{}", exception.getMessage());
taskModel.incrementError();
}
});
} catch (Exception exception) {
log.error("车辆一键上报报错:{}", exception.getMessage(), exception);
}
taskModel.down();
});
// taskModel.submit("一键上报", vehicleInstanceList.size(),taskThread);
}
/**
*
*/
@Override
public void unifiedPosition () {
List<VehicleInstance> vehicleInstanceList = LocalContainer.getOnlineVehicleInstance();
Thread taskThread = new Thread(() -> {
try {
// 获取到所有路径
List<PositionRouteInfo> positionRouteInfoList = positionRouteService.list();
// 路径长度
int positionSize = positionRouteInfoList.size();
// 随机数
Random random = new Random();
vehicleInstanceList
.stream()
.parallel()
.forEach(vehicleInstance -> {
try {
// 随机一个路径结果
int positionIndex = random.nextInt(0, positionSize);
PositionRouteInfo positionRouteInfo = positionRouteInfoList.get(positionIndex);
String positionCode = positionRouteInfo.getName();
List<PositionModel> positionModelList = JSONArray.parseArray(positionRouteInfo.getRouteData(), String.class)
.stream()
.map(PositionModel::strBuild)
.toList();
// 设置车辆路径
vehicleInstance.settingPosition(positionModelList);
vehicleInstance.setPositionCode(positionCode);
taskModel.incrementSuccess();
} catch (Exception exception) {
log.error("车辆设置路线异常:[{}]", exception.getMessage());
taskModel.incrementError();
}
});
} catch (Exception exception) {
log.error("车辆一键重置路径报错:{}", exception.getMessage(), exception);
}
taskModel.down();
});
// taskModel.submit("一键重置路径",vehicleInstanceList.size(), taskThread);
}
/**
*
*/
@Override
public void unifiedStop () {
List<VehicleInstance> onlineVehicleInstanceList = LocalContainer.getOnlineVehicleInstance();
Thread taskThread = new Thread(() -> {
try {
LocalContainer.getOnlineVehicleInstance()
.stream()
.parallel()
.forEach(vehicleInstance -> {
try {
vehicleInstance.stopSend();
taskModel.incrementSuccess();
}catch (Exception exception){
log.info("车辆一键取消上报发生错误:{}", exception.getMessage());
taskModel.incrementError();
}
});
taskModel.down();
} catch (Exception exception) {
log.error("车辆一键取消上报报错:{}", exception.getMessage(), exception);
}
});
// taskModel.submit("一键取消上报", onlineVehicleInstanceList.size(), taskThread);
}
/**
*
*
* @return
*/
@Override
public UnifiedTaskResp unifiedStatus() {
boolean unifiedStatus = this.taskModel.getUnifiedStatus().get();
return UnifiedTaskResp.builder()
.unifiedStatus(unifiedStatus)
.taskErrorSum(this.taskModel.getErrorSum())
.taskExecutionSum(this.taskModel.getTaskExecutionSum())
.taskSuccessSum(this.taskModel.getSuccessSum())
.taskName(this.taskModel.getTaskName())
.taskStartTime(System.currentTimeMillis() - this.taskModel.getTaskStartTime())
.build();
}
}

View File

@ -1,5 +1,7 @@
package com.muyu.utils; package com.muyu.utils;
import java.nio.charset.StandardCharsets;
public class ConversionUtil { public class ConversionUtil {
/** /**
@ -17,4 +19,47 @@ public class ConversionUtil {
} }
return sb.toString(); return sb.toString();
} }
public static void main (String[] args) {
// String str = "<?xml version=\"1.0\"?>\n" +
// "<monitorRoot type=\"param\"><synchronizeSyptom event=\"0\" initial=\"true\"><Action_ECG><Rhythm>Sinus</Rhythm><HR>80</HR><EMD>No Change</EMD><Conduct>0</Conduct></Action_ECG><Action_Osat value=\"94\" isRelativePercent=\"false\"/><Action_BP isRelativePercent=\"false\"><Shrink value=\"120\"/><Stretch value=\"80\"/></Action_BP><Action_Resp breathType=\"Normal\" value=\"14\" isRelativePercent=\"false\"/><Action_etCO2 value=\"34\" isRelativePercent=\"false\"/><Action_Temperature value=\"35.2\"/><Action_CVP value=\"6.0\"/><Action_PAPDia value=\"10\"/><Action_PAPSys value=\"25\"/><Action_WP value=\"9\"/></synchronizeSyptom></monitorRoot>";
// String strToSixteen = strToSixteen(str);
// System.out.println(str);
// System.out.println(str.length());
// System.out.println(strToSixteen);
// System.out.println(strToSixteen.replace(" ", "").length());
String hexStr = "3C3F786D6C2076657273696F6E3D22312E30223F3E0D0A3C6D6F6E69746F72526F6F7420747970653D22706172616D223E3C73796E6368726F6E697A65537970746F6D206576656E743D22302220696E697469616C3D2274727565223E3C416374696F6E5F4543473E3C52687974686D3E53696E75733C2F52687974686D3E3C48523E38303C2F48523E3C454D443E4E6F204368616E67653C2F454D443E3C436F6E647563743E303C2F436F6E647563743E3C2F416374696F6E5F4543473E3C416374696F6E5F4F7361742076616C75653D2239342220697352656C617469766550657263656E743D2266616C7365222F3E3C416374696F6E5F425020697352656C617469766550657263656E743D2266616C7365223E3C536872696E6B2076616C75653D22313230222F3E3C537472657463682076616C75653D223830222F3E3C2F416374696F6E5F42503E3C416374696F6E5F5265737020627265617468547970653D224E6F726D616C222076616C75653D2231342220697352656C617469766550657263656E743D2266616C7365222F3E3C416374696F6E5F6574434F322076616C75653D2233342220697352656C617469766550657263656E743D2266616C7365222F3E3C416374696F6E5F54656D70657261747572652076616C75653D2233352E32222F3E3C416374696F6E5F4356502076616C75653D22362E30222F3E3C416374696F6E5F5041504469612076616C75653D223130222F3E3C416374696F6E5F5041505379732076616C75653D223235222F3E3C416374696F6E5F57502076616C75653D2239222F3E3C2F73796E6368726F6E697A65537970746F6D3E3C2F6D6F6E69746F72526F6F743E0D0A";
String hexStringToString = hexStringToString(hexStr);
System.out.println(hexStr);
System.out.println(hexStr.length());
System.out.println(hexStringToString);
System.out.println(hexStringToString.length());
}
/**
* 16string
* @param s
* @return
*/
public static String hexStringToString(String s) {
if (s == null || s.equals("")) {
return null;
}
s = s.replace(" ", "");
byte[] baKeyword = new byte[s.length() / 2];
for (int i = 0; i < baKeyword.length; i++) {
try {
baKeyword[i] = (byte) (0xff & Integer.parseInt(s.substring(i * 2, i * 2 + 2), 16));
} catch (Exception e) {
e.printStackTrace();
}
}
try {
s = new String(baKeyword, StandardCharsets.UTF_8);
} catch (Exception e1) {
e1.printStackTrace();
}
return s;
}
} }

View File

@ -0,0 +1,30 @@
1、收集系统 - MQTT接受数据
1.1上报kafka
2、主流程
2.1、消费者按照分区主动拉取车辆报文
2.2、解析车辆报文,把报文信息转换为实体对象
2.3、解析系统 - 业务拆分(拆解主、从流程)
2.4、IOTDB搭建
2.5、封装IOTDB插件
2.6、解析系统存储到IOTDB当中
2.7、封装三级缓存插件、并保证数据一致性
3、解析系统执行从流程规范与规则
3.1、执行车辆实时数据 - 业务预留
3.2、执行车辆故障报警 - 业务预留
3.3、执行车辆电子围栏 - 业务预留
4、车辆管理:
4.1、车辆上线、下线
4.2、车辆标签
4.3、电子围栏
4.4、电子围栏 - 新增
4.5、电子围栏 - 回显/修改
4.6、电子围栏 - 列表/查看
4.7、电子围栏 - 删除
4.8、车辆电子围栏预警业务
4.9、车辆故障维护
4.10、车辆故障解析
4.11、搭建MQ消息系统
4.12、车辆故障记录
https://gitea.qinmian.online/Mobai/NetworkingCar.git

View File

@ -2,7 +2,7 @@ package com.muyu.vehicle;
import com.alibaba.fastjson2.JSONObject; import com.alibaba.fastjson2.JSONObject;
import com.muyu.common.SystemConstant; import com.muyu.common.SystemConstant;
import com.muyu.common.ThreadPool; import com.muyu.common.pool.ScheduledThreadPool;
import com.muyu.domain.Vehicle; import com.muyu.domain.Vehicle;
import com.muyu.domain.model.PositionModel; import com.muyu.domain.model.PositionModel;
import com.muyu.utils.CalculateCheckDigit; import com.muyu.utils.CalculateCheckDigit;
@ -89,9 +89,6 @@ public class VehicleInstance {
*/ */
private MqttProperties mqttProperties; private MqttProperties mqttProperties;
public VehicleInstance(MqttProperties mqttProperties) {
this.mqttProperties = mqttProperties;
}
/*** /***
* VIN * VIN
@ -136,8 +133,8 @@ public class VehicleInstance {
options.setPassword(mqttProperties.getPassword().toCharArray()); options.setPassword(mqttProperties.getPassword().toCharArray());
} }
options.setConnectionTimeout(60); options.setConnectionTimeout(1);
options.setKeepAliveInterval(60); options.setKeepAliveInterval(20);
// 连接 // 连接
client.connect(options); client.connect(options);
log.debug("车辆:[{}] 客户端初始化成功连接配置:{}", getVin(), log.debug("车辆:[{}] 客户端初始化成功连接配置:{}", getVin(),
@ -158,6 +155,14 @@ public class VehicleInstance {
return this.client.isConnected(); return this.client.isConnected();
} }
/**
* 线
* @return truefalse
*/
public boolean isSend(){
return this.vehicleThread != null;
}
/** /**
* *
*/ */
@ -195,7 +200,7 @@ public class VehicleInstance {
VehicleThread vehicleThread = new VehicleThread(); VehicleThread vehicleThread = new VehicleThread();
vehicleThread.setVehicleInstance(this); vehicleThread.setVehicleInstance(this);
this.setVehicleThread(vehicleThread); this.setVehicleThread(vehicleThread);
ScheduledFuture<?> scheduledFuture = ThreadPool.submit(vehicleThread); ScheduledFuture<?> scheduledFuture = ScheduledThreadPool.submit(vehicleThread);
this.setScheduledFuture(scheduledFuture); this.setScheduledFuture(scheduledFuture);
log.info("初始化车辆上报模拟线程开始:[{}]", this.getVin()); log.info("初始化车辆上报模拟线程开始:[{}]", this.getVin());
} }

View File

@ -4,6 +4,7 @@ import com.dtflys.forest.annotation.BaseRequest;
import com.dtflys.forest.annotation.JSONBody; import com.dtflys.forest.annotation.JSONBody;
import com.dtflys.forest.annotation.Post; import com.dtflys.forest.annotation.Post;
import com.muyu.common.Result; import com.muyu.common.Result;
import com.muyu.domain.model.MqttServerModel;
import com.muyu.vehicle.api.req.VehicleConnectionReq; import com.muyu.vehicle.api.req.VehicleConnectionReq;
/** /**
@ -17,5 +18,5 @@ import com.muyu.vehicle.api.req.VehicleConnectionReq;
public interface ClientAdmin { public interface ClientAdmin {
@Post("${adminTopicUri}") @Post("${adminTopicUri}")
public Result<String> vehicleConnection(@JSONBody VehicleConnectionReq vehicleConnectionReq); public Result<MqttServerModel> vehicleConnection(@JSONBody VehicleConnectionReq vehicleConnectionReq);
} }

View File

@ -70,4 +70,12 @@ public class LocalContainer {
public static List<VehicleInstance> getOfflineVehicleInstance(){ public static List<VehicleInstance> getOfflineVehicleInstance(){
return vehicleDataMap.values().stream().filter(vehicleInstance -> !vehicleInstance.isOnline()).toList(); return vehicleDataMap.values().stream().filter(vehicleInstance -> !vehicleInstance.isOnline()).toList();
} }
/**
* VIN
* @param vin VIN
*/
public static void removeByVin(String vin) {
vehicleDataMap.remove(vin);
}
} }

View File

@ -1,14 +1,12 @@
package com.muyu.vehicle.core; package com.muyu.vehicle.core;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.muyu.common.ThreadPool; import com.muyu.common.pool.FixedThreadPool;
import com.muyu.common.pool.ScheduledThreadPool;
import com.muyu.domain.Vehicle; import com.muyu.domain.Vehicle;
import com.muyu.service.VehicleInstanceService; import com.muyu.service.VehicleInstanceService;
import com.muyu.service.VehicleService; import com.muyu.service.VehicleService;
import com.muyu.vehicle.VehicleInstance; import com.muyu.vehicle.VehicleInstance;
import com.muyu.vehicle.model.VehicleData;
import com.muyu.vehicle.model.properties.MqttProperties;
import com.muyu.vehicle.thread.VehicleThread;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.extern.log4j.Log4j2; import lombok.extern.log4j.Log4j2;
import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationArguments;
@ -16,8 +14,6 @@ import org.springframework.boot.ApplicationRunner;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
import javax.annotation.PreDestroy; import javax.annotation.PreDestroy;
import java.math.BigDecimal;
import java.util.Date;
import java.util.List; import java.util.List;
/** /**
@ -58,7 +54,7 @@ public class VehicleConfiguration implements ApplicationRunner {
public void run (ApplicationArguments args) { public void run (ApplicationArguments args) {
this.vehiclePageInit(); this.vehiclePageInit();
// 提交给线程池 一分钟 执行一次 // 提交给线程池 一分钟 执行一次
ThreadPool.submit(new Thread(vehicleService::syncDb), 30); // ThreadPool.submit(new Thread(vehicleService::syncDb), 30);
} }
/** /**
@ -76,6 +72,8 @@ public class VehicleConfiguration implements ApplicationRunner {
onlineVehicleInstanceList.forEach(VehicleInstance::closeClient); onlineVehicleInstanceList.forEach(VehicleInstance::closeClient);
log.info("关闭线程池"); log.info("关闭线程池");
ThreadPool.shutdown(); ScheduledThreadPool.shutdown();
FixedThreadPool.shutDown();
} }
} }

View File

@ -59,9 +59,9 @@ mybatis-plus:
logging: logging:
level: level:
com.muyu: DEBUG com.muyu: DEBUG
com.muyu.service: INFO com.muyu.service: DEBUG
com.muyu.mapper: INFO com.muyu.mapper: DEBUG
com.muyu.vehicle: INFO com.muyu.vehicle: DEBUG
root: INFO root: INFO
org: org:
springframework: springframework:
@ -86,7 +86,8 @@ forest:
# 服务器配置 # 服务器配置
mqtt: mqtt:
server: server:
host: tcp://fluxmq.muyu.icu:1883 host: tcp://39.98.45.160:1883
topic: test1
admin: admin:
host: http://127.0.0.1:${server.port} host: http://127.0.0.1:${server.port}
topic-uri: /verify/vehicleConnection topic-uri: /verify/vehicleConnection

View File

@ -1,4 +1,4 @@
package com; package com.mobai;
import com.muyu.VehicleSimulationApplication; import com.muyu.VehicleSimulationApplication;
import com.muyu.common.Result; import com.muyu.common.Result;
@ -21,14 +21,15 @@ public class AdminTest {
@Test @Test
public void vehicleConnTest(){ public void vehicleConnTest(){
Result<String> result = clientAdmin.vehicleConnection( // Result<String> result = clientAdmin.vehicleConnection(
VehicleConnectionReq.builder() // VehicleConnectionReq.builder()
.vin("VIN1234567894") // .vin("VIN1234567894")
.timestamp(String.valueOf(System.currentTimeMillis())) // .timestamp(String.valueOf(System.currentTimeMillis()))
.userName("156841600") // .userName("156841600")
.nonce("134812") // .nonce("134812")
.build() // .build()
); // );
Result<Object> result = new Result<>();
System.out.println(result); System.out.println(result);
} }
} }