fix(): 修复时间处理基础架构

dev.vehicleGateway
xinzirun 2024-09-30 18:03:48 +08:00
parent 7100bf71bf
commit 3ebd1d2c39
12 changed files with 240 additions and 169 deletions

View File

@ -0,0 +1,37 @@
package com.muyu.event.process.basic;
import org.springframework.context.ApplicationEvent;
/**
* @Author: zi run
* @Date 2024/9/30 15:11
* @Description
*/
public class BasicEvent<T> extends ApplicationEvent {
/**
*
*/
private final T data;
/**
*
*
* @param source
* @param data
*/
public BasicEvent(Object source, T data) {
super(source);
this.data = data;
}
/**
*
*
* @return
*/
public T getData() {
return data;
}
}

View File

@ -0,0 +1,37 @@
package com.muyu.event.process.basic;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;
/**
* @Author: zi run
* @Date 2024/9/30 15:37
* @Description
*/
@Component
public class BasicEventHandler<T> implements ApplicationListener<BasicEvent<T>> {
/**
*
*/
private final BasicEventListener<T> listener;
/**
*
*
* @param listener
*/
public BasicEventHandler(BasicEventListener<T> listener) {
this.listener = listener;
}
/**
*
*
* @param event
*/
@Override
public void onApplicationEvent(BasicEvent<T> event) {
listener.onEvent(event);
}
}

View File

@ -0,0 +1,16 @@
package com.muyu.event.process.basic;
/**
* @Author: zi run
* @Date 2024/9/30 15:35
* @Description
*/
public interface BasicEventListener<T> {
/**
*
*
* @param event
*/
void onEvent(BasicEvent<T> event);
}

View File

@ -1,37 +0,0 @@
package com.muyu.event.process.basic;
import com.alibaba.fastjson2.JSONObject;
import org.springframework.context.ApplicationEvent;
/**
* @Author: zi run
* @Date 2024/9/29 21:19
* @Description
*/
public class CustomEvent extends ApplicationEvent {
/**
*
*/
private final JSONObject data;
/**
*
*
* @param source
* @param data JSON
*/
public CustomEvent(Object source, JSONObject data) {
super(source);
this.data = data;
}
/**
*
*
* @return JSONObject
*/
public JSONObject getData() {
return data;
}
}

View File

@ -1,18 +0,0 @@
package com.muyu.event.process.basic;
import org.springframework.context.ApplicationListener;
/**
* @Author: zi run
* @Date 2024/9/29 21:29
* @Description
*/
public interface EventListener extends ApplicationListener<CustomEvent> {
/**
*
*
* @param event
*/
void onEvent(CustomEvent event);
}

View File

@ -1,6 +1,5 @@
package com.muyu.event.process.basic; package com.muyu.event.process.basic;
import com.alibaba.fastjson2.JSONObject;
import org.springframework.context.ApplicationEventPublisher; import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware; import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
@ -16,24 +15,24 @@ public class EventPublisher implements ApplicationEventPublisherAware {
/** /**
* *
*/ */
private ApplicationEventPublisher applicationEventPublisher; private ApplicationEventPublisher publisher;
/** /**
* *
* *
* @param applicationEventPublisher * @param publisher
*/ */
@Override @Override
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) { public void setApplicationEventPublisher(ApplicationEventPublisher publisher) {
this.applicationEventPublisher = applicationEventPublisher; this.publisher = publisher;
} }
/** /**
* *
* * @param event
* @param messages JSON * @param <T>
*/ */
public void publishEvent(JSONObject messages) { public <T> void publish(BasicEvent<T> event) {
applicationEventPublisher.publishEvent(new CustomEvent(this, messages)); publisher.publishEvent(event);
} }
} }

View File

@ -1,19 +0,0 @@
package com.muyu.event.process.config;
import com.muyu.event.process.listener.AddDatabaseListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @Author: zi run
* @Date 2024/9/29 21:29
* @Description
*/
@Configuration
public class EventListenerConfig {
@Bean
public AddDatabaseListener addDatabaseListener() {
return new AddDatabaseListener();
}
}

View File

@ -1,24 +1,24 @@
package com.muyu.event.process.consumer; package com.muyu.event.process.consumer;
import cn.hutool.core.thread.ThreadUtil;
import com.alibaba.fastjson2.JSONObject;
import com.alibaba.nacos.shaded.com.google.common.collect.Lists;
import com.muyu.event.process.basic.EventPublisher; import com.muyu.event.process.basic.EventPublisher;
import com.muyu.event.process.event.IoTDBInsertDataEvent;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner; import org.springframework.boot.ApplicationRunner;
import org.springframework.scheduling.annotation.Async; import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextClosedEvent;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.time.Duration; import java.time.Duration;
import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.stream.Collectors; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/** /**
* @Author: zi run * @Author: zi run
@ -28,7 +28,7 @@ import java.util.stream.Collectors;
@Slf4j @Slf4j
@Component @Component
@RequiredArgsConstructor @RequiredArgsConstructor
public class VehicleConsumer implements InitializingBean { public class VehicleConsumer implements ApplicationRunner, ApplicationListener<ContextClosedEvent> {
/** /**
* kafka * kafka
@ -43,40 +43,51 @@ public class VehicleConsumer implements InitializingBean {
/** /**
* () * ()
*/ */
public final static String MESSAGE_PARSING = "test-topic"; public final static String MESSAGE_PARSING = "MessageParsing";
// @Override /**
// public void run(ApplicationArguments args) throws Exception { * 线线
// log.info("开始监听kafka-topic{}", MESSAGE_PARSING); */
// List<String> topicList = Collections.singletonList(MESSAGE_PARSING); private final ExecutorService executorService =
// kafkaConsumer.subscribe(topicList); Executors.newFixedThreadPool(10);
//
// while (true) {
// ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(100));
// consumerRecords.forEach(record -> {
// String value = record.value();
// log.info("接收到车辆报文数据,内容:{}", value);
// eventPublisher.publishEvent(JSONObject.parseObject(value));
// });
// }
// }
@Async
@Override @Override
public void afterPropertiesSet() throws Exception { public void run(ApplicationArguments args) throws Exception {
new Thread(() -> { log.info("启动线程监听Topic: {}", MESSAGE_PARSING);
log.info("启动线程监听Topic: {}", MESSAGE_PARSING); List<String> topics = Collections.singletonList(MESSAGE_PARSING);
ThreadUtil.sleep(100); kafkaConsumer.subscribe(topics);
Collection<String> topics = Lists.newArrayList(MESSAGE_PARSING); while (true) {
kafkaConsumer.subscribe(topics); ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(100));
while (true) { consumerRecords.forEach(consumerRecord -> executorService.submit(() -> handleRecord(consumerRecord)));
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(100)); }
consumerRecords.forEach(consumerRecord -> { }
String message = consumerRecord.value();
log.info("接收到车辆报文数据,内容:{}", message); private void handleRecord(ConsumerRecord<String, String> consumerRecord) {
eventPublisher.publishEvent(JSONObject.parseObject(message)); String message = consumerRecord.value();
}); log.info("接收到车辆报文数据,内容:{}", message);
log.info("------------------------------------------------");
eventPublisher.publish(new IoTDBInsertDataEvent(this, message));
}
@Override
public void onApplicationEvent(ContextClosedEvent event) {
log.info("关闭线程池和Kafka消费者");
try {
executorService.shutdown();
if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
executorService.shutdownNow();
} }
}).start(); } catch (InterruptedException e) {
log.error("线程池关闭被中断,强制关闭", e);
executorService.shutdownNow();
Thread.currentThread().interrupt();
}
try {
kafkaConsumer.close(); // 关闭Kafka消费者
} catch (Exception e) {
log.error("关闭Kafka消费者时发生错误", e);
}
} }
} }

View File

@ -50,9 +50,9 @@ public class TestEventController extends BaseController {
@GetMapping(value = "/sendKafka") @GetMapping(value = "/sendKafka")
public Result<String> senKafka() { public Result<String> senKafka() {
JSONObject jsonObject = new JSONObject(); JSONObject jsonObject = new JSONObject();
jsonObject.put("id",1); jsonObject.put("id","1");
jsonObject.put("name","张三"); jsonObject.put("name","张三");
jsonObject.put("age",18); jsonObject.put("age","18");
jsonObject.put("sex","男"); jsonObject.put("sex","男");
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(kafkaTopicName, jsonObject.toJSONString()); ProducerRecord<String, String> producerRecord = new ProducerRecord<>(kafkaTopicName, jsonObject.toJSONString());
kafkaProducer.send(producerRecord); kafkaProducer.send(producerRecord);

View File

@ -0,0 +1,20 @@
package com.muyu.event.process.event;
import com.muyu.event.process.basic.BasicEvent;
/**
* @Author: zi run
* @Date 2024/9/29 21:19
* @Description IoTDB
*/
public class IoTDBInsertDataEvent extends BasicEvent<String> {
/**
* IoTDB
*
* @param messsge
*/
public IoTDBInsertDataEvent(Object source, String messsge) {
super(source, messsge);
}
}

View File

@ -1,43 +0,0 @@
package com.muyu.event.process.listener;
import com.alibaba.fastjson2.JSONObject;
import com.muyu.event.process.basic.CustomEvent;
import com.muyu.event.process.basic.EventListener;
import com.muyu.event.process.iotdb.basic.service.IService;
import com.muyu.event.process.iotdb.service.IoTDBService;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.ArrayList;
import java.util.List;
/**
* @Author: zi run
* @Date 2024/9/29 22:12
* @Description
*/
public class AddDatabaseListener implements EventListener {
/**
* IoTDB
*/
@Autowired
private IoTDBService ioTDBService;
@Override
public void onEvent(CustomEvent event) {
JSONObject data = event.getData();
List<String> keyList = new ArrayList<>();
List<String> valueList = new ArrayList<>();
data.forEach((key, value) -> {
keyList.add(key);
valueList.add((String) value);
});
ioTDBService.insertStringRecord("root.vehicle", System.currentTimeMillis(), keyList, valueList);
}
@Override
public void onApplicationEvent(CustomEvent event) {
onEvent(event);
}
}

View File

@ -0,0 +1,68 @@
package com.muyu.event.process.listener;
import com.alibaba.fastjson2.JSONObject;
import com.muyu.event.process.basic.BasicEvent;
import com.muyu.event.process.basic.BasicEventListener;
import com.muyu.event.process.event.IoTDBInsertDataEvent;
import com.muyu.event.process.iotdb.service.IoTDBService;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
/**
* @Author: zi run
* @Date 2024/9/29 22:12
* @Description IoTDB
*/
@Component
@RequiredArgsConstructor
public class IoTDBInsertDataListener implements BasicEventListener<String> {
/**
* IoTDB
*/
private final IoTDBService ioTDBService;
/**
*
*/
private static final String DEVICE_ID = "root.vehicle";
/**
* IoTDB
*
* @param event
*/
@Override
public void onEvent(BasicEvent<String> event) {
JSONObject data = JSONObject.parseObject(event.getData());
List<String> keyList = extractKeys(data);
List<String> valueList = extractValues(data);
ioTDBService.insertStringRecord(DEVICE_ID, System.currentTimeMillis(), keyList, valueList);
}
/**
* JSONObject
*
* @param data JSONObject
* @return
*/
private List<String> extractKeys(JSONObject data) {
return data.keySet().stream().collect(Collectors.toList());
}
/**
* JSONObject
*
* @param data JSONObject
* @return
*/
private List<String> extractValues(JSONObject data) {
return data.values().stream()
.map(Object::toString)
.collect(Collectors.toList());
}
}