初始化

dev-1106
DongZeLiang 2024-04-06 11:14:45 +08:00
parent b4afdf762a
commit c5b7e1f817
30 changed files with 1217 additions and 13 deletions

28
pom.xml
View File

@ -14,7 +14,30 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties> </properties>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.15</version>
</parent>
<dependencies> <dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.3.1</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<dependency> <dependency>
<groupId>org.eclipse.paho</groupId> <groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId> <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
@ -26,6 +49,11 @@
<artifactId>lombok</artifactId> <artifactId>lombok</artifactId>
<version>1.18.28</version> <version>1.18.28</version>
</dependency> </dependency>
<dependency>
<groupId>com.alibaba.fastjson2</groupId>
<artifactId>fastjson2</artifactId>
<version>2.0.42</version>
</dependency>
</dependencies> </dependencies>
</project> </project>

View File

@ -0,0 +1,16 @@
package com.muyu;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* @author DongZl
* @description:
* @Date 2023-11-24 02:01
*/
@SpringBootApplication
public class MqttApplication {
public static void main (String[] args) {
SpringApplication.run(MqttApplication.class, args);
}
}

View File

@ -0,0 +1,27 @@
package com.muyu.domain;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* @author DongZl
* @description:
* @Date 2023-11-24 03:02
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class VehicleData {
private String vin;
public static VehicleData msgBuild(String msg){
return VehicleData.builder()
.vin("VIN")
.build();
}
}

View File

@ -0,0 +1,31 @@
package com.muyu.kafka.config;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import java.util.List;
import java.util.Map;
public class CustomPartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
// 自定义分区逻辑
// 根据消息的 key 或 value 来选择分区
// 这里以 key 的哈希值作为分区选择依据
int partition = Math.abs(key.hashCode()) % numPartitions;
return partition;
}
@Override
public void close() {
// 可选:清理资源
}
@Override
public void configure(Map<String, ?> configs) {
// 可选:配置分区器
}
}

View File

@ -0,0 +1,29 @@
package com.muyu.kafka.config;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.Properties;
import static com.muyu.kafka.contents.KafkaContent.KAFKA_CON;
/**
* @author DongZl
* @description: kakfa
* @Date 2023-11-24 02:40
*/
//@Configuration
public class KafkaProducerConfig {
@Bean
public Producer<String, String> producerInit(){
Properties props = new Properties();
props.put("bootstrap.servers", KAFKA_CON);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("partitioner.class", "com.muyu.kafka.config.CustomPartitioner");
return new KafkaProducer<>(props);
}
}

View File

@ -0,0 +1,57 @@
package com.muyu.kafka.consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import static com.muyu.kafka.contents.KafkaContent.KAFKA_CON;
import static com.muyu.kafka.contents.KafkaContent.TOPIC;
/**
* @author DongZl
* @description:
* @Date 2023/8/25 18:52
*/
public class ConsumerTest {
private static KafkaConsumer<String, String> consumer;
private static void KfkConsumer() {
Properties props = new Properties();
props.put("bootstrap.servers", KAFKA_CON);
props.put("group.id", "group01");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumer = new KafkaConsumer<>(props);
consumer.subscribe(List.of(TOPIC));
}
private static void close() {
consumer.close();
}
private static List<List<Object>> poll(int num) {
List<List<Object>> result = new ArrayList<>();
while (result.size() < num) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
result.add(Arrays.asList(record.offset(), record.key(), record.value()));
System.out.println(record.offset() +" - "+ record.key() +" - "+ record.value());
}
}
return result;
}
public static void main (String[] args) {
KfkConsumer();
poll(1000);
close();
}
}

View File

@ -0,0 +1,13 @@
package com.muyu.kafka.contents;
/**
* @author DongZl
* @description: kafka
* @Date 2023/8/25 18:47
*/
public class KafkaContent {
public static final String TOPIC = "top";
public static final String KAFKA_CON = "47.113.220.52:39092,47.113.220.52:29092,47.113.220.52:19092";
}

View File

@ -0,0 +1,68 @@
package com.muyu.kafka.producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import static com.muyu.kafka.contents.KafkaContent.KAFKA_CON;
import static com.muyu.kafka.contents.KafkaContent.TOPIC;
/**
* @author DongZl
* @description:
* @Date 2023/8/25 18:50
*/
public class ProducerTest {
private static Producer<String, String> producer;
public static void KfkProducer() {
Properties props = new Properties();
props.put("bootstrap.servers", KAFKA_CON);
// props.put("linger.ms", 1);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producer = new KafkaProducer<>(props);
}
private static void close() {
producer.close();
}
private static RecordMetadata send(String topic, String key, String value) {
Future<RecordMetadata> result = producer.send(new ProducerRecord<>(topic, key, value));
RecordMetadata meta = null;
try {
meta = result.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
return meta;
}
public static void main (String[] args) {
KfkProducer();
new Thread(() -> {
int i = 0;
do {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
String format = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
send(TOPIC, UUID.randomUUID().toString(), format);
}while (i++ < 1000);
close();
}).start();
}
}

View File

@ -16,7 +16,7 @@ public class PublishSample {
public static void main (String[] args) { public static void main (String[] args) {
String broker = "tcp://192.168.40.128:1883"; String broker = "tcp://fluxmq.muyu.icu:1883";
String topic = "test"; String topic = "test";
String username = "emqx"; String username = "emqx";
String password = "public"; String password = "public";

View File

@ -3,17 +3,18 @@ package com.muyu.mqtt;
import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import java.util.UUID;
public class SubscribeSample { public class SubscribeSample {
public static void main (String[] args) { public static void main (String[] args) {
String broker = "tcp://192.168.40.128:1883"; String broker = "tcp://fluxmq.muyu.icu:1883";
String topic = "test"; String topic = "test";
String username = "emqx"; String username = "emqx";
String password = "public"; String password = "public";
String clientid = "subscribe_client";
int qos = 0; int qos = 0;
try { try {
MqttClient client = new MqttClient(broker, clientid, new MemoryPersistence()); MqttClient client = new MqttClient(broker, UUID.randomUUID().toString(), new MemoryPersistence());
// 连接参数 // 连接参数
MqttConnectOptions options = new MqttConnectOptions(); MqttConnectOptions options = new MqttConnectOptions();
// options.setUserName(username); // options.setUserName(username);

View File

@ -1,9 +0,0 @@
package com.muyu.mqtt;
/**
* @author DongZl
* @description: 1
* @Date 2023-11-6 04:15
*/
public class Test1 {
}

View File

@ -0,0 +1,43 @@
package com.muyu.mqtt.config;
import com.muyu.mqtt.service.MqttService;
import lombok.extern.log4j.Log4j2;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.UUID;
/**
* @author DongZl
* @description: Mqtt
* @Date 2023-11-24 02:06
*/
@Log4j2
//@Configuration
public class MqttConfig {
@Bean
public MqttClient initClient(MqttProper mqttProper, MqttService mqttService){
try {
log.info("mqtt服务器初始化开始");
long startTime = System.currentTimeMillis();
MqttClient client = new MqttClient(mqttProper.getBroker(),
UUID.randomUUID().toString(),
new MemoryPersistence());
// 连接参数
MqttConnectOptions options = new MqttConnectOptions();
options.setConnectionTimeout(60);
options.setKeepAliveInterval(60);
log.info("mqtt服务器初始化结束 耗时:[{}MS]", System.currentTimeMillis() - startTime);
client.connect(options);
client.setCallback(mqttService);
client.subscribe(mqttProper.getTopic(), 0);
return client;
}catch (Exception e) {
throw new RuntimeException(e);
}
}
}

View File

@ -0,0 +1,32 @@
package com.muyu.mqtt.config;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
/**
* @author DongZl
* @description: mqtt
* @Date 2023-11-24 02:04
*/
@Data
@Builder
@Configuration
@NoArgsConstructor
@AllArgsConstructor
@ConfigurationProperties(prefix = "mqtt.config")
public class MqttProper {
/**
*
*/
private String broker;
/**
*
*/
private String topic;
}

View File

@ -0,0 +1,84 @@
package com.muyu.mqtt.service;
import com.muyu.kafka.contents.KafkaContent;
import lombok.extern.log4j.Log4j2;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
/**
* @author DongZl
* @description: Mqtt
* @Date 2023-11-24 02:10
*/
@Log4j2
@Service
public class MqttService implements MqttCallback {
// @Autowired
private Producer<String, String> producer;
private RecordMetadata send(String topic, String key, String value) {
Future<RecordMetadata> result = producer.send(new ProducerRecord<>(topic, key, value));
RecordMetadata meta = null;
try {
meta = result.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
return meta;
}
/**
* This method is called when the connection to the server is lost.
*
* @param cause the reason behind the loss of connection.
*/
@Override
public void connectionLost (Throwable cause) {
}
/**
* <p></p>
* <p>MQTT</p>
* <p>QoS 12</p>
* <p></p>
* <p></p>
* <p> ()</p>
*
* @param topic
* @param message
*
* @throws Exception
*/
@Override
public void messageArrived (String topic, MqttMessage message) throws Exception {
String msg = new String(message.getPayload());
log.info("topic: [{}], Qos: [{}], message content: [{}]", topic, message.getQos(), msg);
send(KafkaContent.TOPIC, String.valueOf(msg.hashCode()), msg);
}
/**
* Called when delivery for a message has been completed, and all
* acknowledgments have been received. For QoS 0 messages it is
* called once the message has been handed to the network for
* delivery. For QoS 1 it is called when PUBACK is received and
* for QoS 2 when PUBCOMP is received. The token will be the same
* token as that returned when the message was published.
*
* @param token the delivery token associated with the message.
*/
@Override
public void deliveryComplete (IMqttDeliveryToken token) {
}
}

View File

@ -0,0 +1,49 @@
package com.muyu.parsing;
import com.muyu.domain.VehicleData;
import com.muyu.parsing.service.event.EventCon;
import lombok.AllArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.time.Duration;
import java.util.Arrays;
/**
* @author DongZl
* @description:
* @Date 2023-11-24 02:58
*/
@Log4j2
@Component
@AllArgsConstructor
public class ParsingService {
private final KafkaConsumer<String, String> consumer;
@PostConstruct
public void init(){
new Thread(() -> {
while (true){
ConsumerRecords<String, String> records = null;
try {
records = consumer.poll(Duration.ofMillis(5));
for (ConsumerRecord<String, String> record : records) {
VehicleData vehicleData = VehicleData.msgBuild(record.value());
EventCon.execute(vehicleData);
log.info("offset: [{}] key:[{}], value:[{}]", record.offset(), record.key(), record.value());
}
}catch (Exception e){
log.info("records {}", records);
log.error(e);
}
}
}).start();
}
}

View File

@ -0,0 +1,20 @@
package com.muyu.parsing;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Random;
import java.util.UUID;
/**
* @author DongZl
* @description: 线
* @Date 2023-12-11 02:37
*/
public class TestTread {
}

View File

@ -0,0 +1,34 @@
package com.muyu.parsing.config;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.List;
import java.util.Properties;
import static com.muyu.kafka.contents.KafkaContent.KAFKA_CON;
import static com.muyu.kafka.contents.KafkaContent.TOPIC;
/**
* @author DongZl
* @description:
* @Date 2023-11-24 02:55
*/
@Configuration
public class KafkaConsumerConfig {
@Bean
public KafkaConsumer<String, String> consumerInit(){
Properties props = new Properties();
props.put("bootstrap.servers", KAFKA_CON);
props.put("group.id", "group01");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(List.of(TOPIC));
return consumer;
}
}

View File

@ -0,0 +1,20 @@
package com.muyu.parsing.service;
import com.muyu.domain.VehicleData;
import org.springframework.stereotype.Service;
/**
* @author DongZl
* @description:
* @Date 2023-11-24 03:05
*/
@Service
public interface EventControl {
// 注册事件
public void registerEvent(String vin, String eventId);
// 注销事件
public void logoffEvent(String vin, String eventId);
// 执行事件
public void execute(VehicleData vehicleData);
}

View File

@ -0,0 +1,58 @@
package com.muyu.parsing.service.event;
import com.muyu.domain.VehicleData;
import com.muyu.utils.SpringUtils;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* @author DongZl
* @description:
* @Date 2023-11-26 08:58
*/
//@Component
public class EventCon {
public static final Map<String, List<String>> eventMap = new ConcurrentHashMap<>();
// ["event-storage", "event-realTimeTrajectory"]
/**
*
*/
public static void registerEvent (String vin, String eventId) {
getEvent(vin).add(eventId);
}
/**
*
*/
public static void logoffEvent (String vin, String eventId) {
getEvent(vin).remove(eventId);
}
/**
*
* @param vehicleData
*/
public static void execute (VehicleData vehicleData) {
List<String> eventList = getEvent(vehicleData.getVin());
eventList.forEach(eventId -> {
// ["event-storage", "event-realTimeTrajectory"]
VehicleEventService vehicleEventService = SpringUtils.getBean(eventId);
vehicleEventService.execute(vehicleData);
});
}
/**
* VIN
*/
public static List<String> getEvent(String vin){
return eventMap.computeIfAbsent(vin, k -> new ArrayList<>());
}
}

View File

@ -0,0 +1,20 @@
package com.muyu.parsing.service.event;
import com.muyu.domain.VehicleData;
/**
* @author DongZl
* @description:
* @Date 2023-11-24 03:07
*/
public interface VehicleEventService {
public void execute(VehicleData vehicleData);
/**
*
* @return
*/
String getEventName ();
}

View File

@ -0,0 +1,46 @@
package com.muyu.parsing.service.event.impl;
import com.muyu.domain.VehicleData;
import com.muyu.parsing.service.event.VehicleEventService;
import lombok.extern.log4j.Log4j2;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.stereotype.Service;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Random;
import java.util.UUID;
/**
* @author DongZl
* @description:
* @Date 2023-12-1 03:04
*/
@Log4j2
@Service("guzhang")
public class Guzhang implements VehicleEventService {
@Override
public void execute (VehicleData vehicleData) {
log.info("开始故障");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
log.info("结束故障");
}
/**
*
*
* @return
*/
@Override
public String getEventName () {
return "guzhang";
}
}

View File

@ -0,0 +1,71 @@
package com.muyu.parsing.service.event.impl;
import com.muyu.domain.VehicleData;
import com.muyu.parsing.service.event.VehicleEventService;
import lombok.extern.log4j.Log4j2;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.stereotype.Service;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Random;
import java.util.UUID;
/**
* @author DongZl
* @description:
* @Date 2023-11-24 03:08
*/
@Log4j2
@Service("event-realTimeTrajectory")
public class RealTimeTrajectoryEventImplService implements VehicleEventService {
@Override
public void execute (VehicleData vehicleData) {
log.info("开始实时轨迹");
String broker = "tcp://fluxmq.muyu.icu:1883", topic = "test",clientId = UUID.randomUUID().toString();
int qos = 0;
try {
MqttClient client = new MqttClient(broker, clientId, new MemoryPersistence());
// 连接参数
MqttConnectOptions options = new MqttConnectOptions();
// 连接
client.connect(options);
int nextInt = new Random().nextInt(1, 10);
log.info("实时轨迹:[{}]次", nextInt);
if (nextInt >= 8){
Thread.sleep(8 * 10000);
}
for (int i = 0 ; i < nextInt ; i++) {
String content = new SimpleDateFormat("yyyy年MM月dd日 HH时mm分ss秒").format(new Date()) + " - 实时轨迹 MQTT" + clientId;
// 创建消息并设置 QoS
MqttMessage message = new MqttMessage(content.getBytes());
message.setQos(qos);
// 发布消息
client.publish(topic, message);
log.info("[{}] - [{}]", Thread.currentThread().getName(), content);
Thread.sleep(100);
}
// 关闭连接
client.disconnect();
// 关闭客户端
client.close();
log.info("结束实时轨迹");
} catch (MqttException | InterruptedException e) {
throw new RuntimeException(e);
}
}
/**
*
*
* @return
*/
@Override
public String getEventName () {
return "event-realTimeTrajectory";
}
}

View File

@ -0,0 +1,71 @@
package com.muyu.parsing.service.event.impl;
import com.muyu.domain.VehicleData;
import com.muyu.parsing.service.event.VehicleEventService;
import lombok.extern.log4j.Log4j2;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.stereotype.Service;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Random;
import java.util.UUID;
/**
* @author DongZl
* @description:
* @Date 2023-11-24 03:08
*/
@Log4j2
@Service("event-storage")
public class StorageEventServiceImpl implements VehicleEventService {
@Override
public void execute (VehicleData vehicleData) {
log.info("开始存储");
String broker = "tcp://fluxmq.muyu.icu:1883", topic = "test",clientId = UUID.randomUUID().toString();
int qos = 0;
try {
MqttClient client = new MqttClient(broker, clientId, new MemoryPersistence());
// 连接参数
MqttConnectOptions options = new MqttConnectOptions();
// 连接
client.connect(options);
int nextInt = new Random().nextInt(1, 10);
log.info("本地存储:[{}]次", nextInt);
if (nextInt >= 8){
Thread.sleep(8 * 10000);
}
for (int i = 0 ; i < nextInt ; i++) {
String content = new SimpleDateFormat("yyyy年MM月dd日 HH时mm分ss秒").format(new Date()) + " - 存储 MQTT" + clientId;
// 创建消息并设置 QoS
MqttMessage message = new MqttMessage(content.getBytes());
message.setQos(qos);
// 发布消息
client.publish(topic, message);
log.info("[{}] - [{}]", Thread.currentThread().getName(), content);
Thread.sleep(100);
}
// 关闭连接
client.disconnect();
// 关闭客户端
client.close();
log.info("结束存储");
} catch (MqttException | InterruptedException e) {
throw new RuntimeException(e);
}
}
/**
*
*
* @return
*/
@Override
public String getEventName () {
return "event-storage";
}
}

View File

@ -0,0 +1,28 @@
package com.muyu.parsing.service.event.impl;
import com.muyu.domain.VehicleData;
import com.muyu.parsing.service.event.VehicleEventService;
import org.springframework.stereotype.Service;
/**
* @author DongZl
* @description:
* @Date 2024-3-31 11:14
*/
@Service("event-zhibiaoyujing")
public class ZhiBiaoYuJing implements VehicleEventService {
@Override
public void execute (VehicleData vehicleData) {
}
/**
*
*
* @return
*/
@Override
public String getEventName () {
return "指标预警";
}
}

View File

@ -0,0 +1,45 @@
package com.muyu.parsing.storage;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* @author DongZl
* @description:
* @Date 2023-12-12 09:57
*/
public class LocalStorage {
public volatile AtomicBoolean checkQueue = new AtomicBoolean();
public LinkedBlockingQueue<String> trueQueue = new LinkedBlockingQueue<>();
public LinkedBlockingQueue<String> falseQueue = new LinkedBlockingQueue<>();
private LinkedBlockingQueue<String> getQueue(){
return checkQueue.get() ? trueQueue : falseQueue;
}
private void checkQueue(){
checkQueue.set(!checkQueue.get());
}
/**
*
*/
public void push(String data){
getQueue().offer(data);
}
/**
*
*/
public String[] getDataList(){
LinkedBlockingQueue<String> dataQueue = getQueue();
this.checkQueue();
return dataQueue.toArray(new String[]{});
}
}

View File

@ -0,0 +1,36 @@
package com.muyu.utils;
import java.util.concurrent.*;
/**
* @author DongZl
* @description: 线
* @Date 2023-12-5 01:51
*/
public class FixedThreadPool {
/**
* 线
*/
private final static ExecutorService fixedThreadPool =
new ThreadPoolExecutor(2,
2,
0L,TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(45));
/**
* 线
* @param thread 线
*/
public static Future<?> submit(Thread thread){
return fixedThreadPool.submit(thread);
}
/**
* 线
*/
public static void shutDown(){
fixedThreadPool.shutdown();
}
}

View File

@ -0,0 +1,37 @@
package com.muyu.utils;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
/**
* @author DongZl
* @description: 线
* @Date 2023-11-17 09:16
*/
public class ScheduledThreadPool {
/**
* 线 CPU * 2 + 1
*/
private static final ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(
Runtime.getRuntime().availableProcessors() * 2 + 1);
public static ScheduledFuture<?> submit (Runnable thread){
// 参数分别是: 任务, 多久后开始执行, 每隔多久执行一次(周期),时间单位
return submit(thread, 1);
}
public static ScheduledFuture<?> submit (Runnable thread, long period){
// 参数分别是: 任务, 多久后开始执行, 每隔多久执行一次(周期),时间单位
return scheduledThreadPool.scheduleAtFixedRate(thread, 0, period, TimeUnit.SECONDS);
}
/**
* 线
*/
public static void shutdown() {
scheduledThreadPool.shutdown();
}
}

View File

@ -0,0 +1,114 @@
package com.muyu.utils;
import org.springframework.aop.framework.AopContext;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import org.springframework.beans.factory.config.BeanFactoryPostProcessor;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.stereotype.Component;
/**
* spring 便springbean
*
* @author muyu
*/
@Component
public final class SpringUtils implements BeanFactoryPostProcessor {
/**
* Spring
*/
private static ConfigurableListableBeanFactory beanFactory;
/**
*
*
* @param name
*
* @return Object bean
*
* @throws org.springframework.beans.BeansException
*/
@SuppressWarnings("unchecked")
public static <T> T getBean (String name) throws BeansException {
return (T) beanFactory.getBean(name);
}
/**
* requiredType
*
* @param clz
*
* @return
*
* @throws org.springframework.beans.BeansException
*/
public static <T> T getBean (Class<T> clz) throws BeansException {
T result = (T) beanFactory.getBean(clz);
return result;
}
/**
* BeanFactorybeantrue
*
* @param name
*
* @return boolean
*/
public static boolean containsBean (String name) {
return beanFactory.containsBean(name);
}
/**
* beansingletonprototype beanNoSuchBeanDefinitionException
*
* @param name
*
* @return boolean
*
* @throws org.springframework.beans.factory.NoSuchBeanDefinitionException
*/
public static boolean isSingleton (String name) throws NoSuchBeanDefinitionException {
return beanFactory.isSingleton(name);
}
/**
* @param name
*
* @return Class
*
* @throws org.springframework.beans.factory.NoSuchBeanDefinitionException
*/
public static Class<?> getType (String name) throws NoSuchBeanDefinitionException {
return beanFactory.getType(name);
}
/**
* beanbean
*
* @param name
*
* @return
*
* @throws org.springframework.beans.factory.NoSuchBeanDefinitionException
*/
public static String[] getAliases (String name) throws NoSuchBeanDefinitionException {
return beanFactory.getAliases(name);
}
/**
* aop
*
* @param invoker
*
* @return
*/
@SuppressWarnings("unchecked")
public static <T> T getAopProxy (T invoker) {
return (T) AopContext.currentProxy();
}
@Override
public void postProcessBeanFactory (ConfigurableListableBeanFactory beanFactory) throws BeansException {
SpringUtils.beanFactory = beanFactory;
}
}

View File

@ -0,0 +1,5 @@
mqtt:
config:
broker: tcp://fluxmq.muyu.icu:1883
topic: test

View File

@ -0,0 +1,130 @@
package com.test;
import com.muyu.MqttApplication;
import com.muyu.domain.VehicleData;
import com.muyu.parsing.service.event.VehicleEventService;
import com.muyu.parsing.service.event.impl.Guzhang;
import com.muyu.utils.FixedThreadPool;
import com.muyu.utils.SpringUtils;
import lombok.extern.log4j.Log4j2;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
* @author DongZl
* @description:
* @Date 2023-12-1 03:04
*/
@Log4j2
@SpringBootTest(classes = MqttApplication.class)
public class GuZhangTest {
@Autowired
private Guzhang guzhang;
private Map<String, VehicleEventService> eventServiceMap = new HashMap<>();
public void initEvent () {
eventServiceMap.put("guzhang", SpringUtils.getBean("guzhang"));
eventServiceMap.put("event-realTimeTrajectory", SpringUtils.getBean("event-realTimeTrajectory"));
eventServiceMap.put("event-storage", SpringUtils.getBean("event-storage"));
}
private Map<String, List<String>> vinEventMap = new HashMap<>();
public void initVinEvent () {
List<String> eventList = new ArrayList<>() {{
add("guzhang");
add("event-realTimeTrajectory");
add("event-storage");
}};
vinEventMap.put("VIN123456798123100", eventList);
vinEventMap.put("VIN123456798123101", eventList);
vinEventMap.put("VIN123456798123102", eventList);
vinEventMap.put("VIN123456798123103", eventList);
vinEventMap.put("VIN123456798123104", eventList);
vinEventMap.put("VIN123456798123105", eventList);
vinEventMap.put("VIN123456798123106", eventList);
vinEventMap.put("VIN123456798123107", eventList);
vinEventMap.put("VIN123456798123108", eventList);
vinEventMap.put("VIN123456798123109", eventList);
vinEventMap.put("VIN123456798123110", eventList);
vinEventMap.put("VIN123456798123111", eventList);
vinEventMap.put("VIN123456798123112", eventList);
vinEventMap.put("VIN123456798123113", eventList);
vinEventMap.put("VIN123456798123114", eventList);
vinEventMap.put("VIN123456798123115", eventList);
vinEventMap.put("VIN123456798123116", eventList);
vinEventMap.put("VIN123456798123117", eventList);
vinEventMap.put("VIN123456798123118", eventList);
vinEventMap.put("VIN123456798123119", eventList);
vinEventMap.put("VIN123456798123120", eventList);
vinEventMap.put("VIN123456798123121", eventList);
vinEventMap.put("VIN123456798123122", eventList);
vinEventMap.put("VIN123456798123123", eventList);
vinEventMap.put("VIN123456798123124", eventList);
vinEventMap.put("VIN123456798123125", eventList);
vinEventMap.put("VIN123456798123126", eventList);
vinEventMap.put("VIN123456798123127", eventList);
vinEventMap.put("VIN123456798123128", eventList);
vinEventMap.put("VIN123456798123129", eventList);
}
@Test
public void ceshi () {
this.initEvent();
this.initVinEvent();
Set<String> vinList = this.vinEventMap.keySet();
CountDownLatch vinCountDownLatch = new CountDownLatch(vinList.size());
for (String vin : vinList) {
// vin - 主线程
new Thread(() -> {
try {
Map<String, Thread> eventMap = new HashMap<>();
List<String> eventList = this.vinEventMap.get(vin);
CountDownLatch vehicleEventCountDownLatch = new CountDownLatch(eventList.size());
// event - 子线程
eventList.stream().map(eventId -> this.eventServiceMap.get(eventId))
.forEach(vehicleEventService -> {
String threadName = vin + vehicleEventService.getEventName();
Thread thread = new Thread(() -> {
vehicleEventService.execute(
VehicleData.builder()
.vin(vin)
.build()
);
vehicleEventCountDownLatch.countDown();
eventMap.remove(Thread.currentThread().getName());
}, threadName);
FixedThreadPool.submit(thread);
eventMap.put(threadName, thread);
});
boolean isAwait;
int awaitSize = 1, maxAwaitSize = 3;
do {
// true / false -》 latch == 0 返回 true | false latch != 0, 但是等待时间到了
isAwait = vehicleEventCountDownLatch.await(1000, TimeUnit.MILLISECONDS);
log.info("[{}]等待[{}]次, 线程数量:[{}]", vin, awaitSize, eventMap.size());
}while (isAwait || awaitSize++ < maxAwaitSize);
eventMap.values().forEach(Thread::interrupt);
vinCountDownLatch.countDown();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("单个VIN执行结束");
}).start();
}
try {
vinCountDownLatch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("整体执行结束");
}
}