diff --git a/pom.xml b/pom.xml index 5dbed24..101ee19 100644 --- a/pom.xml +++ b/pom.xml @@ -35,6 +35,14 @@ test + + + org.eclipse.paho + org.eclipse.paho.client.mqttv3 + 1.2.5 + + + org.projectlombok lombok diff --git a/src/main/java/com/muyu/common/ThreadPool.java b/src/main/java/com/muyu/common/ThreadPool.java index 1603f43..1c6e4f9 100644 --- a/src/main/java/com/muyu/common/ThreadPool.java +++ b/src/main/java/com/muyu/common/ThreadPool.java @@ -15,7 +15,7 @@ public class ThreadPool { /** * 周期性线程池 */ - private static final ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(1); + private static final ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(16); public static ScheduledFuture submit (Runnable thread){ // 参数分别是: 任务, 多久后开始执行, 每隔多久执行一次(周期),时间单位 diff --git a/src/main/java/com/muyu/vehicle/Test.java b/src/main/java/com/muyu/vehicle/Test.java index 2eae2da..fc3acd0 100644 --- a/src/main/java/com/muyu/vehicle/Test.java +++ b/src/main/java/com/muyu/vehicle/Test.java @@ -2,11 +2,16 @@ package com.muyu.vehicle; import com.muyu.common.ThreadPool; import com.muyu.domain.Vehicle; +import com.muyu.utils.VehicleUtils; import com.muyu.vehicle.core.LocalContainer; import com.muyu.vehicle.model.VehicleData; +import com.muyu.vehicle.model.properties.MqttProperties; +import java.lang.reflect.Array; import java.math.BigDecimal; +import java.util.ArrayList; import java.util.Date; +import java.util.List; import java.util.concurrent.CountDownLatch; import static java.lang.Thread.sleep; @@ -18,14 +23,18 @@ import static java.lang.Thread.sleep; */ public class Test { - public static CountDownLatch countDownLatch = new CountDownLatch(2); - public static void main(String[] args) throws InterruptedException { + public static CountDownLatch countDownLatch; - String vin1 = "VIN123456789123456", vin2 = "VIN123456789166666"; - init(vin1); - init(vin2); - new Thread(new TestThread(vin1)).start(); - new Thread(new TestThread(vin2)).start(); + public static void main(String[] args) throws InterruptedException { + List list = new ArrayList<>(); + for (int i = 0; i < 1000; i++) { + list.add(VehicleUtils.genVin()); + } + countDownLatch = new CountDownLatch(list.size()); + list.forEach(vin -> { + init(vin); + new Thread(new TestThread(vin)).start(); + }); countDownLatch.await(); ThreadPool.shutdown(); } @@ -38,7 +47,14 @@ public class Test { .batteryLevel(new BigDecimal("50000")) .build(); - VehicleInstance vehicleInstance = new VehicleInstance(); + VehicleInstance vehicleInstance = new VehicleInstance( + MqttProperties.builder() + .broker("tcp://fluxmq.muyu.icu:1883") + .topic("test") + .clientId(vin) + .build() + ); + vehicleInstance.initCline(); vehicleInstance.setVehicle(vehicle); vehicleInstance.setVehicleData(VehicleData.vehicleBuild(vehicle)); LocalContainer.setVehicleInstance(vehicleInstance); @@ -69,14 +85,14 @@ class TestThread implements Runnable{ public void run() { VehicleInstance vehicleIns = LocalContainer.getVehicleInstance(vin); vehicleIns.initVehicleThread(); - try { - sleep(3000); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } +// try { +// sleep(3000); +// } catch (InterruptedException e) { +// throw new RuntimeException(e); +// } vehicleIns.startSend(); - try { + /*try { sleep(3000); } catch (InterruptedException e) { throw new RuntimeException(e); @@ -88,15 +104,15 @@ class TestThread implements Runnable{ } catch (InterruptedException e) { throw new RuntimeException(e); } - vehicleIns.startSend(); + vehicleIns.startSend();*/ - try { - sleep(3000); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - vehicleIns.stopSend(); - - Test.countDownLatch.countDown(); +// try { +// sleep(10000); +// } catch (InterruptedException e) { +// throw new RuntimeException(e); +// } +// vehicleIns.stopSend(); +// +// Test.countDownLatch.countDown(); } } diff --git a/src/main/java/com/muyu/vehicle/VehicleInstance.java b/src/main/java/com/muyu/vehicle/VehicleInstance.java index 5e4d2b4..c7a1b62 100644 --- a/src/main/java/com/muyu/vehicle/VehicleInstance.java +++ b/src/main/java/com/muyu/vehicle/VehicleInstance.java @@ -3,13 +3,20 @@ package com.muyu.vehicle; import com.muyu.common.ThreadPool; import com.muyu.domain.Vehicle; import com.muyu.vehicle.model.VehicleData; +import com.muyu.vehicle.model.properties.MqttProperties; import com.muyu.vehicle.thread.VehicleThread; import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor; import lombok.extern.log4j.Log4j2; +import org.eclipse.paho.client.mqttv3.MqttClient; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.eclipse.paho.client.mqttv3.MqttException; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; +import java.util.Objects; import java.util.concurrent.ScheduledFuture; import static java.lang.Thread.sleep; @@ -23,7 +30,6 @@ import static java.lang.Thread.sleep; @Data @Log4j2 @Builder -@NoArgsConstructor @AllArgsConstructor public class VehicleInstance { @@ -49,6 +55,20 @@ public class VehicleInstance { */ private ScheduledFuture scheduledFuture; + /** + * 链接上报 + */ + private MqttClient client = null; + + + /** + * Mqtt配置 + */ + private MqttProperties mqttProperties; + + public VehicleInstance(MqttProperties mqttProperties) { + this.mqttProperties = mqttProperties; + } /*** * 获取当前车辆VIN @@ -58,6 +78,40 @@ public class VehicleInstance { return this.vehicle.getVin(); } + public void sendMsg(String msg){ + // 创建消息并设置 QoS + MqttMessage message = new MqttMessage(msg.getBytes()); + message.setQos(this.mqttProperties.getQos()); + // 发布消息 + try { + client.publish(this.mqttProperties.getTopic(), message); + } catch (MqttException e) { + throw new RuntimeException(e); + } + } + + public void initCline(){ + try { + client = new MqttClient(mqttProperties.getBroker(), mqttProperties.getClientId(), new MemoryPersistence()); + // 连接参数 + MqttConnectOptions options = new MqttConnectOptions(); + // 设置用户名和密码 + if (Objects.nonNull(mqttProperties.getUsername()) && Objects.nonNull(mqttProperties.getPassword())) { + options.setUserName(mqttProperties.getUsername()); + options.setPassword(mqttProperties.getPassword().toCharArray()); + } + + options.setConnectionTimeout(60); + options.setKeepAliveInterval(60); + // 连接 + client.connect(options); + log.info("mqtt初始化成功"); + } catch (MqttException e) { + log.error("mqtt链接服务器异常:{}", e.getMessage(), e); + throw new RuntimeException(e); + } + } + /** * 初始化线程 diff --git a/src/main/java/com/muyu/vehicle/model/properties/MqttProperties.java b/src/main/java/com/muyu/vehicle/model/properties/MqttProperties.java new file mode 100644 index 0000000..58daa7c --- /dev/null +++ b/src/main/java/com/muyu/vehicle/model/properties/MqttProperties.java @@ -0,0 +1,47 @@ +package com.muyu.vehicle.model.properties; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.annotation.Configuration; + +/** + * @author DongZeLiang + * @version 1.0 + * @description Mqtt配置类 + * @date 2023/11/8 + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class MqttProperties { + + /** + * 节点 + */ + private String broker; + + /** + * 主题 + */ + private String topic; + + /** + * 用户名 + */ + private String username; + + /** + * 密码 + */ + private String password; + + /** + * 节点ID + */ + private String clientId; + private int qos = 0; +} diff --git a/src/main/java/com/muyu/vehicle/thread/VehicleThread.java b/src/main/java/com/muyu/vehicle/thread/VehicleThread.java index b2ad011..db27992 100644 --- a/src/main/java/com/muyu/vehicle/thread/VehicleThread.java +++ b/src/main/java/com/muyu/vehicle/thread/VehicleThread.java @@ -25,7 +25,9 @@ public class VehicleThread implements Runnable { public void run() { if (!isStop){ if (!isPaused){ - System.out.println(System.currentTimeMillis()); + log.info("{} - 模拟数据", this.vehicleInstance.getVin()); + log.info("{} - 上报数据", this.vehicleInstance.getVin()); + this.vehicleInstance.sendMsg(String.format("%s - 上报数据", this.vehicleInstance.getVin())); }else { log.info("暂停模拟和上报:[{}]", this.vehicleInstance.getVin()); }