多线程模拟上报完善

master
dongzeliang 2023-11-17 16:45:59 +08:00
parent 1aafbe78b9
commit 2473b1c944
8 changed files with 280 additions and 129 deletions

View File

@ -1,9 +1,8 @@
package com.muyu.common;
import com.muyu.vehicle.MyThread;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
/**
@ -18,8 +17,15 @@ public class ThreadPool {
*/
private static final ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(1);
public static void submit (Runnable thread){
public static ScheduledFuture<?> submit (Runnable thread){
// 参数分别是: 任务, 多久后开始执行, 每隔多久执行一次(周期),时间单位
scheduledThreadPool.scheduleAtFixedRate(thread, 0,1, TimeUnit.SECONDS);
return scheduledThreadPool.scheduleAtFixedRate(thread, 0, 1, TimeUnit.SECONDS);
}
/**
* 线
*/
public static void shutdown() {
scheduledThreadPool.shutdown();
}
}

View File

@ -1,37 +0,0 @@
package com.muyu.vehicle;
public class MyThread implements Runnable {
/**
*
*/
private volatile boolean isPaused;
@Override
public void run() {
if (!isPaused){
System.out.println(System.currentTimeMillis());
}else {
System.out.println("线程暂停");
}
}
/**
* 线
*/
public void pause() {
isPaused = true;
}
/**
* 线
*/
public void resume() {
isPaused = false;
synchronized (this) {
notify(); // 唤醒线程
}
}
}

View File

@ -1,6 +1,13 @@
package com.muyu.vehicle;
import com.muyu.common.ThreadPool;
import com.muyu.domain.Vehicle;
import com.muyu.vehicle.core.LocalContainer;
import com.muyu.vehicle.model.VehicleData;
import java.math.BigDecimal;
import java.util.Date;
import java.util.concurrent.CountDownLatch;
import static java.lang.Thread.sleep;
@ -11,47 +18,85 @@ import static java.lang.Thread.sleep;
*/
public class Test {
public static CountDownLatch countDownLatch = new CountDownLatch(2);
public static void main(String[] args) throws InterruptedException {
String vin1 = "VIN123456789123456", vin2 = "VIN123456789166666";
init(vin1);
init(vin2);
new Thread(new TestThread(vin1)).start();
new Thread(new TestThread(vin2)).start();
countDownLatch.await();
ThreadPool.shutdown();
}
MyThread thread = new MyThread();
ThreadPool.submit(thread);
new Thread(() -> {
try {
sleep(3000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
thread.pause();
}).start();
public static void init(String vin){
Vehicle vehicle = Vehicle.builder()
.vin(vin)
.createTime(new Date())
.remainingBattery(new BigDecimal("45000"))
.batteryLevel(new BigDecimal("50000"))
.build();
/*new Thread(() -> {
try {
sleep(5000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("执行了五秒,等待五秒,在开始");
try {
synchronized (thread){
thread.wait();
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("等待了五秒现在开始3秒后杀死");
synchronized (thread){
thread.notify();
}
try {
sleep(3000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
synchronized (thread){
thread.interrupt();
}
System.out.println("停止成功" + thread.isAlive());
}).start();*/
VehicleInstance vehicleInstance = new VehicleInstance();
vehicleInstance.setVehicle(vehicle);
vehicleInstance.setVehicleData(VehicleData.vehicleBuild(vehicle));
LocalContainer.setVehicleInstance(vehicleInstance);
}
}
class TestThread implements Runnable{
private String vin ;
public TestThread(String vin) {
this.vin = vin;
}
/**
* When an object implementing interface {@code Runnable} is used
* to create a thread, starting the thread causes the object's
* {@code run} method to be called in that separately executing
* thread.
* <p>
* The general contract of the method {@code run} is that it may
* take any action whatsoever.
*
* @see Thread#run()
*/
@Override
public void run() {
VehicleInstance vehicleIns = LocalContainer.getVehicleInstance(vin);
vehicleIns.initVehicleThread();
try {
sleep(3000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
vehicleIns.startSend();
try {
sleep(3000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
vehicleIns.pauseSend();
try {
sleep(3000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
vehicleIns.startSend();
try {
sleep(3000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
vehicleIns.stopSend();
Test.countDownLatch.countDown();
}
}

View File

@ -0,0 +1,105 @@
package com.muyu.vehicle;
import com.muyu.common.ThreadPool;
import com.muyu.domain.Vehicle;
import com.muyu.vehicle.model.VehicleData;
import com.muyu.vehicle.thread.VehicleThread;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.extern.log4j.Log4j2;
import java.util.concurrent.ScheduledFuture;
import static java.lang.Thread.sleep;
/**
* @author DongZeLiang
* @version 1.0
* @description
* @date 2023/11/16
*/
@Data
@Log4j2
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class VehicleInstance {
/**
*
*/
private Vehicle vehicle;
/**
*
*/
private VehicleData vehicleData;
/**
* 线
*/
private VehicleThread vehicleThread;
/**
* 线
*/
private ScheduledFuture<?> scheduledFuture;
/***
* VIN
* @return VIN
*/
public String getVin(){
return this.vehicle.getVin();
}
/**
* 线
*/
public void initVehicleThread(){
VehicleThread vehicleThread = new VehicleThread();
vehicleThread.setVehicleInstance(this);
this.setVehicleThread(vehicleThread);
this.vehicleThread.pause();
ScheduledFuture<?> scheduledFuture = ThreadPool.submit(vehicleThread);
this.setScheduledFuture(scheduledFuture);
log.info("初始化车辆上报模拟线程开始:[{}]", this.getVin());
}
/**
* 线
*/
public void startSend(){
this.vehicleThread.resume();
}
/**
* 线
*/
public void pauseSend(){
this.vehicleThread.pause();
}
/**
*
*/
public void stopSend(){
this.vehicleThread.stop();
}
/**
*
*/
public void cancelExecution(){
scheduledFuture.cancel(true);
this.vehicleThread = null;
this.scheduledFuture = null;
}
}

View File

@ -1,8 +1,6 @@
package com.muyu.vehicle.core;
import com.muyu.domain.Vehicle;
import com.muyu.vehicle.model.VehicleData;
import com.muyu.vehicle.model.VehicleInstance;
import com.muyu.vehicle.VehicleInstance;
import java.util.HashMap;
import java.util.Map;
@ -21,19 +19,22 @@ public class LocalContainer {
/**
*
* @param vehicle
* @param vehicleInstance
*/
public static void setVehicle(Vehicle vehicle){
String vin = vehicle.getVin();
VehicleInstance vehicleInstance = vehicleDataMap.get(vin);
if (vehicleInstance == null){
vehicleDataMap.put(vin,
VehicleInstance.builder()
.vehicleData(VehicleData.vehicleBuild(vehicle))
.vehicle(vehicle)
.build()
);
public static void setVehicleInstance(VehicleInstance vehicleInstance){
String vin = vehicleInstance.getVehicle().getVin();
if (!vehicleDataMap.containsKey(vin)) {
vehicleDataMap.put(vin, vehicleInstance);
}
}
/**
*
* @param vin vin
* @return
*/
public static VehicleInstance getVehicleInstance(String vin){
return vehicleDataMap.get(vin);
}
}

View File

@ -34,7 +34,7 @@ public class VehicleConfiguration implements ApplicationRunner {
while (true){
Page<Vehicle> vehiclePage = vehicleService.page(new Page<Vehicle>(page++, pageSize));
List<Vehicle> records = vehiclePage.getRecords();
records.forEach(LocalContainer::setVehicle);
// records.forEach(LocalContainer::setVehicle);
log.info("第[{}]页,[{}]条", page, records.size());
if (records.size() < pageSize){
break;

View File

@ -1,35 +0,0 @@
package com.muyu.vehicle.model;
import com.muyu.domain.Vehicle;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import static java.lang.Thread.sleep;
/**
* @author DongZeLiang
* @version 1.0
* @description
* @date 2023/11/16
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class VehicleInstance {
/**
*
*/
private Vehicle vehicle;
/**
*
*/
private VehicleData vehicleData;
}

View File

@ -0,0 +1,66 @@
package com.muyu.vehicle.thread;
import com.muyu.vehicle.VehicleInstance;
import lombok.extern.log4j.Log4j2;
@Log4j2
public class VehicleThread implements Runnable {
/**
* 线
*/
private volatile boolean isStop = false;
/**
*
*/
private volatile boolean isPaused;
/**
*
*/
private VehicleInstance vehicleInstance;
@Override
public void run() {
if (!isStop){
if (!isPaused){
System.out.println(System.currentTimeMillis());
}else {
log.info("暂停模拟和上报:[{}]", this.vehicleInstance.getVin());
}
}else {
log.info("终止模拟和上报:[{}]", this.vehicleInstance.getVin());
vehicleInstance.cancelExecution();
}
}
/**
* 线
*/
public void pause() {
isPaused = true;
}
/**
* 线
*/
public void resume() {
isPaused = false;
synchronized (this) {
notify(); // 唤醒线程
}
}
/**
*
*/
public void stop(){
this.isStop = true;
}
public void setVehicleInstance(VehicleInstance vehicleInstance) {
this.vehicleInstance = vehicleInstance;
}
}