From c5b7e1f817db302676676ac3ec3d05eb0c7e4e02 Mon Sep 17 00:00:00 2001 From: DongZeLiang <2746733890@qq.com> Date: Sat, 6 Apr 2024 11:14:45 +0800 Subject: [PATCH] =?UTF-8?q?=E5=88=9D=E5=A7=8B=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pom.xml | 28 ++++ src/main/java/com/muyu/MqttApplication.java | 16 +++ .../java/com/muyu/domain/VehicleData.java | 27 ++++ .../muyu/kafka/config/CustomPartitioner.java | 31 +++++ .../kafka/config/KafkaProducerConfig.java | 29 ++++ .../com/muyu/kafka/consumer/ConsumerTest.java | 57 ++++++++ .../com/muyu/kafka/contents/KafkaContent.java | 13 ++ .../com/muyu/kafka/producer/ProducerTest.java | 68 +++++++++ .../java/com/muyu/mqtt/PublishSample.java | 2 +- .../java/com/muyu/mqtt/SubscribeSample.java | 7 +- src/main/java/com/muyu/mqtt/Test1.java | 9 -- .../java/com/muyu/mqtt/config/MqttConfig.java | 43 ++++++ .../java/com/muyu/mqtt/config/MqttProper.java | 32 +++++ .../com/muyu/mqtt/service/MqttService.java | 84 +++++++++++ .../java/com/muyu/parsing/ParsingService.java | 49 +++++++ src/main/java/com/muyu/parsing/TestTread.java | 20 +++ .../parsing/config/KafkaConsumerConfig.java | 34 +++++ .../muyu/parsing/service/EventControl.java | 20 +++ .../muyu/parsing/service/event/EventCon.java | 58 ++++++++ .../service/event/VehicleEventService.java | 20 +++ .../parsing/service/event/impl/Guzhang.java | 46 +++++++ .../RealTimeTrajectoryEventImplService.java | 71 ++++++++++ .../event/impl/StorageEventServiceImpl.java | 71 ++++++++++ .../service/event/impl/ZhiBiaoYuJing.java | 28 ++++ .../muyu/parsing/storage/LocalStorage.java | 45 ++++++ .../java/com/muyu/utils/FixedThreadPool.java | 36 +++++ .../com/muyu/utils/ScheduledThreadPool.java | 37 +++++ src/main/java/com/muyu/utils/SpringUtils.java | 114 +++++++++++++++ src/main/resources/application.yml | 5 + src/test/java/com/test/GuZhangTest.java | 130 ++++++++++++++++++ 30 files changed, 1217 insertions(+), 13 deletions(-) create mode 100644 src/main/java/com/muyu/MqttApplication.java create mode 100644 src/main/java/com/muyu/domain/VehicleData.java create mode 100644 src/main/java/com/muyu/kafka/config/CustomPartitioner.java create mode 100644 src/main/java/com/muyu/kafka/config/KafkaProducerConfig.java create mode 100644 src/main/java/com/muyu/kafka/consumer/ConsumerTest.java create mode 100644 src/main/java/com/muyu/kafka/contents/KafkaContent.java create mode 100644 src/main/java/com/muyu/kafka/producer/ProducerTest.java delete mode 100644 src/main/java/com/muyu/mqtt/Test1.java create mode 100644 src/main/java/com/muyu/mqtt/config/MqttConfig.java create mode 100644 src/main/java/com/muyu/mqtt/config/MqttProper.java create mode 100644 src/main/java/com/muyu/mqtt/service/MqttService.java create mode 100644 src/main/java/com/muyu/parsing/ParsingService.java create mode 100644 src/main/java/com/muyu/parsing/TestTread.java create mode 100644 src/main/java/com/muyu/parsing/config/KafkaConsumerConfig.java create mode 100644 src/main/java/com/muyu/parsing/service/EventControl.java create mode 100644 src/main/java/com/muyu/parsing/service/event/EventCon.java create mode 100644 src/main/java/com/muyu/parsing/service/event/VehicleEventService.java create mode 100644 src/main/java/com/muyu/parsing/service/event/impl/Guzhang.java create mode 100644 src/main/java/com/muyu/parsing/service/event/impl/RealTimeTrajectoryEventImplService.java create mode 100644 src/main/java/com/muyu/parsing/service/event/impl/StorageEventServiceImpl.java create mode 100644 src/main/java/com/muyu/parsing/service/event/impl/ZhiBiaoYuJing.java create mode 100644 src/main/java/com/muyu/parsing/storage/LocalStorage.java create mode 100644 src/main/java/com/muyu/utils/FixedThreadPool.java create mode 100644 src/main/java/com/muyu/utils/ScheduledThreadPool.java create mode 100644 src/main/java/com/muyu/utils/SpringUtils.java create mode 100644 src/main/resources/application.yml create mode 100644 src/test/java/com/test/GuZhangTest.java diff --git a/pom.xml b/pom.xml index 1094b8f..3e6baa2 100644 --- a/pom.xml +++ b/pom.xml @@ -14,7 +14,30 @@ UTF-8 + + org.springframework.boot + spring-boot-starter-parent + 2.7.15 + + + + + org.springframework.boot + spring-boot-starter-web + + + + org.apache.kafka + kafka-clients + 3.3.1 + + + + org.springframework.boot + spring-boot-starter-test + + org.eclipse.paho org.eclipse.paho.client.mqttv3 @@ -26,6 +49,11 @@ lombok 1.18.28 + + com.alibaba.fastjson2 + fastjson2 + 2.0.42 + diff --git a/src/main/java/com/muyu/MqttApplication.java b/src/main/java/com/muyu/MqttApplication.java new file mode 100644 index 0000000..fdf3de6 --- /dev/null +++ b/src/main/java/com/muyu/MqttApplication.java @@ -0,0 +1,16 @@ +package com.muyu; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +/** + * @author DongZl + * @description: 启动类 + * @Date 2023-11-24 下午 02:01 + */ +@SpringBootApplication +public class MqttApplication { + public static void main (String[] args) { + SpringApplication.run(MqttApplication.class, args); + } +} diff --git a/src/main/java/com/muyu/domain/VehicleData.java b/src/main/java/com/muyu/domain/VehicleData.java new file mode 100644 index 0000000..bd170ce --- /dev/null +++ b/src/main/java/com/muyu/domain/VehicleData.java @@ -0,0 +1,27 @@ +package com.muyu.domain; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * @author DongZl + * @description: 车辆数据对象 + * @Date 2023-11-24 下午 03:02 + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class VehicleData { + + private String vin; + + + public static VehicleData msgBuild(String msg){ + return VehicleData.builder() + .vin("VIN") + .build(); + } +} diff --git a/src/main/java/com/muyu/kafka/config/CustomPartitioner.java b/src/main/java/com/muyu/kafka/config/CustomPartitioner.java new file mode 100644 index 0000000..95ffecd --- /dev/null +++ b/src/main/java/com/muyu/kafka/config/CustomPartitioner.java @@ -0,0 +1,31 @@ +package com.muyu.kafka.config; + +import org.apache.kafka.clients.producer.Partitioner; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.PartitionInfo; +import java.util.List; +import java.util.Map; + +public class CustomPartitioner implements Partitioner { + + @Override + public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { + List partitions = cluster.partitionsForTopic(topic); + int numPartitions = partitions.size(); + // 自定义分区逻辑 + // 根据消息的 key 或 value 来选择分区 + // 这里以 key 的哈希值作为分区选择依据 + int partition = Math.abs(key.hashCode()) % numPartitions; + return partition; + } + + @Override + public void close() { + // 可选:清理资源 + } + + @Override + public void configure(Map configs) { + // 可选:配置分区器 + } +} diff --git a/src/main/java/com/muyu/kafka/config/KafkaProducerConfig.java b/src/main/java/com/muyu/kafka/config/KafkaProducerConfig.java new file mode 100644 index 0000000..d342051 --- /dev/null +++ b/src/main/java/com/muyu/kafka/config/KafkaProducerConfig.java @@ -0,0 +1,29 @@ +package com.muyu.kafka.config; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import java.util.Properties; + +import static com.muyu.kafka.contents.KafkaContent.KAFKA_CON; + +/** + * @author DongZl + * @description: kakfa生产者 + * @Date 2023-11-24 下午 02:40 + */ +//@Configuration +public class KafkaProducerConfig { + + @Bean + public Producer producerInit(){ + Properties props = new Properties(); + props.put("bootstrap.servers", KAFKA_CON); + props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + props.put("partitioner.class", "com.muyu.kafka.config.CustomPartitioner"); + return new KafkaProducer<>(props); + } +} diff --git a/src/main/java/com/muyu/kafka/consumer/ConsumerTest.java b/src/main/java/com/muyu/kafka/consumer/ConsumerTest.java new file mode 100644 index 0000000..1e0d271 --- /dev/null +++ b/src/main/java/com/muyu/kafka/consumer/ConsumerTest.java @@ -0,0 +1,57 @@ +package com.muyu.kafka.consumer; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; + +import static com.muyu.kafka.contents.KafkaContent.KAFKA_CON; +import static com.muyu.kafka.contents.KafkaContent.TOPIC; + +/** + * @author DongZl + * @description: 消费者测试类 + * @Date 2023/8/25 18:52 + */ +public class ConsumerTest { + private static KafkaConsumer consumer; + + private static void KfkConsumer() { + Properties props = new Properties(); + props.put("bootstrap.servers", KAFKA_CON); + props.put("group.id", "group01"); + props.put("enable.auto.commit", "true"); + props.put("auto.commit.interval.ms", "1000"); + props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + consumer = new KafkaConsumer<>(props); + consumer.subscribe(List.of(TOPIC)); + } + + private static void close() { + consumer.close(); + } + + private static List> poll(int num) { + List> result = new ArrayList<>(); + while (result.size() < num) { + ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); + for (ConsumerRecord record : records) { + result.add(Arrays.asList(record.offset(), record.key(), record.value())); + System.out.println(record.offset() +" - "+ record.key() +" - "+ record.value()); + } + } + return result; + } + + public static void main (String[] args) { + KfkConsumer(); + poll(1000); + close(); + } +} diff --git a/src/main/java/com/muyu/kafka/contents/KafkaContent.java b/src/main/java/com/muyu/kafka/contents/KafkaContent.java new file mode 100644 index 0000000..114c2ef --- /dev/null +++ b/src/main/java/com/muyu/kafka/contents/KafkaContent.java @@ -0,0 +1,13 @@ +package com.muyu.kafka.contents; + +/** + * @author DongZl + * @description: kafka常量类 + * @Date 2023/8/25 18:47 + */ +public class KafkaContent { + + public static final String TOPIC = "top"; + + public static final String KAFKA_CON = "47.113.220.52:39092,47.113.220.52:29092,47.113.220.52:19092"; +} diff --git a/src/main/java/com/muyu/kafka/producer/ProducerTest.java b/src/main/java/com/muyu/kafka/producer/ProducerTest.java new file mode 100644 index 0000000..f9c0303 --- /dev/null +++ b/src/main/java/com/muyu/kafka/producer/ProducerTest.java @@ -0,0 +1,68 @@ +package com.muyu.kafka.producer; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; + +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.util.Properties; +import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +import static com.muyu.kafka.contents.KafkaContent.KAFKA_CON; +import static com.muyu.kafka.contents.KafkaContent.TOPIC; + +/** + * @author DongZl + * @description: 生产者测试 + * @Date 2023/8/25 18:50 + */ +public class ProducerTest { + private static Producer producer; + + public static void KfkProducer() { + Properties props = new Properties(); + props.put("bootstrap.servers", KAFKA_CON); +// props.put("linger.ms", 1); + props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + producer = new KafkaProducer<>(props); + } + + private static void close() { + producer.close(); + } + + private static RecordMetadata send(String topic, String key, String value) { + Future result = producer.send(new ProducerRecord<>(topic, key, value)); + RecordMetadata meta = null; + try { + meta = result.get(); + } catch (InterruptedException | ExecutionException e) { + e.printStackTrace(); + } + return meta; + } + + public static void main (String[] args) { + KfkProducer(); + + new Thread(() -> { + int i = 0; + do { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + String format = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); + send(TOPIC, UUID.randomUUID().toString(), format); + }while (i++ < 1000); + close(); + }).start(); + + } +} diff --git a/src/main/java/com/muyu/mqtt/PublishSample.java b/src/main/java/com/muyu/mqtt/PublishSample.java index 99bdce6..6d0496a 100644 --- a/src/main/java/com/muyu/mqtt/PublishSample.java +++ b/src/main/java/com/muyu/mqtt/PublishSample.java @@ -16,7 +16,7 @@ public class PublishSample { public static void main (String[] args) { - String broker = "tcp://192.168.40.128:1883"; + String broker = "tcp://fluxmq.muyu.icu:1883"; String topic = "test"; String username = "emqx"; String password = "public"; diff --git a/src/main/java/com/muyu/mqtt/SubscribeSample.java b/src/main/java/com/muyu/mqtt/SubscribeSample.java index 9a31e2c..9a68402 100644 --- a/src/main/java/com/muyu/mqtt/SubscribeSample.java +++ b/src/main/java/com/muyu/mqtt/SubscribeSample.java @@ -3,17 +3,18 @@ package com.muyu.mqtt; import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; +import java.util.UUID; + public class SubscribeSample { public static void main (String[] args) { - String broker = "tcp://192.168.40.128:1883"; + String broker = "tcp://fluxmq.muyu.icu:1883"; String topic = "test"; String username = "emqx"; String password = "public"; - String clientid = "subscribe_client"; int qos = 0; try { - MqttClient client = new MqttClient(broker, clientid, new MemoryPersistence()); + MqttClient client = new MqttClient(broker, UUID.randomUUID().toString(), new MemoryPersistence()); // 连接参数 MqttConnectOptions options = new MqttConnectOptions(); // options.setUserName(username); diff --git a/src/main/java/com/muyu/mqtt/Test1.java b/src/main/java/com/muyu/mqtt/Test1.java deleted file mode 100644 index 0e0907a..0000000 --- a/src/main/java/com/muyu/mqtt/Test1.java +++ /dev/null @@ -1,9 +0,0 @@ -package com.muyu.mqtt; - -/** - * @author DongZl - * @description: 测试1 - * @Date 2023-11-6 下午 04:15 - */ -public class Test1 { -} diff --git a/src/main/java/com/muyu/mqtt/config/MqttConfig.java b/src/main/java/com/muyu/mqtt/config/MqttConfig.java new file mode 100644 index 0000000..88477b7 --- /dev/null +++ b/src/main/java/com/muyu/mqtt/config/MqttConfig.java @@ -0,0 +1,43 @@ +package com.muyu.mqtt.config; + +import com.muyu.mqtt.service.MqttService; +import lombok.extern.log4j.Log4j2; +import org.eclipse.paho.client.mqttv3.MqttClient; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import java.util.UUID; + +/** + * @author DongZl + * @description: Mqtt配置 + * @Date 2023-11-24 下午 02:06 + */ +@Log4j2 +//@Configuration +public class MqttConfig { + + @Bean + public MqttClient initClient(MqttProper mqttProper, MqttService mqttService){ + try { + log.info("mqtt服务器初始化开始"); + long startTime = System.currentTimeMillis(); + MqttClient client = new MqttClient(mqttProper.getBroker(), + UUID.randomUUID().toString(), + new MemoryPersistence()); + // 连接参数 + MqttConnectOptions options = new MqttConnectOptions(); + options.setConnectionTimeout(60); + options.setKeepAliveInterval(60); + log.info("mqtt服务器初始化结束, 耗时:[{}MS]", System.currentTimeMillis() - startTime); + client.connect(options); + client.setCallback(mqttService); + client.subscribe(mqttProper.getTopic(), 0); + return client; + }catch (Exception e) { + throw new RuntimeException(e); + } + } +} diff --git a/src/main/java/com/muyu/mqtt/config/MqttProper.java b/src/main/java/com/muyu/mqtt/config/MqttProper.java new file mode 100644 index 0000000..d8cd559 --- /dev/null +++ b/src/main/java/com/muyu/mqtt/config/MqttProper.java @@ -0,0 +1,32 @@ +package com.muyu.mqtt.config; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.annotation.Configuration; + +/** + * @author DongZl + * @description: mqtt配置 + * @Date 2023-11-24 下午 02:04 + */ +@Data +@Builder +@Configuration +@NoArgsConstructor +@AllArgsConstructor +@ConfigurationProperties(prefix = "mqtt.config") +public class MqttProper { + + /** + * 节点 + */ + private String broker; + + /** + * 主题 + */ + private String topic; +} diff --git a/src/main/java/com/muyu/mqtt/service/MqttService.java b/src/main/java/com/muyu/mqtt/service/MqttService.java new file mode 100644 index 0000000..26b6bfd --- /dev/null +++ b/src/main/java/com/muyu/mqtt/service/MqttService.java @@ -0,0 +1,84 @@ +package com.muyu.mqtt.service; + +import com.muyu.kafka.contents.KafkaContent; +import lombok.extern.log4j.Log4j2; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; +import org.eclipse.paho.client.mqttv3.MqttCallback; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +/** + * @author DongZl + * @description: Mqtt消费者 + * @Date 2023-11-24 下午 02:10 + */ +@Log4j2 +@Service +public class MqttService implements MqttCallback { + +// @Autowired + private Producer producer; + + private RecordMetadata send(String topic, String key, String value) { + Future result = producer.send(new ProducerRecord<>(topic, key, value)); + RecordMetadata meta = null; + try { + meta = result.get(); + } catch (InterruptedException | ExecutionException e) { + e.printStackTrace(); + } + return meta; + } + + /** + * This method is called when the connection to the server is lost. + * + * @param cause the reason behind the loss of connection. + */ + @Override + public void connectionLost (Throwable cause) { + + } + + /** + *

当消息从服务器到达时,将调用此方法。

+ *

该方法由MQTT客户端同步调用。确认不会发送回服务器,直到此方法干净地返回。

+ *

如果此方法的实现抛出异常,则客户端将被关闭。当客户端下一次重新连接时,任何QoS 1或2消息将由服务器重新传递。

+ *

在运行此方法的实现时到达的任何其他消息将在内存中建立,然后将在网络上备份。

+ *

如果应用程序需要持久化数据,那么它应该确保在从该方法返回之前持久化数据,因为在从该方法返回之后,消息被认为已被传递,并且将不可再现。

+ *

可以在此回调的实现中发送新消息 (例如,对此消息的响应),但实现不能断开客户端的连接,因为不可能发送正在处理的消息的确认,并且会发生死锁。

+ * + * @param topic 消息上的主题名称已发布到 + * @param message 真正的信息。 + * + * @throws Exception 如果发生了终端错误,客户端应该关闭。 + */ + @Override + public void messageArrived (String topic, MqttMessage message) throws Exception { + String msg = new String(message.getPayload()); + log.info("topic: [{}], Qos: [{}], message content: [{}]", topic, message.getQos(), msg); + send(KafkaContent.TOPIC, String.valueOf(msg.hashCode()), msg); + } + + /** + * Called when delivery for a message has been completed, and all + * acknowledgments have been received. For QoS 0 messages it is + * called once the message has been handed to the network for + * delivery. For QoS 1 it is called when PUBACK is received and + * for QoS 2 when PUBCOMP is received. The token will be the same + * token as that returned when the message was published. + * + * @param token the delivery token associated with the message. + */ + @Override + public void deliveryComplete (IMqttDeliveryToken token) { + + } +} diff --git a/src/main/java/com/muyu/parsing/ParsingService.java b/src/main/java/com/muyu/parsing/ParsingService.java new file mode 100644 index 0000000..a90e13d --- /dev/null +++ b/src/main/java/com/muyu/parsing/ParsingService.java @@ -0,0 +1,49 @@ +package com.muyu.parsing; + +import com.muyu.domain.VehicleData; +import com.muyu.parsing.service.event.EventCon; +import lombok.AllArgsConstructor; +import lombok.extern.log4j.Log4j2; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.springframework.stereotype.Component; +import org.springframework.stereotype.Service; + +import javax.annotation.PostConstruct; +import java.time.Duration; +import java.util.Arrays; + +/** + * @author DongZl + * @description: 解析业务 + * @Date 2023-11-24 下午 02:58 + */ +@Log4j2 +@Component +@AllArgsConstructor +public class ParsingService { + + private final KafkaConsumer consumer; + + @PostConstruct + public void init(){ + new Thread(() -> { + while (true){ + ConsumerRecords records = null; + try { + records = consumer.poll(Duration.ofMillis(5)); + for (ConsumerRecord record : records) { + VehicleData vehicleData = VehicleData.msgBuild(record.value()); + EventCon.execute(vehicleData); + log.info("offset: [{}], key:[{}], value:[{}]", record.offset(), record.key(), record.value()); + } + }catch (Exception e){ + log.info("records: {}", records); + log.error(e); + } + } + }).start(); + } + +} diff --git a/src/main/java/com/muyu/parsing/TestTread.java b/src/main/java/com/muyu/parsing/TestTread.java new file mode 100644 index 0000000..317c690 --- /dev/null +++ b/src/main/java/com/muyu/parsing/TestTread.java @@ -0,0 +1,20 @@ +package com.muyu.parsing; + +import org.eclipse.paho.client.mqttv3.MqttClient; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.eclipse.paho.client.mqttv3.MqttException; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; + +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.Random; +import java.util.UUID; + +/** + * @author DongZl + * @description: 测试线程 + * @Date 2023-12-11 下午 02:37 + */ +public class TestTread { +} diff --git a/src/main/java/com/muyu/parsing/config/KafkaConsumerConfig.java b/src/main/java/com/muyu/parsing/config/KafkaConsumerConfig.java new file mode 100644 index 0000000..26d549d --- /dev/null +++ b/src/main/java/com/muyu/parsing/config/KafkaConsumerConfig.java @@ -0,0 +1,34 @@ +package com.muyu.parsing.config; + +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import java.util.List; +import java.util.Properties; + +import static com.muyu.kafka.contents.KafkaContent.KAFKA_CON; +import static com.muyu.kafka.contents.KafkaContent.TOPIC; + +/** + * @author DongZl + * @description: 解析卡夫卡 + * @Date 2023-11-24 下午 02:55 + */ +@Configuration +public class KafkaConsumerConfig { + + @Bean + public KafkaConsumer consumerInit(){ + Properties props = new Properties(); + props.put("bootstrap.servers", KAFKA_CON); + props.put("group.id", "group01"); + props.put("enable.auto.commit", "true"); + props.put("auto.commit.interval.ms", "1000"); + props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + KafkaConsumer consumer = new KafkaConsumer<>(props); + consumer.subscribe(List.of(TOPIC)); + return consumer; + } +} diff --git a/src/main/java/com/muyu/parsing/service/EventControl.java b/src/main/java/com/muyu/parsing/service/EventControl.java new file mode 100644 index 0000000..367037b --- /dev/null +++ b/src/main/java/com/muyu/parsing/service/EventControl.java @@ -0,0 +1,20 @@ +package com.muyu.parsing.service; + +import com.muyu.domain.VehicleData; +import org.springframework.stereotype.Service; + +/** + * @author DongZl + * @description: 事件控制 + * @Date 2023-11-24 下午 03:05 + */ +@Service +public interface EventControl { + + // 注册事件 + public void registerEvent(String vin, String eventId); + // 注销事件 + public void logoffEvent(String vin, String eventId); + // 执行事件 + public void execute(VehicleData vehicleData); +} diff --git a/src/main/java/com/muyu/parsing/service/event/EventCon.java b/src/main/java/com/muyu/parsing/service/event/EventCon.java new file mode 100644 index 0000000..99ff35c --- /dev/null +++ b/src/main/java/com/muyu/parsing/service/event/EventCon.java @@ -0,0 +1,58 @@ +package com.muyu.parsing.service.event; + +import com.muyu.domain.VehicleData; +import com.muyu.utils.SpringUtils; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * @author DongZl + * @description: 事件控制器 + * @Date 2023-11-26 下午 08:58 + */ +//@Component +public class EventCon { + + public static final Map> eventMap = new ConcurrentHashMap<>(); + // ["event-storage", "event-realTimeTrajectory"] + + /** + * 注册事件 + */ + public static void registerEvent (String vin, String eventId) { + getEvent(vin).add(eventId); + } + + + /** + * 注销事件 + */ + public static void logoffEvent (String vin, String eventId) { + getEvent(vin).remove(eventId); + } + + /** + * 执行事件 + * @param vehicleData + */ + public static void execute (VehicleData vehicleData) { + List eventList = getEvent(vehicleData.getVin()); + eventList.forEach(eventId -> { + // ["event-storage", "event-realTimeTrajectory"] + VehicleEventService vehicleEventService = SpringUtils.getBean(eventId); + vehicleEventService.execute(vehicleData); + }); + } + + + /** + * 根据VIN获取事件 + */ + public static List getEvent(String vin){ + return eventMap.computeIfAbsent(vin, k -> new ArrayList<>()); + } +} + diff --git a/src/main/java/com/muyu/parsing/service/event/VehicleEventService.java b/src/main/java/com/muyu/parsing/service/event/VehicleEventService.java new file mode 100644 index 0000000..e938390 --- /dev/null +++ b/src/main/java/com/muyu/parsing/service/event/VehicleEventService.java @@ -0,0 +1,20 @@ +package com.muyu.parsing.service.event; + +import com.muyu.domain.VehicleData; + +/** + * @author DongZl + * @description: 车辆事件 + * @Date 2023-11-24 下午 03:07 + */ +public interface VehicleEventService { + + public void execute(VehicleData vehicleData); + + /** + * 事件名称 + * @return + */ + String getEventName (); + +} diff --git a/src/main/java/com/muyu/parsing/service/event/impl/Guzhang.java b/src/main/java/com/muyu/parsing/service/event/impl/Guzhang.java new file mode 100644 index 0000000..a272ec9 --- /dev/null +++ b/src/main/java/com/muyu/parsing/service/event/impl/Guzhang.java @@ -0,0 +1,46 @@ +package com.muyu.parsing.service.event.impl; + +import com.muyu.domain.VehicleData; +import com.muyu.parsing.service.event.VehicleEventService; +import lombok.extern.log4j.Log4j2; +import org.eclipse.paho.client.mqttv3.MqttClient; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.eclipse.paho.client.mqttv3.MqttException; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; +import org.springframework.stereotype.Service; + +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.Random; +import java.util.UUID; + +/** + * @author DongZl + * @description: + * @Date 2023-12-1 下午 03:04 + */ +@Log4j2 +@Service("guzhang") +public class Guzhang implements VehicleEventService { + @Override + public void execute (VehicleData vehicleData) { + log.info("开始故障"); + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + log.info("结束故障"); + } + + /** + * 事件名称 + * + * @return + */ + @Override + public String getEventName () { + return "guzhang"; + } +} diff --git a/src/main/java/com/muyu/parsing/service/event/impl/RealTimeTrajectoryEventImplService.java b/src/main/java/com/muyu/parsing/service/event/impl/RealTimeTrajectoryEventImplService.java new file mode 100644 index 0000000..635d27f --- /dev/null +++ b/src/main/java/com/muyu/parsing/service/event/impl/RealTimeTrajectoryEventImplService.java @@ -0,0 +1,71 @@ +package com.muyu.parsing.service.event.impl; + +import com.muyu.domain.VehicleData; +import com.muyu.parsing.service.event.VehicleEventService; +import lombok.extern.log4j.Log4j2; +import org.eclipse.paho.client.mqttv3.MqttClient; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.eclipse.paho.client.mqttv3.MqttException; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; +import org.springframework.stereotype.Service; + +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.Random; +import java.util.UUID; + +/** + * @author DongZl + * @description: 实时轨迹 + * @Date 2023-11-24 下午 03:08 + */ +@Log4j2 +@Service("event-realTimeTrajectory") +public class RealTimeTrajectoryEventImplService implements VehicleEventService { + @Override + public void execute (VehicleData vehicleData) { + log.info("开始实时轨迹"); + String broker = "tcp://fluxmq.muyu.icu:1883", topic = "test",clientId = UUID.randomUUID().toString(); + int qos = 0; + try { + MqttClient client = new MqttClient(broker, clientId, new MemoryPersistence()); + // 连接参数 + MqttConnectOptions options = new MqttConnectOptions(); + // 连接 + client.connect(options); + int nextInt = new Random().nextInt(1, 10); + log.info("实时轨迹:[{}]次", nextInt); + if (nextInt >= 8){ + Thread.sleep(8 * 10000); + } + for (int i = 0 ; i < nextInt ; i++) { + String content = new SimpleDateFormat("yyyy年MM月dd日 HH时mm分ss秒").format(new Date()) + " - 实时轨迹 MQTT" + clientId; + // 创建消息并设置 QoS + MqttMessage message = new MqttMessage(content.getBytes()); + message.setQos(qos); + // 发布消息 + client.publish(topic, message); + log.info("[{}] - [{}]", Thread.currentThread().getName(), content); + Thread.sleep(100); + } + // 关闭连接 + client.disconnect(); + // 关闭客户端 + client.close(); + log.info("结束实时轨迹"); + } catch (MqttException | InterruptedException e) { + throw new RuntimeException(e); + } + } + + /** + * 事件名称 + * + * @return + */ + @Override + public String getEventName () { + return "event-realTimeTrajectory"; + } +} diff --git a/src/main/java/com/muyu/parsing/service/event/impl/StorageEventServiceImpl.java b/src/main/java/com/muyu/parsing/service/event/impl/StorageEventServiceImpl.java new file mode 100644 index 0000000..4369982 --- /dev/null +++ b/src/main/java/com/muyu/parsing/service/event/impl/StorageEventServiceImpl.java @@ -0,0 +1,71 @@ +package com.muyu.parsing.service.event.impl; + +import com.muyu.domain.VehicleData; +import com.muyu.parsing.service.event.VehicleEventService; +import lombok.extern.log4j.Log4j2; +import org.eclipse.paho.client.mqttv3.MqttClient; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.eclipse.paho.client.mqttv3.MqttException; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; +import org.springframework.stereotype.Service; + +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.Random; +import java.util.UUID; + +/** + * @author DongZl + * @description: 存储事件 + * @Date 2023-11-24 下午 03:08 + */ +@Log4j2 +@Service("event-storage") +public class StorageEventServiceImpl implements VehicleEventService { + @Override + public void execute (VehicleData vehicleData) { + log.info("开始存储"); + String broker = "tcp://fluxmq.muyu.icu:1883", topic = "test",clientId = UUID.randomUUID().toString(); + int qos = 0; + try { + MqttClient client = new MqttClient(broker, clientId, new MemoryPersistence()); + // 连接参数 + MqttConnectOptions options = new MqttConnectOptions(); + // 连接 + client.connect(options); + int nextInt = new Random().nextInt(1, 10); + log.info("本地存储:[{}]次", nextInt); + if (nextInt >= 8){ + Thread.sleep(8 * 10000); + } + for (int i = 0 ; i < nextInt ; i++) { + String content = new SimpleDateFormat("yyyy年MM月dd日 HH时mm分ss秒").format(new Date()) + " - 存储 MQTT" + clientId; + // 创建消息并设置 QoS + MqttMessage message = new MqttMessage(content.getBytes()); + message.setQos(qos); + // 发布消息 + client.publish(topic, message); + log.info("[{}] - [{}]", Thread.currentThread().getName(), content); + Thread.sleep(100); + } + // 关闭连接 + client.disconnect(); + // 关闭客户端 + client.close(); + log.info("结束存储"); + } catch (MqttException | InterruptedException e) { + throw new RuntimeException(e); + } + } + + /** + * 事件名称 + * + * @return + */ + @Override + public String getEventName () { + return "event-storage"; + } +} diff --git a/src/main/java/com/muyu/parsing/service/event/impl/ZhiBiaoYuJing.java b/src/main/java/com/muyu/parsing/service/event/impl/ZhiBiaoYuJing.java new file mode 100644 index 0000000..471bd2d --- /dev/null +++ b/src/main/java/com/muyu/parsing/service/event/impl/ZhiBiaoYuJing.java @@ -0,0 +1,28 @@ +package com.muyu.parsing.service.event.impl; + +import com.muyu.domain.VehicleData; +import com.muyu.parsing.service.event.VehicleEventService; +import org.springframework.stereotype.Service; + +/** + * @author DongZl + * @description: 指标预警 + * @Date 2024-3-31 上午 11:14 + */ +@Service("event-zhibiaoyujing") +public class ZhiBiaoYuJing implements VehicleEventService { + @Override + public void execute (VehicleData vehicleData) { + + } + + /** + * 事件名称 + * + * @return + */ + @Override + public String getEventName () { + return "指标预警"; + } +} diff --git a/src/main/java/com/muyu/parsing/storage/LocalStorage.java b/src/main/java/com/muyu/parsing/storage/LocalStorage.java new file mode 100644 index 0000000..826acc0 --- /dev/null +++ b/src/main/java/com/muyu/parsing/storage/LocalStorage.java @@ -0,0 +1,45 @@ +package com.muyu.parsing.storage; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * @author DongZl + * @description: 本地通道 + * @Date 2023-12-12 上午 09:57 + */ +public class LocalStorage { + + public volatile AtomicBoolean checkQueue = new AtomicBoolean(); + + public LinkedBlockingQueue trueQueue = new LinkedBlockingQueue<>(); + + public LinkedBlockingQueue falseQueue = new LinkedBlockingQueue<>(); + + private LinkedBlockingQueue getQueue(){ + return checkQueue.get() ? trueQueue : falseQueue; + } + + private void checkQueue(){ + checkQueue.set(!checkQueue.get()); + } + + /** + * 存放 + */ + public void push(String data){ + getQueue().offer(data); + } + /** + * 获取 + */ + public String[] getDataList(){ + LinkedBlockingQueue dataQueue = getQueue(); + this.checkQueue(); + return dataQueue.toArray(new String[]{}); + } + +} diff --git a/src/main/java/com/muyu/utils/FixedThreadPool.java b/src/main/java/com/muyu/utils/FixedThreadPool.java new file mode 100644 index 0000000..ed9ea1a --- /dev/null +++ b/src/main/java/com/muyu/utils/FixedThreadPool.java @@ -0,0 +1,36 @@ +package com.muyu.utils; + +import java.util.concurrent.*; + +/** + * @author DongZl + * @description: 可控最大并发数线程池 + * @Date 2023-12-5 下午 01:51 + */ +public class FixedThreadPool { + + /** + * 可重用固定个数的线程池 + */ + private final static ExecutorService fixedThreadPool = + new ThreadPoolExecutor(2, + 2, + 0L,TimeUnit.MILLISECONDS, + new LinkedBlockingQueue(45)); + + + /** + * 线程池提交任务 + * @param thread 线程 + */ + public static Future submit(Thread thread){ + return fixedThreadPool.submit(thread); + } + + /** + * 关闭线程池 + */ + public static void shutDown(){ + fixedThreadPool.shutdown(); + } +} diff --git a/src/main/java/com/muyu/utils/ScheduledThreadPool.java b/src/main/java/com/muyu/utils/ScheduledThreadPool.java new file mode 100644 index 0000000..3d0d282 --- /dev/null +++ b/src/main/java/com/muyu/utils/ScheduledThreadPool.java @@ -0,0 +1,37 @@ +package com.muyu.utils; + +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +/** + * @author DongZl + * @description: 线程池 + * @Date 2023-11-17 上午 09:16 + */ +public class ScheduledThreadPool { + + /** + * 周期性线程池 CPU 数量 * 2 + 1 + */ + private static final ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool( + Runtime.getRuntime().availableProcessors() * 2 + 1); + + public static ScheduledFuture submit (Runnable thread){ + // 参数分别是: 任务, 多久后开始执行, 每隔多久执行一次(周期),时间单位 + return submit(thread, 1); + } + + public static ScheduledFuture submit (Runnable thread, long period){ + // 参数分别是: 任务, 多久后开始执行, 每隔多久执行一次(周期),时间单位 + return scheduledThreadPool.scheduleAtFixedRate(thread, 0, period, TimeUnit.SECONDS); + } + + /** + * 关闭线程池 + */ + public static void shutdown() { + scheduledThreadPool.shutdown(); + } +} diff --git a/src/main/java/com/muyu/utils/SpringUtils.java b/src/main/java/com/muyu/utils/SpringUtils.java new file mode 100644 index 0000000..2f7ba24 --- /dev/null +++ b/src/main/java/com/muyu/utils/SpringUtils.java @@ -0,0 +1,114 @@ +package com.muyu.utils; + +import org.springframework.aop.framework.AopContext; +import org.springframework.beans.BeansException; +import org.springframework.beans.factory.NoSuchBeanDefinitionException; +import org.springframework.beans.factory.config.BeanFactoryPostProcessor; +import org.springframework.beans.factory.config.ConfigurableListableBeanFactory; +import org.springframework.stereotype.Component; + +/** + * spring工具类 方便在非spring管理环境中获取bean + * + * @author muyu + */ +@Component +public final class SpringUtils implements BeanFactoryPostProcessor { + /** + * Spring应用上下文环境 + */ + private static ConfigurableListableBeanFactory beanFactory; + + /** + * 获取对象 + * + * @param name + * + * @return Object 一个以所给名字注册的bean的实例 + * + * @throws org.springframework.beans.BeansException + */ + @SuppressWarnings("unchecked") + public static T getBean (String name) throws BeansException { + return (T) beanFactory.getBean(name); + } + + /** + * 获取类型为requiredType的对象 + * + * @param clz + * + * @return + * + * @throws org.springframework.beans.BeansException + */ + public static T getBean (Class clz) throws BeansException { + T result = (T) beanFactory.getBean(clz); + return result; + } + + /** + * 如果BeanFactory包含一个与所给名称匹配的bean定义,则返回true + * + * @param name + * + * @return boolean + */ + public static boolean containsBean (String name) { + return beanFactory.containsBean(name); + } + + /** + * 判断以给定名字注册的bean定义是一个singleton还是一个prototype。 如果与给定名字相应的bean定义没有被找到,将会抛出一个异常(NoSuchBeanDefinitionException) + * + * @param name + * + * @return boolean + * + * @throws org.springframework.beans.factory.NoSuchBeanDefinitionException + */ + public static boolean isSingleton (String name) throws NoSuchBeanDefinitionException { + return beanFactory.isSingleton(name); + } + + /** + * @param name + * + * @return Class 注册对象的类型 + * + * @throws org.springframework.beans.factory.NoSuchBeanDefinitionException + */ + public static Class getType (String name) throws NoSuchBeanDefinitionException { + return beanFactory.getType(name); + } + + /** + * 如果给定的bean名字在bean定义中有别名,则返回这些别名 + * + * @param name + * + * @return + * + * @throws org.springframework.beans.factory.NoSuchBeanDefinitionException + */ + public static String[] getAliases (String name) throws NoSuchBeanDefinitionException { + return beanFactory.getAliases(name); + } + + /** + * 获取aop代理对象 + * + * @param invoker + * + * @return + */ + @SuppressWarnings("unchecked") + public static T getAopProxy (T invoker) { + return (T) AopContext.currentProxy(); + } + + @Override + public void postProcessBeanFactory (ConfigurableListableBeanFactory beanFactory) throws BeansException { + SpringUtils.beanFactory = beanFactory; + } +} diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml new file mode 100644 index 0000000..235c984 --- /dev/null +++ b/src/main/resources/application.yml @@ -0,0 +1,5 @@ +mqtt: + config: + broker: tcp://fluxmq.muyu.icu:1883 + topic: test + diff --git a/src/test/java/com/test/GuZhangTest.java b/src/test/java/com/test/GuZhangTest.java new file mode 100644 index 0000000..3317f3d --- /dev/null +++ b/src/test/java/com/test/GuZhangTest.java @@ -0,0 +1,130 @@ +package com.test; + +import com.muyu.MqttApplication; +import com.muyu.domain.VehicleData; +import com.muyu.parsing.service.event.VehicleEventService; +import com.muyu.parsing.service.event.impl.Guzhang; +import com.muyu.utils.FixedThreadPool; +import com.muyu.utils.SpringUtils; +import lombok.extern.log4j.Log4j2; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; + +import java.util.*; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +/** + * @author DongZl + * @description: 测试 + * @Date 2023-12-1 下午 03:04 + */ +@Log4j2 +@SpringBootTest(classes = MqttApplication.class) +public class GuZhangTest { + + @Autowired + private Guzhang guzhang; + + private Map eventServiceMap = new HashMap<>(); + + public void initEvent () { + eventServiceMap.put("guzhang", SpringUtils.getBean("guzhang")); + eventServiceMap.put("event-realTimeTrajectory", SpringUtils.getBean("event-realTimeTrajectory")); + eventServiceMap.put("event-storage", SpringUtils.getBean("event-storage")); + } + + private Map> vinEventMap = new HashMap<>(); + + public void initVinEvent () { + List eventList = new ArrayList<>() {{ + add("guzhang"); + add("event-realTimeTrajectory"); + add("event-storage"); + }}; + vinEventMap.put("VIN123456798123100", eventList); + vinEventMap.put("VIN123456798123101", eventList); + vinEventMap.put("VIN123456798123102", eventList); + vinEventMap.put("VIN123456798123103", eventList); + vinEventMap.put("VIN123456798123104", eventList); + vinEventMap.put("VIN123456798123105", eventList); + vinEventMap.put("VIN123456798123106", eventList); + vinEventMap.put("VIN123456798123107", eventList); + vinEventMap.put("VIN123456798123108", eventList); + vinEventMap.put("VIN123456798123109", eventList); + vinEventMap.put("VIN123456798123110", eventList); + vinEventMap.put("VIN123456798123111", eventList); + vinEventMap.put("VIN123456798123112", eventList); + vinEventMap.put("VIN123456798123113", eventList); + vinEventMap.put("VIN123456798123114", eventList); + vinEventMap.put("VIN123456798123115", eventList); + vinEventMap.put("VIN123456798123116", eventList); + vinEventMap.put("VIN123456798123117", eventList); + vinEventMap.put("VIN123456798123118", eventList); + vinEventMap.put("VIN123456798123119", eventList); + vinEventMap.put("VIN123456798123120", eventList); + vinEventMap.put("VIN123456798123121", eventList); + vinEventMap.put("VIN123456798123122", eventList); + vinEventMap.put("VIN123456798123123", eventList); + vinEventMap.put("VIN123456798123124", eventList); + vinEventMap.put("VIN123456798123125", eventList); + vinEventMap.put("VIN123456798123126", eventList); + vinEventMap.put("VIN123456798123127", eventList); + vinEventMap.put("VIN123456798123128", eventList); + vinEventMap.put("VIN123456798123129", eventList); + } + + @Test + public void ceshi () { + this.initEvent(); + this.initVinEvent(); + Set vinList = this.vinEventMap.keySet(); + CountDownLatch vinCountDownLatch = new CountDownLatch(vinList.size()); + for (String vin : vinList) { + // vin - 主线程 + new Thread(() -> { + try { + Map eventMap = new HashMap<>(); + List eventList = this.vinEventMap.get(vin); + CountDownLatch vehicleEventCountDownLatch = new CountDownLatch(eventList.size()); + // event - 子线程 + eventList.stream().map(eventId -> this.eventServiceMap.get(eventId)) + .forEach(vehicleEventService -> { + String threadName = vin + vehicleEventService.getEventName(); + Thread thread = new Thread(() -> { + vehicleEventService.execute( + VehicleData.builder() + .vin(vin) + .build() + ); + vehicleEventCountDownLatch.countDown(); + eventMap.remove(Thread.currentThread().getName()); + }, threadName); + FixedThreadPool.submit(thread); + eventMap.put(threadName, thread); + }); + boolean isAwait; + int awaitSize = 1, maxAwaitSize = 3; + do { + // true / false -》 latch == 0 返回 true | false latch != 0, 但是等待时间到了 + isAwait = vehicleEventCountDownLatch.await(1000, TimeUnit.MILLISECONDS); + log.info("[{}]等待[{}]次, 线程数量:[{}]", vin, awaitSize, eventMap.size()); + }while (isAwait || awaitSize++ < maxAwaitSize); + eventMap.values().forEach(Thread::interrupt); + vinCountDownLatch.countDown(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + System.out.println("单个VIN执行结束"); + }).start(); + } + + try { + vinCountDownLatch.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + System.out.println("整体执行结束"); + } +}