Compare commits

...

2 Commits

Author SHA1 Message Date
xinzirun 3ebd1d2c39 fix(): 修复时间处理基础架构 2024-09-30 18:03:48 +08:00
xinzirun 7100bf71bf feat(): 添加事件处理业务 2024-09-30 00:52:20 +08:00
24 changed files with 562 additions and 27 deletions

View File

@ -52,6 +52,12 @@
<artifactId>mysql-connector-j</artifactId>
</dependency>
<!-- IotDB会话 -->
<dependency>
<groupId>org.apache.iotdb</groupId>
<artifactId>iotdb-session</artifactId>
</dependency>
<!-- MuYu Common DataSource -->
<dependency>
<groupId>com.muyu</groupId>
@ -82,10 +88,10 @@
<artifactId>cloud-common-core</artifactId>
</dependency>
<!-- IotDB会话 -->
<!-- kafka模块-->
<dependency>
<groupId>org.apache.iotdb</groupId>
<artifactId>iotdb-session</artifactId>
<groupId>com.muyu</groupId>
<artifactId>cloud-common-kafka</artifactId>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,37 @@
package com.muyu.event.process.basic;
import org.springframework.context.ApplicationEvent;
/**
* @Author: zi run
* @Date 2024/9/30 15:11
* @Description
*/
public class BasicEvent<T> extends ApplicationEvent {
/**
*
*/
private final T data;
/**
*
*
* @param source
* @param data
*/
public BasicEvent(Object source, T data) {
super(source);
this.data = data;
}
/**
*
*
* @return
*/
public T getData() {
return data;
}
}

View File

@ -0,0 +1,37 @@
package com.muyu.event.process.basic;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;
/**
* @Author: zi run
* @Date 2024/9/30 15:37
* @Description
*/
@Component
public class BasicEventHandler<T> implements ApplicationListener<BasicEvent<T>> {
/**
*
*/
private final BasicEventListener<T> listener;
/**
*
*
* @param listener
*/
public BasicEventHandler(BasicEventListener<T> listener) {
this.listener = listener;
}
/**
*
*
* @param event
*/
@Override
public void onApplicationEvent(BasicEvent<T> event) {
listener.onEvent(event);
}
}

View File

@ -0,0 +1,16 @@
package com.muyu.event.process.basic;
/**
* @Author: zi run
* @Date 2024/9/30 15:35
* @Description
*/
public interface BasicEventListener<T> {
/**
*
*
* @param event
*/
void onEvent(BasicEvent<T> event);
}

View File

@ -0,0 +1,38 @@
package com.muyu.event.process.basic;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.stereotype.Component;
/**
* @Author: zi run
* @Date 2024/9/29 22:01
* @Description
*/
@Component
public class EventPublisher implements ApplicationEventPublisherAware {
/**
*
*/
private ApplicationEventPublisher publisher;
/**
*
*
* @param publisher
*/
@Override
public void setApplicationEventPublisher(ApplicationEventPublisher publisher) {
this.publisher = publisher;
}
/**
*
* @param event
* @param <T>
*/
public <T> void publish(BasicEvent<T> event) {
publisher.publishEvent(event);
}
}

View File

@ -0,0 +1,52 @@
package com.muyu.event.process.consumer;
import cn.hutool.core.thread.ThreadUtil;
import com.alibaba.nacos.shaded.com.google.common.collect.Lists;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Component;
import java.time.Duration;
import java.util.Collection;
/**
* @Author: zi run
* @Date 2024/9/29 16:53
* @Description
*/
@Slf4j
//@Component
@RequiredArgsConstructor
public class TestConsumer implements InitializingBean {
/**
* kafka
*/
private final KafkaConsumer<String, String> kafkaConsumer;
/**
* kafka
*/
private static final String topicName = "test-topic";
@Override
public void afterPropertiesSet() throws Exception {
new Thread(() -> {
log.info("启动线程监听Topic: {}", topicName);
ThreadUtil.sleep(1000);
Collection<String> topics = Lists.newArrayList(topicName);
kafkaConsumer.subscribe(topics);
while (true) {
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000));
consumerRecords.forEach(record -> {
String value = record.value();
log.info("从Kafka中消费的原始数据: {}", value);
});
}
}).start();
}
}

View File

@ -0,0 +1,93 @@
package com.muyu.event.process.consumer;
import com.muyu.event.process.basic.EventPublisher;
import com.muyu.event.process.event.IoTDBInsertDataEvent;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextClosedEvent;
import org.springframework.stereotype.Component;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* @Author: zi run
* @Date 2024/9/29 23:23
* @Description
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class VehicleConsumer implements ApplicationRunner, ApplicationListener<ContextClosedEvent> {
/**
* kafka
*/
private final KafkaConsumer<String, String> kafkaConsumer;
/**
*
*/
private final EventPublisher eventPublisher;
/**
* ()
*/
public final static String MESSAGE_PARSING = "MessageParsing";
/**
* 线线
*/
private final ExecutorService executorService =
Executors.newFixedThreadPool(10);
@Override
public void run(ApplicationArguments args) throws Exception {
log.info("启动线程监听Topic: {}", MESSAGE_PARSING);
List<String> topics = Collections.singletonList(MESSAGE_PARSING);
kafkaConsumer.subscribe(topics);
while (true) {
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(100));
consumerRecords.forEach(consumerRecord -> executorService.submit(() -> handleRecord(consumerRecord)));
}
}
private void handleRecord(ConsumerRecord<String, String> consumerRecord) {
String message = consumerRecord.value();
log.info("接收到车辆报文数据,内容:{}", message);
log.info("------------------------------------------------");
eventPublisher.publish(new IoTDBInsertDataEvent(this, message));
}
@Override
public void onApplicationEvent(ContextClosedEvent event) {
log.info("关闭线程池和Kafka消费者");
try {
executorService.shutdown();
if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
executorService.shutdownNow();
}
} catch (InterruptedException e) {
log.error("线程池关闭被中断,强制关闭", e);
executorService.shutdownNow();
Thread.currentThread().interrupt();
}
try {
kafkaConsumer.close(); // 关闭Kafka消费者
} catch (Exception e) {
log.error("关闭Kafka消费者时发生错误", e);
}
}
}

View File

@ -0,0 +1,88 @@
package com.muyu.event.process.controller;
import com.alibaba.fastjson2.JSONObject;
import com.muyu.common.core.constant.Constants;
import com.muyu.common.core.domain.Result;
import com.muyu.common.core.web.controller.BaseController;
import com.muyu.event.process.iotdb.service.TestIoTDBService;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
* @Author: zi run
* @Date 2024/9/29 16:24
* @Description
*/
@RestController
@RequiredArgsConstructor
@RequestMapping(value = "/test-event")
public class TestEventController extends BaseController {
/**
* kafka
*/
private final KafkaProducer<String, String> kafkaProducer;
/**
* kafka
*/
private static final String kafkaTopicName = "test-topic";
/**
* IoTDB
*/
private final TestIoTDBService testIoTDBService;
/**
* Kafka
*
* @return
*/
@GetMapping(value = "/sendKafka")
public Result<String> senKafka() {
JSONObject jsonObject = new JSONObject();
jsonObject.put("id","1");
jsonObject.put("name","张三");
jsonObject.put("age","18");
jsonObject.put("sex","男");
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(kafkaTopicName, jsonObject.toJSONString());
kafkaProducer.send(producerRecord);
return Result.success(null, Constants.SUCCESS_MESSAGE);
}
/**
* IoTDB
* @return
*/
@GetMapping(value = "/list")
public Result<List<Map<String, Object>>> list() {
return Result.success(testIoTDBService.list(), Constants.SUCCESS_MESSAGE);
}
/**
* IoTDB
*
* @return
*/
@PostMapping(value = "/save")
public Result<String> save() {
String deviceId = "root.test";
ArrayList<String> keyList = new ArrayList<>();
ArrayList<String> valueList = new ArrayList<>();
keyList.add("car_vin");
keyList.add("car_name");
valueList.add("VIN123456");
valueList.add("宝马");
testIoTDBService.insertStringRecord(deviceId, System.currentTimeMillis(), keyList, valueList);
return Result.success(null, Constants.SUCCESS_MESSAGE);
}
}

View File

@ -0,0 +1,20 @@
package com.muyu.event.process.event;
import com.muyu.event.process.basic.BasicEvent;
/**
* @Author: zi run
* @Date 2024/9/29 21:19
* @Description IoTDB
*/
public class IoTDBInsertDataEvent extends BasicEvent<String> {
/**
* IoTDB
*
* @param messsge
*/
public IoTDBInsertDataEvent(Object source, String messsge) {
super(source, messsge);
}
}

View File

@ -1,4 +1,4 @@
package com.muyu.event.process.basic.config;
package com.muyu.event.process.iotdb.basic.config;
import lombok.extern.slf4j.Slf4j;
import org.apache.iotdb.session.pool.SessionPool;

View File

@ -1,7 +1,7 @@
package com.muyu.event.process.basic.service;
package com.muyu.event.process.iotdb.basic.service;
import com.muyu.event.process.domain.dto.IoTDbRecordAble;
import com.muyu.event.process.domain.dto.MeasurementSchemaValuesDTO;
import com.muyu.event.process.iotdb.domain.dto.IoTDbRecordAble;
import com.muyu.event.process.iotdb.domain.dto.MeasurementSchemaValuesDTO;
import org.apache.iotdb.common.rpc.thrift.TAggregationType;
import org.apache.iotdb.isession.SessionDataSet;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@ -14,9 +14,9 @@ import java.util.Map;
/**
* @Author: zi run
* @Date 2024/9/28 23:37
* @Description IoTDB
* @Description IoTDB
*/
public interface IoTDBService {
public interface IService {
/**
* Tablet IoTDB

View File

@ -1,11 +1,10 @@
package com.muyu.event.process.basic.service.impl;
package com.muyu.event.process.iotdb.basic.service.impl;
import com.alibaba.fastjson.JSON;
import com.muyu.event.process.basic.config.IoTDBSessionConfig;
import com.muyu.event.process.basic.service.IoTDBService;
import com.muyu.event.process.domain.dto.IoTDbRecordAble;
import com.muyu.event.process.domain.dto.MeasurementSchemaValuesDTO;
import lombok.RequiredArgsConstructor;
import com.muyu.event.process.iotdb.basic.config.IoTDBSessionConfig;
import com.muyu.event.process.iotdb.basic.service.IService;
import com.muyu.event.process.iotdb.domain.dto.IoTDbRecordAble;
import com.muyu.event.process.iotdb.domain.dto.MeasurementSchemaValuesDTO;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.iotdb.common.rpc.thrift.TAggregationType;
@ -17,6 +16,7 @@ import org.apache.iotdb.tsfile.read.common.RowRecord;
import org.apache.iotdb.tsfile.write.record.Tablet;
import org.apache.iotdb.isession.SessionDataSet;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.lang.reflect.Type;
@ -26,17 +26,17 @@ import java.util.stream.Collectors;
/**
* @Author: zi run
* @Date 2024/9/28 23:38
* @Description IoTDB
* @Description IoTDB
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class IoTDBServiceImpl implements IoTDBService {
public class ServiceImpl implements IService {
/**
* IoTDB
*/
private final IoTDBSessionConfig ioTDBSessionConfig;
@Autowired
private IoTDBSessionConfig ioTDBSessionConfig;
/**
* Tablet IoTDB

View File

@ -1,4 +1,4 @@
package com.muyu.event.process.domain;
package com.muyu.event.process.iotdb.domain;
import io.swagger.v3.oas.annotations.media.Schema;
import io.swagger.v3.oas.annotations.tags.Tag;

View File

@ -1,6 +1,6 @@
package com.muyu.event.process.domain;
package com.muyu.event.process.iotdb.domain;
import com.muyu.event.process.domain.dto.IoTDbRecordAble;
import com.muyu.event.process.iotdb.domain.dto.IoTDbRecordAble;
import lombok.Data;
import lombok.EqualsAndHashCode;

View File

@ -1,4 +1,4 @@
package com.muyu.event.process.domain;
package com.muyu.event.process.iotdb.domain;
import lombok.AllArgsConstructor;
import lombok.Builder;

View File

@ -1,4 +1,4 @@
package com.muyu.event.process.domain.dto;
package com.muyu.event.process.iotdb.domain.dto;
import lombok.AllArgsConstructor;
import lombok.Builder;

View File

@ -1,4 +1,4 @@
package com.muyu.event.process.domain.dto;
package com.muyu.event.process.iotdb.domain.dto;
import lombok.AllArgsConstructor;
import lombok.Data;

View File

@ -0,0 +1,11 @@
package com.muyu.event.process.iotdb.service;
import com.muyu.event.process.iotdb.basic.service.IService;
/**
* @Author: zi run
* @Date 2024/9/29 22:38
* @Description IoTDB
*/
public interface IoTDBService extends IService {
}

View File

@ -0,0 +1,20 @@
package com.muyu.event.process.iotdb.service;
import com.muyu.event.process.iotdb.basic.service.IService;
import java.util.List;
import java.util.Map;
/**
* @Author: zi run
* @Date 2024/9/29 17:23
* @Description IoTDB
*/
public interface TestIoTDBService extends IService {
/**
* IoTDB
* @return
*/
List<Map<String, Object>> list();
}

View File

@ -0,0 +1,14 @@
package com.muyu.event.process.iotdb.service.impl;
import com.muyu.event.process.iotdb.basic.service.impl.ServiceImpl;
import com.muyu.event.process.iotdb.service.IoTDBService;
import org.springframework.stereotype.Service;
/**
* @Author: zi run
* @Date 2024/9/29 22:39
* @Description IoTDB
*/
@Service
public class IoTDBServiceImpl extends ServiceImpl implements IoTDBService {
}

View File

@ -0,0 +1,33 @@
package com.muyu.event.process.iotdb.service.impl;
import com.muyu.event.process.iotdb.basic.service.impl.ServiceImpl;
import com.muyu.event.process.iotdb.service.TestIoTDBService;
import lombok.extern.slf4j.Slf4j;
import org.apache.iotdb.isession.SessionDataSet;
import org.springframework.stereotype.Service;
import java.util.List;
import java.util.Map;
/**
* @Author: zi run
* @Date 2024/9/29 17:24
* @Description IoTDB
*/
@Slf4j
@Service
public class TestIoTDBServiceImpl extends ServiceImpl implements TestIoTDBService {
/**
* IoTDB
* @return
*/
@Override
public List<Map<String, Object>> list() {
String sql = "select * from root.test";
SessionDataSet sessionDataSet = this.executeQueryStatement(sql);
List<Map<String, Object>> list = this.packagingMapData(sessionDataSet, sessionDataSet.getColumnTypes());
log.info("查询IoTDB数据为{}", list.toString());
return list;
}
}

View File

@ -0,0 +1,68 @@
package com.muyu.event.process.listener;
import com.alibaba.fastjson2.JSONObject;
import com.muyu.event.process.basic.BasicEvent;
import com.muyu.event.process.basic.BasicEventListener;
import com.muyu.event.process.event.IoTDBInsertDataEvent;
import com.muyu.event.process.iotdb.service.IoTDBService;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
/**
* @Author: zi run
* @Date 2024/9/29 22:12
* @Description IoTDB
*/
@Component
@RequiredArgsConstructor
public class IoTDBInsertDataListener implements BasicEventListener<String> {
/**
* IoTDB
*/
private final IoTDBService ioTDBService;
/**
*
*/
private static final String DEVICE_ID = "root.vehicle";
/**
* IoTDB
*
* @param event
*/
@Override
public void onEvent(BasicEvent<String> event) {
JSONObject data = JSONObject.parseObject(event.getData());
List<String> keyList = extractKeys(data);
List<String> valueList = extractValues(data);
ioTDBService.insertStringRecord(DEVICE_ID, System.currentTimeMillis(), keyList, valueList);
}
/**
* JSONObject
*
* @param data JSONObject
* @return
*/
private List<String> extractKeys(JSONObject data) {
return data.keySet().stream().collect(Collectors.toList());
}
/**
* JSONObject
*
* @param data JSONObject
* @return
*/
private List<String> extractValues(JSONObject data) {
return data.values().stream()
.map(Object::toString)
.collect(Collectors.toList());
}
}

View File

@ -42,4 +42,6 @@ spring:
# 系统共享配置
- application-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension}
# 系统环境Config共享配置
- application-config-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension}
- application-config-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension}
# kafka共享配置
- application-kafka-config-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension}