多线程模拟上报完善

new-master
dongzeliang 2023-11-18 09:15:20 +08:00
parent 2473b1c944
commit 4e33949c95
6 changed files with 153 additions and 26 deletions

View File

@ -35,6 +35,14 @@
<scope>test</scope>
</dependency>
<!-- mqtt3 -->
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.5</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>

View File

@ -15,7 +15,7 @@ public class ThreadPool {
/**
* 线
*/
private static final ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(1);
private static final ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(16);
public static ScheduledFuture<?> submit (Runnable thread){
// 参数分别是: 任务, 多久后开始执行, 每隔多久执行一次(周期),时间单位

View File

@ -2,11 +2,16 @@ package com.muyu.vehicle;
import com.muyu.common.ThreadPool;
import com.muyu.domain.Vehicle;
import com.muyu.utils.VehicleUtils;
import com.muyu.vehicle.core.LocalContainer;
import com.muyu.vehicle.model.VehicleData;
import com.muyu.vehicle.model.properties.MqttProperties;
import java.lang.reflect.Array;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import static java.lang.Thread.sleep;
@ -18,14 +23,18 @@ import static java.lang.Thread.sleep;
*/
public class Test {
public static CountDownLatch countDownLatch = new CountDownLatch(2);
public static void main(String[] args) throws InterruptedException {
public static CountDownLatch countDownLatch;
String vin1 = "VIN123456789123456", vin2 = "VIN123456789166666";
init(vin1);
init(vin2);
new Thread(new TestThread(vin1)).start();
new Thread(new TestThread(vin2)).start();
public static void main(String[] args) throws InterruptedException {
List<String> list = new ArrayList<>();
for (int i = 0; i < 1000; i++) {
list.add(VehicleUtils.genVin());
}
countDownLatch = new CountDownLatch(list.size());
list.forEach(vin -> {
init(vin);
new Thread(new TestThread(vin)).start();
});
countDownLatch.await();
ThreadPool.shutdown();
}
@ -38,7 +47,14 @@ public class Test {
.batteryLevel(new BigDecimal("50000"))
.build();
VehicleInstance vehicleInstance = new VehicleInstance();
VehicleInstance vehicleInstance = new VehicleInstance(
MqttProperties.builder()
.broker("tcp://fluxmq.muyu.icu:1883")
.topic("test")
.clientId(vin)
.build()
);
vehicleInstance.initCline();
vehicleInstance.setVehicle(vehicle);
vehicleInstance.setVehicleData(VehicleData.vehicleBuild(vehicle));
LocalContainer.setVehicleInstance(vehicleInstance);
@ -69,14 +85,14 @@ class TestThread implements Runnable{
public void run() {
VehicleInstance vehicleIns = LocalContainer.getVehicleInstance(vin);
vehicleIns.initVehicleThread();
try {
sleep(3000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
// try {
// sleep(3000);
// } catch (InterruptedException e) {
// throw new RuntimeException(e);
// }
vehicleIns.startSend();
try {
/*try {
sleep(3000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
@ -88,15 +104,15 @@ class TestThread implements Runnable{
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
vehicleIns.startSend();
vehicleIns.startSend();*/
try {
sleep(3000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
vehicleIns.stopSend();
Test.countDownLatch.countDown();
// try {
// sleep(10000);
// } catch (InterruptedException e) {
// throw new RuntimeException(e);
// }
// vehicleIns.stopSend();
//
// Test.countDownLatch.countDown();
}
}

View File

@ -3,13 +3,20 @@ package com.muyu.vehicle;
import com.muyu.common.ThreadPool;
import com.muyu.domain.Vehicle;
import com.muyu.vehicle.model.VehicleData;
import com.muyu.vehicle.model.properties.MqttProperties;
import com.muyu.vehicle.thread.VehicleThread;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import java.util.Objects;
import java.util.concurrent.ScheduledFuture;
import static java.lang.Thread.sleep;
@ -23,7 +30,6 @@ import static java.lang.Thread.sleep;
@Data
@Log4j2
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class VehicleInstance {
@ -49,6 +55,20 @@ public class VehicleInstance {
*/
private ScheduledFuture<?> scheduledFuture;
/**
*
*/
private MqttClient client = null;
/**
* Mqtt
*/
private MqttProperties mqttProperties;
public VehicleInstance(MqttProperties mqttProperties) {
this.mqttProperties = mqttProperties;
}
/***
* VIN
@ -58,6 +78,40 @@ public class VehicleInstance {
return this.vehicle.getVin();
}
public void sendMsg(String msg){
// 创建消息并设置 QoS
MqttMessage message = new MqttMessage(msg.getBytes());
message.setQos(this.mqttProperties.getQos());
// 发布消息
try {
client.publish(this.mqttProperties.getTopic(), message);
} catch (MqttException e) {
throw new RuntimeException(e);
}
}
public void initCline(){
try {
client = new MqttClient(mqttProperties.getBroker(), mqttProperties.getClientId(), new MemoryPersistence());
// 连接参数
MqttConnectOptions options = new MqttConnectOptions();
// 设置用户名和密码
if (Objects.nonNull(mqttProperties.getUsername()) && Objects.nonNull(mqttProperties.getPassword())) {
options.setUserName(mqttProperties.getUsername());
options.setPassword(mqttProperties.getPassword().toCharArray());
}
options.setConnectionTimeout(60);
options.setKeepAliveInterval(60);
// 连接
client.connect(options);
log.info("mqtt初始化成功");
} catch (MqttException e) {
log.error("mqtt链接服务器异常{}", e.getMessage(), e);
throw new RuntimeException(e);
}
}
/**
* 线

View File

@ -0,0 +1,47 @@
package com.muyu.vehicle.model.properties;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
/**
* @author DongZeLiang
* @version 1.0
* @description Mqtt
* @date 2023/11/8
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class MqttProperties {
/**
*
*/
private String broker;
/**
*
*/
private String topic;
/**
*
*/
private String username;
/**
*
*/
private String password;
/**
* ID
*/
private String clientId;
private int qos = 0;
}

View File

@ -25,7 +25,9 @@ public class VehicleThread implements Runnable {
public void run() {
if (!isStop){
if (!isPaused){
System.out.println(System.currentTimeMillis());
log.info("{} - 模拟数据", this.vehicleInstance.getVin());
log.info("{} - 上报数据", this.vehicleInstance.getVin());
this.vehicleInstance.sendMsg(String.format("%s - 上报数据", this.vehicleInstance.getVin()));
}else {
log.info("暂停模拟和上报:[{}]", this.vehicleInstance.getVin());
}