增加车辆数据同步数据库

程序关闭前,执行数据库同步、在线车辆下下、关闭线程池操作
new-master
DongZeLiang 2023-12-02 22:47:15 +08:00
parent 6ad8afdda3
commit 3e6f723fe2
7 changed files with 104 additions and 25 deletions

View File

@ -19,7 +19,12 @@ public class ThreadPool {
public static ScheduledFuture<?> submit (Runnable thread){ public static ScheduledFuture<?> submit (Runnable thread){
// 参数分别是: 任务, 多久后开始执行, 每隔多久执行一次(周期),时间单位 // 参数分别是: 任务, 多久后开始执行, 每隔多久执行一次(周期),时间单位
return scheduledThreadPool.scheduleAtFixedRate(thread, 0, 1, TimeUnit.SECONDS); return submit(thread, 1);
}
public static ScheduledFuture<?> submit (Runnable thread, long period){
// 参数分别是: 任务, 多久后开始执行, 每隔多久执行一次(周期),时间单位
return scheduledThreadPool.scheduleAtFixedRate(thread, 0, period, TimeUnit.SECONDS);
} }
/** /**

View File

@ -4,6 +4,8 @@ import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId; import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName; import com.baomidou.mybatisplus.annotation.TableName;
import com.muyu.utils.VehicleUtils; import com.muyu.utils.VehicleUtils;
import com.muyu.vehicle.VehicleInstance;
import com.muyu.vehicle.model.VehicleData;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.Builder; import lombok.Builder;
import lombok.Data; import lombok.Data;
@ -78,4 +80,13 @@ public class Vehicle {
.build(); .build();
} }
public static Vehicle instanceBuild (VehicleInstance vehicleInstance) {
VehicleData vehicle = vehicleInstance.getVehicleData();
return Vehicle.builder()
.vin(vehicleInstance.getVin())
.remainingBattery(vehicle.getRemainingBattery())
.totalMileage(vehicle.getMileage())
.build();
}
} }

View File

@ -24,4 +24,9 @@ public interface VehicleService extends IService<Vehicle> {
* @param vinStr VIN * @param vinStr VIN
*/ */
void create (String vinStr); void create (String vinStr);
/**
*
*/
void syncDb();
} }

View File

@ -1,19 +1,23 @@
package com.muyu.service.impl; package com.muyu.service.impl;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.muyu.domain.Vehicle; import com.muyu.domain.Vehicle;
import com.muyu.mapper.VehicleMapper; import com.muyu.mapper.VehicleMapper;
import com.muyu.service.VehicleInstanceService; import com.muyu.service.VehicleInstanceService;
import com.muyu.service.VehicleService; import com.muyu.service.VehicleService;
import com.muyu.utils.VehicleUtils; import com.muyu.vehicle.VehicleInstance;
import com.muyu.vehicle.core.LocalContainer;
import lombok.extern.log4j.Log4j2; import lombok.extern.log4j.Log4j2;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.annotation.Transactional;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream; import java.util.stream.Stream;
/** /**
@ -39,7 +43,7 @@ public class VechileServiceImpl extends ServiceImpl<VehicleMapper, Vehicle> impl
*/ */
@Override @Override
@Transactional @Transactional
public void generate(Integer sum) { public void generate (Integer sum) {
List<Vehicle> vehicleList = Stream.generate(Vehicle::gen).limit(sum).toList(); List<Vehicle> vehicleList = Stream.generate(Vehicle::gen).limit(sum).toList();
this.saveBatch(vehicleList); this.saveBatch(vehicleList);
vehicleList.forEach(vehicleInstanceService::init); vehicleList.forEach(vehicleInstanceService::init);
@ -56,18 +60,18 @@ public class VechileServiceImpl extends ServiceImpl<VehicleMapper, Vehicle> impl
String[] vinList = vinStr.split("\n"); String[] vinList = vinStr.split("\n");
StringBuilder errorMsg = new StringBuilder(); StringBuilder errorMsg = new StringBuilder();
for (String vin : vinList) { for (String vin : vinList) {
if (vin.length() != 17){ if (vin.length() != 17) {
errorMsg.append("vin[").append(vin).append("]").append("不为17位\n"); errorMsg.append("vin[").append(vin).append("]").append("不为17位\n");
}else { } else {
LambdaQueryWrapper<Vehicle> queryWrapper = new LambdaQueryWrapper<>(); LambdaQueryWrapper<Vehicle> queryWrapper = new LambdaQueryWrapper<>();
queryWrapper.eq(Vehicle::getVin, vin); queryWrapper.eq(Vehicle::getVin, vin);
long count = this.count(queryWrapper); long count = this.count(queryWrapper);
if (count == 1){ if (count == 1) {
errorMsg.append("vin[").append(vin).append("]").append("已经存在\n"); errorMsg.append("vin[").append(vin).append("]").append("已经存在\n");
} }
} }
} }
if (errorMsg.length() != 0){ if (!errorMsg.isEmpty()) {
throw new RuntimeException(errorMsg.toString()); throw new RuntimeException(errorMsg.toString());
} }
List<Vehicle> vehicleList = Arrays.stream(vinList).map(Vehicle::create).toList(); List<Vehicle> vehicleList = Arrays.stream(vinList).map(Vehicle::create).toList();
@ -75,5 +79,40 @@ public class VechileServiceImpl extends ServiceImpl<VehicleMapper, Vehicle> impl
vehicleList.forEach(vehicleInstanceService::init); vehicleList.forEach(vehicleInstanceService::init);
} }
/**
*
*/
@Override
public void syncDb () {
try {
log.info("同步数据库开始");
long startTime = System.currentTimeMillis();
Collection<VehicleInstance> vehicleInstanceList = LocalContainer.getOnlineVehicleInstance();
// 成功数量
AtomicInteger syncSuccessSum = new AtomicInteger();
List<Vehicle> vehicleList = vehicleInstanceList.stream()
.filter(VehicleInstance::isOnline)
.map(Vehicle::instanceBuild)
.toList();
vehicleList.forEach(vehicle -> {
LambdaUpdateWrapper<Vehicle> updateWrapper = new LambdaUpdateWrapper<>();
updateWrapper.set(Vehicle::getRemainingBattery, vehicle.getRemainingBattery());
updateWrapper.set(Vehicle::getTotalMileage, vehicle.getTotalMileage());
updateWrapper.eq(Vehicle::getVin, vehicle.getVin());
boolean update = this.update(updateWrapper);
if (update){
syncSuccessSum.incrementAndGet();
log.debug("车辆:[{}] - 数据同步成功 - 电池容量:[{}毫安] 总公里数量:[{}KM]", vehicle.getVin(), vehicle.getRemainingBattery(), vehicle.getTotalMileage());
}{
log.error("车辆:[{}] - 数据同步失败 - 电池容量:[{}毫安] 总公里数量:[{}KM]", vehicle.getVin(), vehicle.getRemainingBattery(), vehicle.getTotalMileage());
}
});
log.info("同步数据库结束 - 耗时:[{}MS],同步量:[{}辆],成功:[{}辆],失败:[{}辆]",
System.currentTimeMillis() - startTime, vehicleList.size(),syncSuccessSum.get(), vehicleList.size() - syncSuccessSum.get());
}catch (Exception exception){
log.error("数据同步异常:{}", exception.getMessage(), exception);
}
}
} }

View File

@ -2,6 +2,7 @@ package com.muyu.vehicle.core;
import com.muyu.vehicle.VehicleInstance; import com.muyu.vehicle.VehicleInstance;
import java.util.Collection;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -48,4 +49,17 @@ public class LocalContainer {
public static long total () { public static long total () {
return vehicleDataMap.size(); return vehicleDataMap.size();
} }
public static Collection<VehicleInstance> getVehicleInstanceAll () {
return vehicleDataMap.values();
}
/**
* 线
* @return 线
*/
public static List<VehicleInstance> getOnlineVehicleInstance(){
return vehicleDataMap.values().stream().filter(VehicleInstance::isOnline).toList();
}
} }

View File

@ -1,18 +1,21 @@
package com.muyu.vehicle.core; package com.muyu.vehicle.core;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.muyu.common.ThreadPool;
import com.muyu.domain.Vehicle; import com.muyu.domain.Vehicle;
import com.muyu.service.VehicleInstanceService; import com.muyu.service.VehicleInstanceService;
import com.muyu.service.VehicleService; import com.muyu.service.VehicleService;
import com.muyu.vehicle.VehicleInstance; import com.muyu.vehicle.VehicleInstance;
import com.muyu.vehicle.model.VehicleData; import com.muyu.vehicle.model.VehicleData;
import com.muyu.vehicle.model.properties.MqttProperties; import com.muyu.vehicle.model.properties.MqttProperties;
import com.muyu.vehicle.thread.VehicleThread;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.extern.log4j.Log4j2; import lombok.extern.log4j.Log4j2;
import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner; import org.springframework.boot.ApplicationRunner;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
import javax.annotation.PreDestroy;
import java.math.BigDecimal; import java.math.BigDecimal;
import java.util.Date; import java.util.Date;
import java.util.List; import java.util.List;
@ -32,23 +35,6 @@ public class VehicleConfiguration implements ApplicationRunner {
private final VehicleInstanceService vehicleInstanceService; private final VehicleInstanceService vehicleInstanceService;
/*public VehicleInstance init(Vehicle vehicle){
log.info("构建车辆对象: [{}]", vehicle.getVin());
VehicleInstance vehicleInstance = new VehicleInstance(
MqttProperties.builder()
.broker("tcp://fluxmq.muyu.icu:1883")
.topic("test")
.clientId(vehicle.getVin())
.build()
);
log.info("构建车辆实例: [{}]", vehicle.getVin());
vehicleInstance.setVehicle(vehicle);
vehicleInstance.setVehicleData(VehicleData.vehicleBuild(vehicle));
vehicleInstance.initCline();
log.info("构建车辆客户端: [{}]", vehicle.getVin());
return vehicleInstance;
}*/
/** /**
* *
*/ */
@ -71,5 +57,25 @@ public class VehicleConfiguration implements ApplicationRunner {
@Override @Override
public void run (ApplicationArguments args) { public void run (ApplicationArguments args) {
this.vehiclePageInit(); this.vehiclePageInit();
// 提交给线程池 一分钟 执行一次
ThreadPool.submit(new Thread(vehicleService::syncDb), 30);
}
/**
*
*/
@PreDestroy
public void destroy(){
log.info("数据库同步");
vehicleService.syncDb();
log.info("下线所有车辆");
List<VehicleInstance> onlineVehicleInstanceList = LocalContainer.getOnlineVehicleInstance();
onlineVehicleInstanceList.forEach(VehicleInstance::closeClient);
log.info("关闭线程池");
ThreadPool.shutdown();
} }
} }

View File

@ -33,7 +33,6 @@ public class VehicleThread implements Runnable {
this.vehicleInstance.sendMsg( this.vehicleInstance.sendMsg(
this.vehicleInstance.getVehicleData().getMsg() this.vehicleInstance.getVehicleData().getMsg()
); );
log.info(JSONObject.toJSONString(this.vehicleInstance.getVehicleData()));
}else { }else {
log.warn("车辆[{}]数据模拟:{}", this.vehicleInstance.getVin(), imitateResult); log.warn("车辆[{}]数据模拟:{}", this.vehicleInstance.getVin(), imitateResult);
} }